Skip to content

Commit cb27526

Browse files
committed
added async config and executor
1 parent c50b920 commit cb27526

File tree

4 files changed

+142
-75
lines changed

4 files changed

+142
-75
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 89 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@
4040
import com.clickhouse.client.config.ClickHouseDefaults;
4141
import com.clickhouse.client.http.ClickHouseHttpProto;
4242
import com.clickhouse.data.ClickHouseColumn;
43-
import com.clickhouse.data.ClickHouseDataStreamFactory;
4443
import com.clickhouse.data.ClickHouseFormat;
45-
import com.clickhouse.data.ClickHousePipedOutputStream;
4644
import com.clickhouse.data.format.BinaryStreamUtils;
4745
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
4846
import org.apache.hc.core5.http.ClassicHttpResponse;
@@ -114,22 +112,23 @@
114112
public class Client implements AutoCloseable {
115113
private HttpAPIClientHelper httpClientHelper = null;
116114

117-
private Set<String> endpoints;
118-
private Map<String, String> configuration;
119-
private List<ClickHouseNode> serverNodes = new ArrayList<>();
120-
private Map<Class<?>, List<POJOSerializer>> serializers; //Order is important to preserve for RowBinary
121-
private Map<Class<?>, Map<String, Method>> getterMethods;
122-
private Map<Class<?>, Boolean> hasDefaults; // Whether the POJO has defaults
115+
private final Set<String> endpoints;
116+
private final Map<String, String> configuration;
117+
private final List<ClickHouseNode> serverNodes = new ArrayList<>();
118+
private final Map<Class<?>, List<POJOSerializer>> serializers; //Order is important to preserve for RowBinary
119+
private final Map<Class<?>, Map<String, Method>> getterMethods;
120+
private final Map<Class<?>, Boolean> hasDefaults; // Whether the POJO has defaults
123121
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
124-
private ExecutorService sharedOperationExecutor;
122+
private final ExecutorService sharedOperationExecutor;
125123

126-
private Map<String, ClientStatisticsHolder> globalClientStats = new ConcurrentHashMap<>();
124+
private final Map<String, ClientStatisticsHolder> globalClientStats = new ConcurrentHashMap<>();
127125

128126
private boolean useNewImplementation = false;
129127

130128
private ClickHouseClient oldClient = null;
131129

132-
private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation) {
130+
private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
131+
ExecutorService sharedOperationExecutor) {
133132
this.endpoints = endpoints;
134133
this.configuration = configuration;
135134
this.endpoints.forEach(endpoint -> {
@@ -139,7 +138,12 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
139138
this.getterMethods = new HashMap<>();
140139
this.hasDefaults = new HashMap<>();
141140

142-
this.sharedOperationExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("chc-operation"));
141+
boolean isAsyncEnabled = MapUtils.getFlag(this.configuration, ClickHouseClientOption.ASYNC.getKey());
142+
if (isAsyncEnabled && sharedOperationExecutor == null) {
143+
this.sharedOperationExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("chc-operation"));
144+
} else {
145+
this.sharedOperationExecutor = sharedOperationExecutor;
146+
}
143147
this.useNewImplementation = useNewImplementation;
144148
if (useNewImplementation) {
145149
this.httpClientHelper = new HttpAPIClientHelper(configuration);
@@ -169,7 +173,7 @@ public String getDefaultDatabase() {
169173
@Override
170174
public void close() {
171175
try {
172-
if (!sharedOperationExecutor.isShutdown()) {
176+
if (sharedOperationExecutor != null && !sharedOperationExecutor.isShutdown()) {
173177
this.sharedOperationExecutor.shutdownNow();
174178
}
175179
} catch (Exception e) {
@@ -188,6 +192,8 @@ public static class Builder {
188192
private Map<String, String> configuration;
189193
private boolean useNewImplementation = false;
190194

195+
private ExecutorService sharedOperationExecutor = null;
196+
191197
public Builder() {
192198
this.endpoints = new HashSet<>();
193199
this.configuration = new HashMap<String, String>();
@@ -603,6 +609,34 @@ public Builder setServerTimeZone(String timeZone) {
603609
return this;
604610
}
605611

612+
/**
613+
* Configures client to execute requests in a separate thread. By default, operations (query, insert)
614+
* are executed in the same thread as the caller.
615+
* It is possible to set a shared executor for all operations. See {@link #setSharedOperationExecutor(ExecutorService)}
616+
*
617+
* Note: Async operations a using executor what expects having a queue of tasks for a pool of executors.
618+
* The queue size limit is small it may quickly become a problem for scheduling new tasks.
619+
*
620+
* @param async - if to use async requests
621+
* @return
622+
*/
623+
public Builder useAsyncRequests(boolean async) {
624+
this.configuration.put(ClickHouseClientOption.ASYNC.getKey(), String.valueOf(async));
625+
return this;
626+
}
627+
628+
/**
629+
* Sets an executor for running operations. If async operations are enabled and no executor is specified
630+
* client will create a default executor.
631+
*
632+
* @param executorService - executor service for async operations
633+
* @return
634+
*/
635+
public Builder setSharedOperationExecutor(ExecutorService executorService) {
636+
this.sharedOperationExecutor = executorService;
637+
return this;
638+
}
639+
606640
public Client build() {
607641
this.configuration = setDefaults(this.configuration);
608642

@@ -648,7 +682,7 @@ public Client build() {
648682
throw new IllegalArgumentException("Nor server timezone nor specific timezone is set");
649683
}
650684

651-
return new Client(this.endpoints, this.configuration, this.useNewImplementation);
685+
return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor);
652686
}
653687

654688
private Map<String, String> setDefaults(Map<String, String> userConfig) {
@@ -908,8 +942,8 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
908942
}
909943
throw new ClientException("Failed to get table schema: too many retries");
910944
};
911-
boolean isAsync = MapUtils.getFlag(configuration, settings.getAllSettings(), ClickHouseClientOption.ASYNC.getKey());
912-
return isAsync ? CompletableFuture.supplyAsync(supplier, sharedOperationExecutor) : CompletableFuture.completedFuture(supplier.get());
945+
946+
return runAsyncOperation(supplier, settings.getAllSettings());
913947
} else {
914948
//Create an output stream to write the data to
915949
ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -964,6 +998,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
964998
ClientStatisticsHolder clientStats = globalClientStats.remove(operationId);
965999
clientStats.start(ClientMetrics.OP_DURATION);
9661000

1001+
Supplier<InsertResponse> responseSupplier;
9671002
if (useNewImplementation) {
9681003

9691004
String retry = configuration.get(ClickHouseClientOption.RETRY.getKey());
@@ -978,7 +1013,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
9781013

9791014
settings.setOption(ClickHouseClientOption.FORMAT.getKey(), format.name());
9801015
final InsertSettings finalSettings = settings;
981-
Supplier<InsertResponse> supplier = () -> {
1016+
responseSupplier = () -> {
9821017
// Selecting some node
9831018
ClickHouseNode selectedNode = getNextAliveNode();
9841019

@@ -1036,29 +1071,22 @@ public CompletableFuture<InsertResponse> insert(String tableName,
10361071
}
10371072
throw new ClientException("Failed to insert data: too many retries");
10381073
};
1039-
boolean isAsync = MapUtils.getFlag(configuration, settings.getAllSettings(), ClickHouseClientOption.ASYNC.getKey());
1040-
return isAsync ? CompletableFuture.supplyAsync(supplier, sharedOperationExecutor) : CompletableFuture.completedFuture(supplier.get());
10411074
} else {
1042-
CompletableFuture<InsertResponse> responseFuture = new CompletableFuture<>();
1043-
1044-
ClickHouseRequest.Mutation request = ClientV1AdaptorHelper
1045-
.createMutationRequest(oldClient.write(getServerNode()), tableName, settings, configuration).format(format);
1046-
1047-
CompletableFuture<ClickHouseResponse> future = null;
1048-
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(request.getConfig())) {
1049-
future = request.data(stream.getInputStream()).execute();
1050-
1051-
//Copy the data from the input stream to the output stream
1052-
byte[] buffer = new byte[settings.getInputStreamCopyBufferSize()];
1053-
int bytesRead;
1054-
while ((bytesRead = data.read(buffer)) != -1) {
1055-
stream.write(buffer, 0, bytesRead);
1056-
}
1057-
} catch (IOException e) {
1058-
responseFuture.completeExceptionally(new ClientException("Failed to write data to the output stream", e));
1059-
}
1075+
responseSupplier = () -> {
1076+
ClickHouseRequest.Mutation request = ClientV1AdaptorHelper
1077+
.createMutationRequest(oldClient.write(getServerNode()), tableName, settings, configuration).format(format);
1078+
1079+
CompletableFuture<ClickHouseResponse> future = null;
1080+
future = request.data(output -> {
1081+
//Copy the data from the input stream to the output stream
1082+
byte[] buffer = new byte[settings.getInputStreamCopyBufferSize()];
1083+
int bytesRead;
1084+
while ((bytesRead = data.read(buffer)) != -1) {
1085+
output.write(buffer, 0, bytesRead);
1086+
}
1087+
output.close();
1088+
}).option(ClickHouseClientOption.ASYNC, false).execute();
10601089

1061-
if (!responseFuture.isCompletedExceptionally()) {
10621090
try {
10631091
int operationTimeout = getOperationTimeout();
10641092
ClickHouseResponse clickHouseResponse;
@@ -1068,17 +1096,17 @@ public CompletableFuture<InsertResponse> insert(String tableName,
10681096
clickHouseResponse = future.get();
10691097
}
10701098
InsertResponse response = new InsertResponse(clickHouseResponse, clientStats);
1071-
responseFuture.complete(response);
1099+
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));
1100+
return response;
10721101
} catch (ExecutionException e) {
1073-
responseFuture.completeExceptionally(new ClientException("Failed to get insert response", e.getCause()));
1102+
throw new ClientException("Failed to get insert response", e.getCause());
10741103
} catch (InterruptedException | TimeoutException e) {
1075-
responseFuture.completeExceptionally(new ClientException("Operation has likely timed out.", e));
1104+
throw new ClientException("Operation has likely timed out.", e);
10761105
}
1077-
}
1078-
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));
1079-
1080-
return responseFuture;
1106+
};
10811107
}
1108+
1109+
return runAsyncOperation(responseSupplier, settings.getAllSettings());
10821110
}
10831111

10841112
/**
@@ -1144,6 +1172,8 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11441172
clientStats.start(ClientMetrics.OP_DURATION);
11451173
applyDefaults(settings);
11461174

1175+
Supplier<QueryResponse> responseSupplier;
1176+
11471177
if (useNewImplementation) {
11481178
String retry = configuration.get(ClickHouseClientOption.RETRY.getKey());
11491179
final int maxRetries = retry == null ? (int) ClickHouseClientOption.RETRY.getDefaultValue() : Integer.parseInt(retry);
@@ -1152,7 +1182,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11521182
settings.setOption("statement_params", queryParams);
11531183
}
11541184
final QuerySettings finalSettings = settings;
1155-
Supplier<QueryResponse> supplier = () -> {
1185+
responseSupplier = () -> {
11561186
// Selecting some node
11571187
ClickHouseNode selectedNode = getNextAliveNode();
11581188
for (int i = 0; i <= maxRetries; i++) {
@@ -1188,8 +1218,6 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11881218
}
11891219
throw new ClientException("Failed to get table schema: too many retries");
11901220
};
1191-
boolean isAsync = MapUtils.getFlag(configuration, settings.getAllSettings(), ClickHouseClientOption.ASYNC.getKey());
1192-
return isAsync ? CompletableFuture.supplyAsync(supplier, sharedOperationExecutor) : CompletableFuture.completedFuture(supplier.get());
11931221
} else {
11941222
ClickHouseRequest<?> request = oldClient.read(getServerNode());
11951223
request.options(SettingsConverter.toRequestOptions(settings.getAllSettings()));
@@ -1200,7 +1228,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
12001228
request.format(format);
12011229

12021230
final QuerySettings finalSettings = settings;
1203-
CompletableFuture<QueryResponse> future = CompletableFuture.supplyAsync(() -> {
1231+
responseSupplier = () -> {
12041232
LOG.trace("Executing request: {}", request);
12051233
try {
12061234

@@ -1218,9 +1246,10 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
12181246
} catch (Exception e) {
12191247
throw new ClientException("Failed to get query response", e);
12201248
}
1221-
}, sharedOperationExecutor);
1222-
return future;
1249+
};
12231250
}
1251+
1252+
return runAsyncOperation(responseSupplier, settings.getAllSettings());
12241253
}
12251254

12261255
/**
@@ -1252,13 +1281,13 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
12521281
settings.waitEndOfQuery(true); // we rely on the summery
12531282

12541283
final QuerySettings finalSettings = settings;
1255-
return query(sqlQuery, settings).thenApplyAsync(response -> {
1284+
return query(sqlQuery, settings).thenApply(response -> {
12561285
try {
12571286
return new Records(response, finalSettings);
12581287
} catch (Exception e) {
12591288
throw new ClientException("Failed to get query response", e);
12601289
}
1261-
}, sharedOperationExecutor);
1290+
});
12621291
}
12631292

12641293
/**
@@ -1360,13 +1389,13 @@ public CompletableFuture<CommandResponse> execute(String sql, CommandSettings se
13601389
*/
13611390
public CompletableFuture<CommandResponse> execute(String sql) {
13621391
return query(sql)
1363-
.thenApplyAsync(response -> {
1392+
.thenApply(response -> {
13641393
try {
13651394
return new CommandResponse(response);
13661395
} catch (Exception e) {
13671396
throw new ClientException("Failed to get command response", e);
13681397
}
1369-
}, sharedOperationExecutor);
1398+
});
13701399
}
13711400

13721401
/**
@@ -1427,6 +1456,11 @@ private void applyDefaults(QuerySettings settings) {
14271456
}
14281457
}
14291458

1459+
private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, Map<String, Object> requestSettings) {
1460+
boolean isAsync = MapUtils.getFlag(configuration, requestSettings, ClickHouseClientOption.ASYNC.getKey());
1461+
return isAsync ? CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor) : CompletableFuture.completedFuture(resultSupplier.get());
1462+
}
1463+
14301464
public String toString() {
14311465
return "Client{" +
14321466
"endpoints=" + endpoints +

client-v2/src/test/java/com/clickhouse/client/ProxyTests.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
import com.clickhouse.client.insert.SamplePOJO;
1111
import com.github.tomakehurst.wiremock.WireMockServer;
1212
import com.github.tomakehurst.wiremock.client.WireMock;
13-
import com.github.tomakehurst.wiremock.common.Slf4jNotifier;
1413
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
15-
import com.github.tomakehurst.wiremock.http.Fault;
1614
import com.github.tomakehurst.wiremock.stubbing.Scenario;
1715
import org.apache.hc.core5.http.HttpHeaders;
1816
import org.apache.hc.core5.http.HttpStatus;
@@ -28,6 +26,7 @@
2826
import java.util.concurrent.TimeUnit;
2927

3028
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
29+
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
3130
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
3231
import static com.github.tomakehurst.wiremock.client.WireMock.post;
3332
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
@@ -88,9 +87,12 @@ public void testPrivateProxyWithoutAuth() {
8887
try {
8988
client.get().execute("select 1").get();
9089
Assert.fail("Should have thrown exception.");
90+
} catch (ExecutionException e) {
91+
Assert.assertTrue(e.getCause() instanceof ClientException);
92+
} catch (ClientMisconfigurationException e) {
93+
Assert.assertTrue(e.getMessage().contains("Proxy authentication required"));
9194
} catch (Exception e) {
92-
e.printStackTrace();
93-
Assert.assertTrue(e.getCause() instanceof ClientMisconfigurationException);
95+
Assert.fail("Should have thrown exception.", e);
9496
}
9597
}
9698

@@ -138,18 +140,25 @@ public void testProxyWithDisabledCookies() {
138140
client.set(clientBuilder(initProxy(), true).setHttpCookiesEnabled(false).build());
139141
final int targetPort = getServer(ClickHouseProtocol.HTTP).getPort();
140142

141-
proxy.get().addStubMapping(post(urlMatching("/.*"))
143+
proxy.get().addStubMapping(post(anyUrl())
142144
.inScenario("routeCookies")
143145
.whenScenarioStateIs(Scenario.STARTED)
144146
.willReturn(aResponse().withHeader(HttpHeaders.SET_COOKIE, "routeName=routeA")
145-
.proxiedFrom("http://localhost:" + targetPort)).willSetStateTo("cookies").build());
147+
.proxiedFrom("http://localhost:" + targetPort))
148+
.willSetStateTo("cookies").build());
146149

147-
proxy.get().addStubMapping(post(urlMatching("/.*"))
150+
proxy.get().addStubMapping(post(anyUrl())
148151
.inScenario("routeCookies")
149152
.whenScenarioStateIs("cookies")
150153
.withHeader(HttpHeaders.COOKIE, equalTo("routeName=routeA"))
151154
.willReturn(aResponse().proxiedFrom("http://localhost:" + targetPort)).build());
152155

156+
proxy.get().addStubMapping(post(anyUrl())
157+
.inScenario("routeCookies")
158+
.whenScenarioStateIs("cookies")
159+
.withHeader(HttpHeaders.COOKIE, WireMock.absent())
160+
.willReturn(aResponse().withStatus(HttpStatus.SC_BAD_GATEWAY)).build());
161+
153162
try {
154163
client.get().execute("select 1").get();
155164
} catch (Exception e) {
@@ -159,8 +168,10 @@ public void testProxyWithDisabledCookies() {
159168
client.get().execute("select 1").get();
160169
} catch (ExecutionException e) {
161170
Assert.assertTrue(e.getCause() instanceof ClientException);
171+
} catch (ClientException e) {
172+
Assert.assertTrue(e.getMessage().contains("Server returned '502 Bad gateway'"));
162173
} catch (Exception e) {
163-
fail("Should have thrown exception.", e);
174+
Assert.fail("Should have thrown exception.", e);
164175
}
165176
}
166177

@@ -175,7 +186,9 @@ private Client.Builder clientBuilder(int proxyPort, boolean onlyNewImplementatio
175186
}
176187

177188
private int initProxy() {
178-
WireMockServer wireMock = new WireMockServer(WireMockConfiguration.options().notifier(new Slf4jNotifier(true)));
189+
WireMockServer wireMock = new WireMockServer(WireMockConfiguration.options()
190+
// .notifier(new Slf4jNotifier(true))
191+
);
179192
wireMock.start();
180193
proxy.set(wireMock);
181194
return wireMock.port();

0 commit comments

Comments
 (0)