Skip to content

Commit 6d33827

Browse files
committed
made query spawn only one thread per request
1 parent 4ecf685 commit 6d33827

File tree

2 files changed

+19
-7
lines changed

2 files changed

+19
-7
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import java.util.concurrent.ExecutorService;
6262
import java.util.concurrent.Executors;
6363
import java.util.concurrent.TimeUnit;
64+
import java.util.concurrent.TimeoutException;
6465

6566
import static java.time.temporal.ChronoUnit.SECONDS;
6667

@@ -98,14 +99,16 @@ public class Client {
9899

99100
private static final String DEFAULT_DB_NAME = "default";
100101

102+
private static final String DEFAULT_THREADS_PER_CLIENT = "10";
103+
101104
private Set<String> endpoints;
102105
private Map<String, String> configuration;
103106
private List<ClickHouseNode> serverNodes = new ArrayList<>();
104107
private Map<Class<?>, List<POJOSerializer>> serializers;//Order is important to preserve for RowBinary
105108
private Map<Class<?>, Map<String, Method>> getterMethods;
106109
private Map<Class<?>, Boolean> hasDefaults;
107110
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
108-
private ExecutorService queryExecutor;
111+
private ExecutorService sharedOperationExecutor;
109112

110113
private Map<String, ClientStatisticsHolder> globalClientStats = new ConcurrentHashMap<>();
111114

@@ -120,8 +123,9 @@ private Client(Set<String> endpoints, Map<String,String> configuration) {
120123
this.hasDefaults = new HashMap<>();
121124

122125
final int numThreads = Integer.parseInt(configuration.getOrDefault(
123-
ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(), "3"));
124-
this.queryExecutor = Executors.newFixedThreadPool(numThreads, r -> {
126+
ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(), DEFAULT_THREADS_PER_CLIENT));
127+
128+
this.sharedOperationExecutor = Executors.newFixedThreadPool(numThreads, r -> {
125129
Thread t = new Thread(r);
126130
t.setName("ClickHouse-Query-Executor");
127131
t.setUncaughtExceptionHandler((t1, e) -> {
@@ -642,11 +646,11 @@ public CompletableFuture<InsertResponse> insert(String tableName,
642646

643647
if (!responseFuture.isCompletedExceptionally()) {
644648
try {
645-
InsertResponse response = new InsertResponse(client, future.get(), clientStats);
649+
InsertResponse response = new InsertResponse(client, future.get(TIMEOUT, TimeUnit.MILLISECONDS), clientStats);
646650
responseFuture.complete(response);
647651
} catch (ExecutionException e) {
648652
responseFuture.completeExceptionally(new ClientException("Failed to get insert response", e.getCause()));
649-
} catch (InterruptedException e) {
653+
} catch (InterruptedException | TimeoutException e) {
650654
responseFuture.completeExceptionally(new ClientException("Operation has likely timed out.", e));
651655
}
652656
}
@@ -721,6 +725,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
721725
ClickHouseRequest<?> request = client.read(getServerNode());
722726
request.options(SettingsConverter.toRequestOptions(settings.getAllSettings()));
723727
request.settings(SettingsConverter.toRequestSettings(settings.getAllSettings(), queryParams));
728+
request.option(ClickHouseClientOption.ASYNC, false); // we have own async handling
724729
request.query(sqlQuery, settings.getQueryId());
725730
final ClickHouseFormat format = settings.getFormat();
726731
request.format(format);
@@ -737,7 +742,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
737742
} catch (Exception e) {
738743
throw new ClientException("Failed to get query response", e);
739744
}
740-
}, queryExecutor);
745+
}, sharedOperationExecutor);
741746
return future;
742747
}
743748

@@ -773,6 +778,7 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
773778
ClickHouseRequest<?> request = client.read(getServerNode());
774779
request.options(SettingsConverter.toRequestOptions(settings.getAllSettings()));
775780
request.settings(SettingsConverter.toRequestSettings(settings.getAllSettings(), null));
781+
request.option(ClickHouseClientOption.ASYNC, false); // we have own async handling
776782
request.query(sqlQuery, settings.getQueryId());
777783
final ClickHouseFormat format = settings.getFormat();
778784
request.format(format);
@@ -789,7 +795,7 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
789795
} catch (Exception e) {
790796
throw new ClientException("Failed to get query response", e);
791797
}
792-
}, queryExecutor);
798+
}, sharedOperationExecutor);
793799

794800
return future;
795801
}

client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ public ClickHouseInputStream getInputStream() {
8686

8787
@Override
8888
public void close() throws Exception {
89+
try {
90+
responseRef.get(completeTimeout, TimeUnit.MILLISECONDS ).close();
91+
} catch (Exception e) {
92+
throw new ClientException("Failed to close response", e);
93+
}
94+
8995
try {
9096
client.close();
9197
} catch (Exception e) {

0 commit comments

Comments
 (0)