Skip to content

Commit 4e3bf9f

Browse files
committed
implemented configurable operation timeout
1 parent 95d284f commit 4e3bf9f

File tree

2 files changed

+80
-53
lines changed

2 files changed

+80
-53
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 75 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.clickhouse.client.api.query.QuerySettings;
3030
import com.clickhouse.client.api.query.Records;
3131
import com.clickhouse.client.config.ClickHouseClientOption;
32+
import com.clickhouse.client.config.ClickHouseDefaults;
3233
import com.clickhouse.data.ClickHouseColumn;
3334
import com.clickhouse.data.ClickHouseDataStreamFactory;
3435
import com.clickhouse.data.ClickHouseFormat;
@@ -44,7 +45,6 @@
4445
import java.io.InputStream;
4546
import java.lang.reflect.InvocationTargetException;
4647
import java.lang.reflect.Method;
47-
import java.net.URISyntaxException;
4848
import java.net.URL;
4949
import java.time.Duration;
5050
import java.time.temporal.ChronoUnit;
@@ -96,18 +96,13 @@
9696
*
9797
*/
9898
public class Client {
99-
private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30);
100-
101-
private static final String DEFAULT_DB_NAME = "default";
102-
103-
private static final String DEFAULT_THREADS_PER_CLIENT = "10";
10499

105100
private Set<String> endpoints;
106101
private Map<String, String> configuration;
107102
private List<ClickHouseNode> serverNodes = new ArrayList<>();
108-
private Map<Class<?>, List<POJOSerializer>> serializers;//Order is important to preserve for RowBinary
103+
private Map<Class<?>, List<POJOSerializer>> serializers; //Order is important to preserve for RowBinary
109104
private Map<Class<?>, Map<String, Method>> getterMethods;
110-
private Map<Class<?>, Boolean> hasDefaults;
105+
private Map<Class<?>, Boolean> hasDefaults; // Whether the POJO has defaults
111106
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
112107
private ExecutorService sharedOperationExecutor;
113108

@@ -123,9 +118,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration) {
123118
this.getterMethods = new HashMap<>();
124119
this.hasDefaults = new HashMap<>();
125120

126-
final int numThreads = Integer.parseInt(configuration.getOrDefault(
127-
ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(), DEFAULT_THREADS_PER_CLIENT));
128-
121+
final int numThreads = Integer.parseInt(configuration.get(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey()));
129122
this.sharedOperationExecutor = Executors.newFixedThreadPool(numThreads, new DefaultThreadFactory("chc-operation"));
130123
LOG.debug("Query executor created with {} threads", numThreads);
131124
}
@@ -141,6 +134,8 @@ public String getDefaultDatabase() {
141134

142135
public static class Builder {
143136
private Set<String> endpoints;
137+
138+
// Read-only configuration
144139
private Map<String, String> configuration;
145140

146141
public Builder() {
@@ -396,6 +391,17 @@ public Builder addProxy(ProxyType type, String host, int port) {
396391
return this;
397392
}
398393

394+
/**
395+
* Sets the maximum time for operation to complete. By default, it is set to 3 hours.
396+
* @param timeout
397+
* @param timeUnit
398+
* @return
399+
*/
400+
public Builder setExecutionTimeout(long timeout, TimeUnit timeUnit) {
401+
this.configuration.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), String.valueOf(timeUnit.toMillis(timeout)));
402+
return this;
403+
}
404+
399405
public Client build() {
400406
// check if endpoint are empty. so can not initiate client
401407
if (this.endpoints.isEmpty()) {
@@ -405,11 +411,30 @@ public Client build() {
405411
if (!this.configuration.containsKey("access_token") && (!this.configuration.containsKey("user") || !this.configuration.containsKey("password"))) {
406412
throw new IllegalArgumentException("Username and password are required");
407413
}
414+
415+
this.configuration = setDefaults(this.configuration);
416+
417+
return new Client(this.endpoints, this.configuration);
418+
}
419+
420+
private Map<String, String> setDefaults(Map<String, String> userConfig) {
421+
408422
// set default database name if not specified
409-
if (!this.configuration.containsKey("database")) {
410-
this.configuration.put("database", DEFAULT_DB_NAME);
423+
if (!userConfig.containsKey("database")) {
424+
userConfig.put("database", (String) ClickHouseDefaults.DATABASE.getDefaultValue());
411425
}
412-
return new Client(this.endpoints, this.configuration);
426+
427+
if (!userConfig.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) {
428+
userConfig.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(),
429+
String.valueOf(ClickHouseClientOption.MAX_EXECUTION_TIME.getDefaultValue()));
430+
}
431+
432+
if (!userConfig.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) {
433+
userConfig.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(),
434+
String.valueOf(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getDefaultValue()));
435+
}
436+
437+
return userConfig;
413438
}
414439
}
415440

@@ -423,7 +448,7 @@ private ClickHouseNode getServerNode() {
423448
* @return true if the server is alive, false otherwise
424449
*/
425450
public boolean ping() {
426-
return ping(Client.TIMEOUT);
451+
return ping(getOperationTimeout());
427452
}
428453

429454
/**
@@ -641,7 +666,14 @@ public CompletableFuture<InsertResponse> insert(String tableName,
641666

642667
if (!responseFuture.isCompletedExceptionally()) {
643668
try {
644-
InsertResponse response = new InsertResponse(client, future.get(TIMEOUT, TimeUnit.MILLISECONDS), clientStats);
669+
int operationTimeout = getOperationTimeout();
670+
ClickHouseResponse clickHouseResponse;
671+
if (operationTimeout > 0) {
672+
clickHouseResponse = future.get(operationTimeout, TimeUnit.MILLISECONDS);
673+
} else {
674+
clickHouseResponse = future.get();
675+
}
676+
InsertResponse response = new InsertResponse(client, clickHouseResponse, clientStats);
645677
responseFuture.complete(response);
646678
} catch (ExecutionException e) {
647679
responseFuture.completeExceptionally(new ClientException("Failed to get insert response", e.getCause()));
@@ -729,9 +761,16 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
729761
CompletableFuture<QueryResponse> future = CompletableFuture.supplyAsync(() -> {
730762
LOG.trace("Executing request: {}", request);
731763
try {
732-
QueryResponse queryResponse = new QueryResponse(client, request.execute(), finalSettings, format, clientStats);
733-
queryResponse.ensureDone();
734-
return queryResponse;
764+
765+
int operationTimeout = getOperationTimeout();
766+
ClickHouseResponse clickHouseResponse;
767+
if (operationTimeout > 0) {
768+
clickHouseResponse = request.execute().get(operationTimeout, TimeUnit.MILLISECONDS);
769+
} else {
770+
clickHouseResponse = request.execute().get();
771+
}
772+
773+
return new QueryResponse(client, clickHouseResponse, finalSettings, format, clientStats);
735774
} catch (ClientException e) {
736775
throw e;
737776
} catch (Exception e) {
@@ -782,8 +821,15 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
782821
CompletableFuture<Records> future = CompletableFuture.supplyAsync(() -> {
783822
LOG.trace("Executing request: {}", request);
784823
try {
785-
QueryResponse queryResponse = new QueryResponse(client, request.execute(), finalSettings, format, clientStats);
786-
queryResponse.ensureDone();
824+
int operationTimeout = getOperationTimeout();
825+
ClickHouseResponse clickHouseResponse;
826+
if (operationTimeout > 0) {
827+
clickHouseResponse = request.execute().get(operationTimeout, TimeUnit.MILLISECONDS);
828+
} else {
829+
clickHouseResponse = request.execute().get();
830+
}
831+
832+
QueryResponse queryResponse = new QueryResponse(client, clickHouseResponse, finalSettings, format, clientStats);
787833
return new Records(queryResponse, finalSettings);
788834
} catch (ClientException e) {
789835
throw e;
@@ -804,7 +850,10 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
804850
*/
805851
public List<GenericRecord> queryAll(String sqlQuery) {
806852
try {
807-
try (QueryResponse response = query(sqlQuery).get(TIMEOUT, TimeUnit.MILLISECONDS)) {
853+
int operationTimeout = getOperationTimeout();
854+
QueryResponse response = operationTimeout == 0 ? query(sqlQuery).get() :
855+
query(sqlQuery).get(operationTimeout, TimeUnit.MILLISECONDS);
856+
try (response) {
808857
List<GenericRecord> records = new ArrayList<>();
809858
if (response.getResultRows() > 0) {
810859
ClickHouseBinaryFormatReader reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream());
@@ -914,6 +963,10 @@ public Map<String, String> getConfiguration() {
914963
return Collections.unmodifiableMap(configuration);
915964
}
916965

966+
protected int getOperationTimeout() {
967+
return Integer.parseInt(configuration.get(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey()));
968+
}
969+
917970
/**
918971
* Returns unmodifiable set of endpoints.
919972
* @return - set of endpoints

client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.clickhouse.data.ClickHouseInputStream;
1111

1212
import java.util.concurrent.ExecutionException;
13-
import java.util.concurrent.Future;
1413
import java.util.concurrent.TimeUnit;
1514
import java.util.concurrent.TimeoutException;
1615

@@ -29,7 +28,7 @@
2928
*/
3029
public class QueryResponse implements AutoCloseable {
3130

32-
private final Future<ClickHouseResponse> responseRef;
31+
private final ClickHouseResponse clickHouseResponse;
3332
private final ClickHouseFormat format;
3433

3534
private long completeTimeout = TimeUnit.MINUTES.toMillis(1);
@@ -42,43 +41,19 @@ public class QueryResponse implements AutoCloseable {
4241

4342
private volatile boolean completed = false;
4443

45-
public QueryResponse(ClickHouseClient client, Future<ClickHouseResponse> responseRef,
44+
public QueryResponse(ClickHouseClient client, ClickHouseResponse clickHouseResponse,
4645
QuerySettings settings, ClickHouseFormat format,
4746
ClientStatisticsHolder clientStatisticsHolder) {
4847
this.client = client;
49-
this.responseRef = responseRef;
48+
this.clickHouseResponse = clickHouseResponse;
5049
this.format = format;
5150
this.settings = settings;
5251
this.operationMetrics = new OperationMetrics(clientStatisticsHolder);
5352
}
5453

55-
/**
56-
* Called internally to finalize the query execution.
57-
* Do not call this method directly.
58-
*/
59-
public void ensureDone() {
60-
if (!completed) {
61-
// TODO: thread-safety
62-
makeComplete();
63-
}
64-
}
65-
66-
private void makeComplete() {
67-
try {
68-
ClickHouseResponse response = responseRef.get(completeTimeout, TimeUnit.MILLISECONDS);
69-
completed = true;
70-
operationMetrics.operationComplete(response.getSummary());
71-
} catch (ExecutionException e) {
72-
throw new ClientException("Failed to get command response", e.getCause());
73-
} catch (TimeoutException | InterruptedException e) {
74-
throw new ClientException("Query request failed", e);
75-
}
76-
}
77-
7854
public ClickHouseInputStream getInputStream() {
79-
ensureDone();
8055
try {
81-
return responseRef.get().getInputStream();
56+
return clickHouseResponse.getInputStream();
8257
} catch (Exception e) {
8358
throw new RuntimeException(e); // TODO: handle exception
8459
}
@@ -87,7 +62,7 @@ public ClickHouseInputStream getInputStream() {
8762
@Override
8863
public void close() throws Exception {
8964
try {
90-
responseRef.get(completeTimeout, TimeUnit.MILLISECONDS ).close();
65+
clickHouseResponse.close();
9166
} catch (Exception e) {
9267
throw new ClientException("Failed to close response", e);
9368
}
@@ -109,7 +84,6 @@ public ClickHouseFormat getFormat() {
10984
* @return metrics of this operation
11085
*/
11186
public OperationMetrics getMetrics() {
112-
ensureDone();
11387
return operationMetrics;
11488
}
11589

0 commit comments

Comments
 (0)