Skip to content

Commit 3df4c5b

Browse files
committed
async configuration & reader factory method
1 parent 9c309e2 commit 3df4c5b

File tree

2 files changed

+69
-20
lines changed

2 files changed

+69
-20
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
import com.clickhouse.client.api.command.CommandResponse;
88
import com.clickhouse.client.api.command.CommandSettings;
99
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
10+
import com.clickhouse.client.api.data_formats.NativeFormatReader;
11+
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
1012
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
13+
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
1114
import com.clickhouse.client.api.data_formats.internal.MapBackedRecord;
1215
import com.clickhouse.client.api.data_formats.internal.ProcessParser;
1316
import com.clickhouse.client.api.enums.Protocol;
@@ -36,14 +39,11 @@
3639
import com.clickhouse.client.config.ClickHouseClientOption;
3740
import com.clickhouse.client.config.ClickHouseDefaults;
3841
import com.clickhouse.client.http.ClickHouseHttpProto;
39-
import com.clickhouse.client.http.config.ClickHouseHttpOption;
4042
import com.clickhouse.data.ClickHouseColumn;
4143
import com.clickhouse.data.ClickHouseDataStreamFactory;
4244
import com.clickhouse.data.ClickHouseFormat;
4345
import com.clickhouse.data.ClickHousePipedOutputStream;
4446
import com.clickhouse.data.format.BinaryStreamUtils;
45-
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
46-
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
4747
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
4848
import org.apache.hc.core5.http.ClassicHttpResponse;
4949
import org.apache.hc.core5.http.HttpStatus;
@@ -78,6 +78,7 @@
7878
import java.util.concurrent.Executors;
7979
import java.util.concurrent.TimeUnit;
8080
import java.util.concurrent.TimeoutException;
81+
import java.util.function.Supplier;
8182

8283
import static java.time.temporal.ChronoUnit.SECONDS;
8384

@@ -680,6 +681,10 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
680681
userConfig.put(ClickHouseClientOption.SERVER_TIME_ZONE.getKey(), "UTC");
681682
}
682683

684+
if (!userConfig.containsKey(ClickHouseClientOption.ASYNC.getKey())) {
685+
userConfig.put(ClickHouseClientOption.ASYNC.getKey(), "false");
686+
}
687+
683688
return userConfig;
684689
}
685690
}
@@ -850,7 +855,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
850855

851856
settings.setOption(ClickHouseClientOption.FORMAT.getKey(), format.name());
852857
final InsertSettings finalSettings = settings;
853-
CompletableFuture<InsertResponse> future = CompletableFuture.supplyAsync(() -> {
858+
Supplier<InsertResponse> supplier = () -> {
854859
// Selecting some node
855860
ClickHouseNode selectedNode = getNextAliveNode();
856861

@@ -902,8 +907,9 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
902907
}
903908
}
904909
throw new ClientException("Failed to get table schema: too many retries");
905-
}, sharedOperationExecutor);
906-
return future;
910+
};
911+
boolean isAsync = MapUtils.getFlag(configuration, settings.getAllSettings(), ClickHouseClientOption.ASYNC.getKey());
912+
return isAsync ? CompletableFuture.supplyAsync(supplier, sharedOperationExecutor) : CompletableFuture.completedFuture(supplier.get());
907913
} else {
908914
//Create an output stream to write the data to
909915
ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -972,7 +978,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
972978

973979
settings.setOption(ClickHouseClientOption.FORMAT.getKey(), format.name());
974980
final InsertSettings finalSettings = settings;
975-
CompletableFuture<InsertResponse> future = CompletableFuture.supplyAsync(() -> {
981+
Supplier<InsertResponse> supplier = () -> {
976982
// Selecting some node
977983
ClickHouseNode selectedNode = getNextAliveNode();
978984

@@ -1029,8 +1035,9 @@ public CompletableFuture<InsertResponse> insert(String tableName,
10291035
}
10301036
}
10311037
throw new ClientException("Failed to insert data: too many retries");
1032-
}, sharedOperationExecutor);
1033-
return future;
1038+
};
1039+
boolean isAsync = MapUtils.getFlag(configuration, settings.getAllSettings(), ClickHouseClientOption.ASYNC.getKey());
1040+
return isAsync ? CompletableFuture.supplyAsync(supplier, sharedOperationExecutor) : CompletableFuture.completedFuture(supplier.get());
10341041
} else {
10351042
CompletableFuture<InsertResponse> responseFuture = new CompletableFuture<>();
10361043

@@ -1145,7 +1152,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11451152
settings.setOption("statement_params", queryParams);
11461153
}
11471154
final QuerySettings finalSettings = settings;
1148-
CompletableFuture<QueryResponse> future = CompletableFuture.supplyAsync(() -> {
1155+
Supplier<QueryResponse> supplier = () -> {
11491156
// Selecting some node
11501157
ClickHouseNode selectedNode = getNextAliveNode();
11511158
for (int i = 0; i <= maxRetries; i++) {
@@ -1180,8 +1187,9 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11801187
}
11811188
}
11821189
throw new ClientException("Failed to get table schema: too many retries");
1183-
}, sharedOperationExecutor);
1184-
return future;
1190+
};
1191+
boolean isAsync = MapUtils.getFlag(configuration, settings.getAllSettings(), ClickHouseClientOption.ASYNC.getKey());
1192+
return isAsync ? CompletableFuture.supplyAsync(supplier, sharedOperationExecutor) : CompletableFuture.completedFuture(supplier.get());
11851193
} else {
11861194
ClickHouseRequest<?> request = oldClient.read(getServerNode());
11871195
request.options(SettingsConverter.toRequestOptions(settings.getAllSettings()));
@@ -1361,6 +1369,39 @@ public CompletableFuture<CommandResponse> execute(String sql) {
13611369
}, sharedOperationExecutor);
13621370
}
13631371

1372+
/**
1373+
* <p>Create an instance of {@link ClickHouseBinaryFormatReader} based on response. Table schema is option and only
1374+
* required for {@link ClickHouseFormat#RowBinaryWithNames}, {@link ClickHouseFormat#RowBinary}.
1375+
* Format {@link ClickHouseFormat#RowBinaryWithDefaults} is not supported for output (read operations).</p>
1376+
* @param response
1377+
* @param schema
1378+
* @return
1379+
*/
1380+
public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
1381+
ClickHouseBinaryFormatReader reader = null;
1382+
switch (response.getFormat()) {
1383+
case Native:
1384+
reader = new NativeFormatReader(response.getInputStream(), response.getSettings());
1385+
break;
1386+
case RowBinaryWithNamesAndTypes:
1387+
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
1388+
break;
1389+
case RowBinaryWithNames:
1390+
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema);
1391+
break;
1392+
case RowBinary:
1393+
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema);
1394+
break;
1395+
default:
1396+
throw new IllegalArgumentException("Unsupported format: " + response.getFormat());
1397+
}
1398+
return reader;
1399+
}
1400+
1401+
public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
1402+
return newBinaryFormatReader(response, null);
1403+
}
1404+
13641405
private String startOperation() {
13651406
String operationId = UUID.randomUUID().toString();
13661407
globalClientStats.put(operationId, new ClientStatisticsHolder());

client-v2/src/main/java/com/clickhouse/client/api/internal/MapUtils.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,28 @@ public static boolean getFlag(Map<String, String> map, String key) {
5656
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
5757
}
5858

59-
public static boolean getFlag(Map<String, String> p1, Map<String, String> p2, String key) {
60-
String val = p1.get(key);
59+
public static boolean getFlag(Map<String, ?> p1, Map<String, ?> p2, String key) {
60+
Object val = p1.get(key);
6161
if (val == null) {
6262
val = p2.get(key);
6363
}
6464
if (val == null) {
6565
throw new NullPointerException("Missing value for the key '" + key + "'");
6666
}
67-
if (val.equalsIgnoreCase("true")) {
68-
return true;
69-
} else if (val.equalsIgnoreCase("false")) {
70-
return false;
71-
}
7267

73-
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
68+
if (val instanceof Boolean) {
69+
return (Boolean) val;
70+
} else if (val instanceof String) {
71+
String str = (String) val;
72+
if (str.equalsIgnoreCase("true")) {
73+
return true;
74+
} else if (str.equalsIgnoreCase("false")) {
75+
return false;
76+
} else {
77+
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
78+
}
79+
} else {
80+
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
81+
}
7482
}
7583
}

0 commit comments

Comments
 (0)