Skip to content

Commit 386958e

Browse files
committed
fixed NPE and added test
1 parent a5e9c34 commit 386958e

File tree

2 files changed

+36
-5
lines changed

2 files changed

+36
-5
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2153,7 +2153,11 @@ private void applyDefaults(QuerySettings settings) {
21532153

21542154
private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, Map<String, Object> requestSettings) {
21552155
boolean isAsync = MapUtils.getFlag(requestSettings, configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey());
2156-
return isAsync ? CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor) : CompletableFuture.completedFuture(resultSupplier.get());
2156+
if (isAsync) {
2157+
return sharedOperationExecutor == null ? CompletableFuture.supplyAsync(resultSupplier) :
2158+
CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor);
2159+
}
2160+
return CompletableFuture.completedFuture(resultSupplier.get());
21572161
}
21582162

21592163
@Override

client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.clickhouse.client.ClickHouseProtocol;
66
import com.clickhouse.client.ClickHouseServerForTest;
77
import com.clickhouse.client.api.Client;
8+
import com.clickhouse.client.api.ClientConfigProperties;
89
import com.clickhouse.client.api.ClientException;
910
import com.clickhouse.client.api.DataTypeUtils;
1011
import com.clickhouse.client.api.command.CommandResponse;
@@ -269,6 +270,33 @@ public void insertRawData() throws Exception {
269270
assertEquals(records.size(), 1000);
270271
}
271272

273+
@Test(groups = { "integration" }, enabled = true)
274+
public void insertRawDataAsync() throws Exception {
275+
final String tableName = "raw_data_table_async";
276+
final String createSQL = "CREATE TABLE " + tableName +
277+
" (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()";
278+
279+
initTable(tableName, createSQL);
280+
281+
InsertSettings localSettings = new InsertSettings(settings.getAllSettings());
282+
localSettings.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), true);
283+
ByteArrayOutputStream data = new ByteArrayOutputStream();
284+
PrintWriter writer = new PrintWriter(data);
285+
for (int i = 0; i < 1000; i++) {
286+
writer.printf("%d\t%s\t%s\t%d\t%s\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2");
287+
}
288+
writer.flush();
289+
client.insert(tableName, new ByteArrayInputStream(data.toByteArray()),
290+
ClickHouseFormat.TSV, localSettings).whenComplete((response, throwable) -> {
291+
OperationMetrics metrics = response.getMetrics();
292+
assertEquals((int)response.getWrittenRows(), 1000 );
293+
294+
List<GenericRecord> records = client.queryAll("SELECT * FROM " + tableName);
295+
assertEquals(records.size(), 1000);
296+
})
297+
.join(); // wait operation complete. only for tests
298+
}
299+
272300
@Test(groups = { "integration" }, dataProvider = "insertRawDataSimpleDataProvider", dataProviderClass = InsertTests.class)
273301
public void insertRawDataSimple(String tableName) throws Exception {
274302
// final String tableName = "raw_data_table";
@@ -639,10 +667,9 @@ public void testCollectionInsert() throws Exception {
639667
}
640668
}
641669

642-
643-
static {
644-
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
645-
}
670+
// static {
671+
// System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
672+
// }
646673

647674
@Test(groups = {"integration"}, dataProvider = "testAppCompressionDataProvider", dataProviderClass = InsertTests.class)
648675
public void testAppCompression(String algo) throws Exception {

0 commit comments

Comments
 (0)