Skip to content

Commit 4ac08b0

Browse files
committed
Connection pool configuration
1 parent bab37d8 commit 4ac08b0

File tree

4 files changed

+142
-46
lines changed

4 files changed

+142
-46
lines changed

clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java renamed to clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpClientConnectionImpl.java

File renamed without changes.

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@
4545
import com.clickhouse.data.ClickHouseFormat;
4646
import com.clickhouse.data.ClickHousePipedOutputStream;
4747
import com.clickhouse.data.format.BinaryStreamUtils;
48-
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
49-
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
5048
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
5149
import org.apache.hc.core5.http.ClassicHttpResponse;
5250
import org.apache.hc.core5.http.HttpStatus;
@@ -298,6 +296,16 @@ public Builder setAccessToken(String accessToken) {
298296
return this;
299297
}
300298

299+
/**
300+
* Configures client to use build-in connection pool
301+
* @param enable - if connection pool should be enabled
302+
* @return
303+
*/
304+
public Builder enableConnectionPool(boolean enable) {
305+
this.configuration.put("connection_pool_enabled", String.valueOf(enable));
306+
return this;
307+
}
308+
301309
/**
302310
* Default connection timeout in milliseconds. Timeout is applied to establish a connection.
303311
*
@@ -331,15 +339,42 @@ public Builder setConnectionRequestTimeout(long timeout, ChronoUnit unit) {
331339
}
332340

333341
/**
334-
* Sets the maximum number of connections that can be opened at the same time to a single server.
335-
* Default is 10.
342+
* Sets the maximum number of connections that can be opened at the same time to a single server. Limit is not
343+
* a hard stop. It is done to prevent threads stuck inside a connection pool waiting for a connection.
344+
* Default is 10. It is recommended to set a higher value for a high concurrent applications. It will let
345+
* more threads to get a connection and execute a query.
346+
*
336347
* @param maxConnections - maximum number of connections
337348
*/
338349
public Builder setMaxConnections(int maxConnections) {
339350
this.configuration.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), String.valueOf(maxConnections));
340351
return this;
341352
}
342353

354+
/**
355+
* Sets how long any connection would be considered as active and able for a lease.
356+
* After this time connection will be marked for sweep and will not be returned from a pool.
357+
* @param timeout - time in unit
358+
* @param unit - time unit
359+
* @return
360+
*/
361+
public Builder setConnectionTTL(long timeout, ChronoUnit unit) {
362+
this.configuration.put("connection_ttl", String.valueOf(Duration.of(timeout, unit).toMillis()));
363+
return this;
364+
}
365+
366+
/**
367+
* Sets strategy of how connections are reuse.
368+
* Default is {@link ConnectionReuseStrategy#FIFO} to evenly distribute load between them.
369+
*
370+
* @param strategy - strategy for connection reuse
371+
* @return
372+
*/
373+
public Builder setConnectionReuseStrategy(ConnectionReuseStrategy strategy) {
374+
this.configuration.put("connection_reuse_strategy", strategy.name());
375+
return this;
376+
}
377+
343378
// SOCKET SETTINGS
344379

345380
/**
@@ -503,8 +538,8 @@ public Builder setProxyCredentials(String user, String pass) {
503538
* @param timeUnit
504539
* @return
505540
*/
506-
public Builder setExecutionTimeout(long timeout, TimeUnit timeUnit) {
507-
this.configuration.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), String.valueOf(timeUnit.toMillis(timeout)));
541+
public Builder setExecutionTimeout(long timeout, ChronoUnit timeUnit) {
542+
this.configuration.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), String.valueOf(Duration.of(timeout, timeUnit).toMillis()));
508543
return this;
509544
}
510545

@@ -713,6 +748,14 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
713748
userConfig.put("connection_request_timeout", "10000");
714749
}
715750

751+
if (!userConfig.containsKey("connection_reuse_strategy")) {
752+
userConfig.put("connection_reuse_strategy", ConnectionReuseStrategy.FIFO.name());
753+
}
754+
755+
if (!userConfig.containsKey("connection_pool_enabled")) {
756+
userConfig.put("connection_pool_enabled", "true");
757+
}
758+
716759
return userConfig;
717760
}
718761
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.clickhouse.client.api;
2+
3+
public enum ConnectionReuseStrategy {
4+
5+
/**
6+
* Reuse recently freed connection and returned to a pool
7+
*/
8+
LIFO,
9+
10+
/**
11+
* Reuse mostly all connections
12+
*/
13+
FIFO
14+
;
15+
}

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 78 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
import org.apache.hc.client5.http.config.RequestConfig;
1717
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1818
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
19+
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
1920
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
21+
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
2022
import org.apache.hc.client5.http.protocol.HttpClientContext;
23+
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
24+
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
2125
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
2226
import org.apache.hc.core5.http.ClassicHttpResponse;
2327
import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
@@ -28,6 +32,7 @@
2832
import org.apache.hc.core5.http.HttpHost;
2933
import org.apache.hc.core5.http.HttpStatus;
3034
import org.apache.hc.core5.http.NoHttpResponseException;
35+
import org.apache.hc.core5.http.config.RegistryBuilder;
3136
import org.apache.hc.core5.http.io.SocketConfig;
3237
import org.apache.hc.core5.http.io.entity.EntityTemplate;
3338
import org.apache.hc.core5.io.IOCallback;
@@ -48,6 +53,7 @@
4853
import java.net.URI;
4954
import java.net.URISyntaxException;
5055
import java.net.UnknownHostException;
56+
import java.security.NoSuchAlgorithmException;
5157
import java.util.Base64;
5258
import java.util.Map;
5359
import java.util.concurrent.TimeUnit;
@@ -72,7 +78,8 @@ public HttpAPIClientHelper(Map<String, String> configuration) {
7278

7379
RequestConfig.Builder reqConfBuilder = RequestConfig.custom();
7480
MapUtils.applyLong(chConfiguration, "connection_request_timeout",
75-
(t) -> reqConfBuilder.setConnectionRequestTimeout(t, TimeUnit.MILLISECONDS));
81+
(t) -> reqConfBuilder
82+
.setConnectionRequestTimeout(t, TimeUnit.MILLISECONDS));
7683

7784
this.baseRequestConfig = reqConfBuilder.build();
7885

@@ -82,42 +89,19 @@ public HttpAPIClientHelper(Map<String, String> configuration) {
8289
LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression);
8390
}
8491

85-
public CloseableHttpClient createHttpClient() {
86-
87-
// Top Level builders
88-
HttpClientBuilder clientBuilder = HttpClientBuilder.create();
89-
90-
91-
// Socket configuration
92-
SocketConfig.Builder soCfgBuilder = SocketConfig.custom();
93-
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_TIMEOUT.getKey(),
94-
(t) -> soCfgBuilder.setSoTimeout(t, TimeUnit.MILLISECONDS));
95-
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_RCVBUF.getKey(),
96-
soCfgBuilder::setRcvBufSize);
97-
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_SNDBUF.getKey(),
98-
soCfgBuilder::setSndBufSize);
99-
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_LINGER.getKey(),
100-
(v) -> soCfgBuilder.setSoLinger(v, TimeUnit.SECONDS));
101-
102-
// Connection manager
103-
PoolingHttpClientConnectionManagerBuilder connMgrBuilder = PoolingHttpClientConnectionManagerBuilder.create()
104-
.setConnPoolPolicy(PoolReusePolicy.LIFO)
105-
.setPoolConcurrencyPolicy(PoolConcurrencyPolicy.STRICT);
106-
107-
ConnectionConfig.Builder connConfig = ConnectionConfig.custom();
108-
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.CONNECTION_TIMEOUT.getKey(),
109-
(t) -> connConfig.setConnectTimeout(t, TimeUnit.MILLISECONDS));
110-
111-
112-
connMgrBuilder.setDefaultConnectionConfig(connConfig.build());
113-
connMgrBuilder.setMaxConnTotal(Integer.MAX_VALUE); // as we do not know how many routes we will have
114-
MapUtils.applyInt(chConfiguration, ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(),
115-
connMgrBuilder::setMaxConnPerRoute);
116-
92+
/**
93+
* Creates or returns default SSL context.
94+
* @return SSLContext
95+
*/
96+
public SSLContext createSSLContext() {
97+
SSLContext sslContext;
98+
try {
99+
sslContext = SSLContext.getDefault();
100+
} catch (NoSuchAlgorithmException e) {
101+
throw new ClientException("Failed to create default SSL context", e);
102+
}
117103
ClickHouseSslContextProvider sslContextProvider = ClickHouseSslContextProvider.getProvider();
118-
119104
String trustStorePath = chConfiguration.get(ClickHouseClientOption.TRUST_STORE.getKey());
120-
SSLContext sslContext = null;
121105
if (trustStorePath != null ) {
122106
try {
123107
sslContext = sslContextProvider.getSslContextFromKeyStore(
@@ -142,12 +126,57 @@ public CloseableHttpClient createHttpClient() {
142126
throw new ClientMisconfigurationException("Failed to create SSL context from certificates", e);
143127
}
144128
}
145-
if (sslContext !=null) {
146-
connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext));
147-
}
129+
return sslContext;
130+
}
131+
148132

149-
connMgrBuilder.setDefaultSocketConfig(soCfgBuilder.build());
150-
clientBuilder.setConnectionManager(connMgrBuilder.build());
133+
private HttpClientConnectionManager basicConnectionManager(SSLContext sslContext, SocketConfig socketConfig) {
134+
RegistryBuilder<ConnectionSocketFactory> registryBuilder = RegistryBuilder.create();
135+
registryBuilder.register("http", PlainConnectionSocketFactory.getSocketFactory());
136+
registryBuilder.register("https", new SSLConnectionSocketFactory(sslContext));
137+
138+
139+
BasicHttpClientConnectionManager connManager = new BasicHttpClientConnectionManager(registryBuilder.build());
140+
connManager.setSocketConfig(socketConfig);
141+
return connManager;
142+
}
143+
144+
private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext, SocketConfig socketConfig) {
145+
PoolingHttpClientConnectionManagerBuilder connMgrBuilder = PoolingHttpClientConnectionManagerBuilder.create()
146+
.setConnPoolPolicy(PoolReusePolicy.LIFO)
147+
.setPoolConcurrencyPolicy(PoolConcurrencyPolicy.STRICT);
148+
149+
ConnectionConfig.Builder connConfig = ConnectionConfig.custom();
150+
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.CONNECTION_TIMEOUT.getKey(),
151+
(t) -> connConfig.setConnectTimeout(t, TimeUnit.MILLISECONDS));
152+
153+
154+
connMgrBuilder.setDefaultConnectionConfig(connConfig.build());
155+
connMgrBuilder.setMaxConnTotal(Integer.MAX_VALUE); // as we do not know how many routes we will have
156+
MapUtils.applyInt(chConfiguration, ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(),
157+
connMgrBuilder::setMaxConnPerRoute);
158+
159+
connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext));
160+
connMgrBuilder.setDefaultSocketConfig(socketConfig);
161+
return connMgrBuilder.build();
162+
}
163+
164+
public CloseableHttpClient createHttpClient() {
165+
166+
// Top Level builders
167+
HttpClientBuilder clientBuilder = HttpClientBuilder.create();
168+
SSLContext sslContext = createSSLContext();
169+
170+
// Socket configuration
171+
SocketConfig.Builder soCfgBuilder = SocketConfig.custom();
172+
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_TIMEOUT.getKey(),
173+
(t) -> soCfgBuilder.setSoTimeout(t, TimeUnit.MILLISECONDS));
174+
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_RCVBUF.getKey(),
175+
soCfgBuilder::setRcvBufSize);
176+
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_SNDBUF.getKey(),
177+
soCfgBuilder::setSndBufSize);
178+
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_LINGER.getKey(),
179+
(v) -> soCfgBuilder.setSoLinger(v, TimeUnit.SECONDS));
151180

152181
// Proxy
153182
String proxyHost = chConfiguration.get(ClickHouseClientOption.PROXY_HOST.getKey());
@@ -173,6 +202,15 @@ public CloseableHttpClient createHttpClient() {
173202
.equalsIgnoreCase("false")) {
174203
clientBuilder.disableCookieManagement();
175204
}
205+
SocketConfig socketConfig = soCfgBuilder.build();
206+
207+
// Connection manager
208+
boolean isConnectionPooling = MapUtils.getFlag(chConfiguration, "connection_pool_enabled");
209+
if (isConnectionPooling) {
210+
clientBuilder.setConnectionManager(poolConnectionManager(sslContext, socketConfig));
211+
} else {
212+
clientBuilder.setConnectionManager(basicConnectionManager(sslContext, socketConfig));
213+
}
176214

177215
return clientBuilder.build();
178216
}

0 commit comments

Comments
 (0)