Skip to content

Commit 2a41352

Browse files
authored
Merge pull request #1771 from ClickHouse/connection_pool_config
Connection pool config
2 parents 0268b89 + 518365e commit 2a41352

File tree

4 files changed

+176
-18
lines changed

4 files changed

+176
-18
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,16 @@ public enum ClickHouseClientOption implements ClickHouseOption {
425425
/**
426426
* Query ID to be attached to an operation
427427
*/
428-
QUERY_ID("query_id", "", "Query id");
428+
QUERY_ID("query_id", "", "Query id"),
429+
430+
431+
/**
432+
* Connection time to live in milliseconds. 0 or negative number means no limit.
433+
* Can be used to override keep-alive time suggested by a server.
434+
*/
435+
CONNECTION_TTL("connection_ttl", 0L,
436+
"Connection time to live in milliseconds. 0 or negative number means no limit."),
437+
;
429438

430439
private final String key;
431440
private final Serializable defaultValue;

clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@
4040
import org.apache.hc.core5.http.io.SocketConfig;
4141
import org.apache.hc.core5.http.io.entity.EntityUtils;
4242
import org.apache.hc.core5.http.protocol.HttpContext;
43+
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
44+
import org.apache.hc.core5.pool.PoolReusePolicy;
4345
import org.apache.hc.core5.ssl.SSLContexts;
46+
import org.apache.hc.core5.util.TimeValue;
4447
import org.apache.hc.core5.util.Timeout;
4548
import org.apache.hc.core5.util.VersionInfo;
4649

@@ -91,14 +94,31 @@ private CloseableHttpClient newConnection(ClickHouseConfig c) throws IOException
9194
r.register("https", socketFactory.create(c, SSLConnectionSocketFactory.class));
9295
}
9396

94-
HttpConnectionManager connManager = new HttpConnectionManager(r.build(), c);
97+
long connectionTTL = config.getLongOption(ClickHouseClientOption.CONNECTION_TTL);
98+
log.info("Connection TTL: %d ms", connectionTTL);
99+
String poolReuseStrategy = c.getStrOption(ClickHouseHttpOption.CONNECTION_REUSE_STRATEGY);
100+
PoolReusePolicy poolReusePolicy = PoolReusePolicy.LIFO;
101+
if (poolReuseStrategy != null && !poolReuseStrategy.isEmpty()) {
102+
try {
103+
poolReusePolicy = PoolReusePolicy.valueOf(poolReuseStrategy);
104+
} catch (IllegalArgumentException e) {
105+
throw new IllegalArgumentException("Invalid connection reuse strategy: " + poolReuseStrategy);
106+
}
107+
}
108+
log.info("Connection reuse strategy: %s", poolReusePolicy.name());
109+
HttpConnectionManager connManager = new HttpConnectionManager(r.build(), c, PoolConcurrencyPolicy.LAX,
110+
poolReusePolicy, TimeValue.ofMilliseconds(connectionTTL));
95111
int maxConnection = config.getIntOption(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS);
96112

97-
connManager.setMaxTotal(maxConnection);
113+
connManager.setMaxTotal(Integer.MAX_VALUE); // unlimited on global level
98114
connManager.setDefaultMaxPerRoute(maxConnection);
99115

100116
HttpClientBuilder builder = HttpClientBuilder.create().setConnectionManager(connManager)
101117
.disableContentCompression();
118+
long timeout = c.getLongOption(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT);
119+
if (timeout > 0) {
120+
builder.setKeepAliveStrategy((response, context) -> TimeValue.ofMilliseconds(timeout));
121+
}
102122
if (c.getProxyType() == ClickHouseProxyType.HTTP) {
103123
builder.setProxy(new HttpHost(c.getProxyHost(), c.getProxyPort()));
104124
}
@@ -394,9 +414,10 @@ static class HttpConnectionManager extends PoolingHttpClientConnectionManager {
394414
versionInfo != null && !versionInfo.isEmpty() ? versionInfo : PROVIDER);
395415
}
396416

397-
public HttpConnectionManager(Registry<ConnectionSocketFactory> socketFactory, ClickHouseConfig config) {
398-
super(socketFactory);
399-
417+
public HttpConnectionManager(Registry<ConnectionSocketFactory> socketFactory, ClickHouseConfig config,
418+
PoolConcurrencyPolicy poolConcurrentcyPolicy, PoolReusePolicy poolReusePolicy,
419+
TimeValue ttl) {
420+
super(socketFactory, poolConcurrentcyPolicy, poolReusePolicy, ttl);
400421
ConnectionConfig connConfig = ConnectionConfig.custom()
401422
.setConnectTimeout(Timeout.of(config.getConnectionTimeout(), TimeUnit.MILLISECONDS))
402423
.setValidateAfterInactivity(config.getLongOption(ClickHouseHttpOption.AHC_VALIDATE_AFTER_INACTIVITY), TimeUnit.MILLISECONDS)

clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,28 @@ public enum ClickHouseHttpOption implements ClickHouseOption {
8787
* <li>{@code 503 Service Unavailable}</li>
8888
* </ul>
8989
*/
90-
AHC_RETRY_ON_FAILURE("ahc_retry_on_failure", false, "Whether to retry on failure with AsyncHttpClient.")
90+
AHC_RETRY_ON_FAILURE("ahc_retry_on_failure", false, "Whether to retry on failure with AsyncHttpClient."),
91+
92+
/**
93+
* Configuration for AsyncHttpClient connection pool. It defines how to reuse connections.
94+
* If {@code "FIFO"} is set, the connections are reused in the order they were created.
95+
* If {@code "LIFO"} is set, the connections are reused as soon they are available.
96+
* Default value is {@code "LIFO"}.
97+
*/
98+
CONNECTION_REUSE_STRATEGY("connection_reuse_strategy", "LIFO",
99+
"Connection reuse strategy for AsyncHttpClient. Valid values: LIFO, FIFO"),
100+
101+
/**
102+
* Configures client with preferred connection keep alive timeout if keep alive is enabled.
103+
* Usually servers tells a client how long it can keep a connection alive. This option can be used
104+
* when connection should be ended earlier. If value less or equal to 0, the server's timeout is used.
105+
* Default value is -1.
106+
* Time unit is milliseconds.
107+
*
108+
* Supported only for Apache Http Client connection provider currently.
109+
*/
110+
KEEP_ALIVE_TIMEOUT("alive_timeout", -1L,
111+
"Default keep-alive timeout in milliseconds."),
91112
;
92113

93114
private final String key;

clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java

Lines changed: 118 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,42 @@
99
import com.clickhouse.client.ClickHouseResponse;
1010
import com.clickhouse.client.ClickHouseSocketFactory;
1111
import com.clickhouse.client.config.ClickHouseClientOption;
12+
import com.clickhouse.client.config.ClickHouseDefaults;
13+
import com.clickhouse.client.config.ClickHouseProxyType;
1214
import com.clickhouse.client.http.config.ClickHouseHttpOption;
1315
import com.clickhouse.client.http.config.HttpConnectionProvider;
1416
import com.clickhouse.config.ClickHouseOption;
1517
import com.clickhouse.data.ClickHouseUtils;
16-
17-
import java.io.IOException;
18-
import java.io.Serializable;
19-
import java.net.ConnectException;
20-
import java.util.Collections;
21-
import java.util.HashMap;
22-
import java.util.List;
23-
import java.util.Map;
24-
import java.util.TreeMap;
25-
import java.util.concurrent.atomic.AtomicBoolean;
26-
2718
import com.github.tomakehurst.wiremock.WireMockServer;
2819
import com.github.tomakehurst.wiremock.client.WireMock;
20+
import com.github.tomakehurst.wiremock.common.Slf4jNotifier;
21+
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
2922
import com.github.tomakehurst.wiremock.http.Fault;
23+
import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener;
3024
import com.github.tomakehurst.wiremock.stubbing.Scenario;
3125
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
3226
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
3327
import org.apache.hc.core5.http.HttpStatus;
28+
import org.apache.hc.core5.net.URIBuilder;
3429
import org.testng.Assert;
3530
import org.testng.annotations.DataProvider;
3631
import org.testng.annotations.Ignore;
3732
import org.testng.annotations.Test;
3833

34+
import java.io.IOException;
35+
import java.io.Serializable;
36+
import java.net.ConnectException;
37+
import java.net.Socket;
38+
import java.nio.ByteBuffer;
39+
import java.util.Collections;
40+
import java.util.HashMap;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Random;
44+
import java.util.TreeMap;
45+
import java.util.concurrent.atomic.AtomicBoolean;
46+
import java.util.concurrent.atomic.AtomicInteger;
47+
3948
public class ApacheHttpConnectionImplTest extends ClickHouseHttpClientTest {
4049
public static class CustomSocketFactory implements ClickHouseSocketFactory {
4150
private static final AtomicBoolean created = new AtomicBoolean();
@@ -267,4 +276,102 @@ public void testNoHttpResponseExceptionWithValidation(long validationTimeout) {
267276
public static Object[] validationTimeoutProvider() {
268277
return new Long[] {-1L , 100L };
269278
}
279+
280+
@Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider")
281+
@SuppressWarnings("java:S2925")
282+
public void testConnectionTTL(Map<ClickHouseOption, Serializable> options, int openSockets) throws Exception {
283+
if (isCloud()) {
284+
// skip for cloud because wiremock proxy need extra configuration. TODO: need to fix it
285+
return;
286+
}
287+
ClickHouseNode server = getServer(ClickHouseProtocol.HTTP);
288+
289+
int proxyPort = new Random().nextInt(1000) + 10000;
290+
System.out.println("proxyPort: " + proxyPort);
291+
ConnectionCounterListener connectionCounter = new ConnectionCounterListener();
292+
WireMockServer proxy = new WireMockServer(WireMockConfiguration
293+
.options().port(proxyPort)
294+
.networkTrafficListener(connectionCounter)
295+
.notifier(new Slf4jNotifier(true)));
296+
proxy.start();
297+
URIBuilder targetURI = new URIBuilder(server.getBaseUri())
298+
.setPath("");
299+
proxy.addStubMapping(WireMock.post(WireMock.anyUrl())
300+
.willReturn(WireMock.aResponse().proxiedFrom(targetURI.build().toString())).build());
301+
302+
Map<ClickHouseOption, Serializable> baseOptions = new HashMap<>();
303+
baseOptions.put(ClickHouseClientOption.PROXY_PORT, proxyPort);
304+
baseOptions.put(ClickHouseClientOption.PROXY_HOST, "localhost");
305+
baseOptions.put(ClickHouseClientOption.PROXY_TYPE, ClickHouseProxyType.HTTP);
306+
baseOptions.put(ClickHouseDefaults.PASSWORD, getPassword());
307+
baseOptions.put(ClickHouseDefaults.USER, "default");
308+
baseOptions.putAll(options);
309+
310+
ClickHouseConfig config = new ClickHouseConfig(baseOptions);
311+
try (ClickHouseClient client = ClickHouseClient.builder().config(config).build()) {
312+
try (ClickHouseResponse resp = client.read(server).query("select 1").executeAndWait()) {
313+
Assert.assertEquals(resp.firstRecord().getValue(0).asString(), "1");
314+
}
315+
try {
316+
Thread.sleep(1000L);
317+
} catch (InterruptedException e) {
318+
Assert.fail("Unexpected exception", e);
319+
}
320+
321+
try (ClickHouseResponse resp = client.read(server).query("select 1").executeAndWait()) {
322+
Assert.assertEquals(resp.firstRecord().getValue(0).asString(), "1");
323+
}
324+
} catch (Exception e) {
325+
Assert.fail("Unexpected exception", e);
326+
} finally {
327+
Assert.assertEquals(connectionCounter.opened.get(), openSockets);
328+
proxy.stop();
329+
}
330+
}
331+
332+
@DataProvider(name = "testConnectionTTLProvider")
333+
public static Object[][] testConnectionTTLProvider() {
334+
HashMap<ClickHouseOption, Serializable> disabledKeepAlive = new HashMap<>();
335+
disabledKeepAlive.put(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT, 1000L);
336+
disabledKeepAlive.put(ClickHouseHttpOption.KEEP_ALIVE, false);
337+
HashMap<ClickHouseOption, Serializable> fifoOption = new HashMap<>();
338+
fifoOption.put(ClickHouseClientOption.CONNECTION_TTL, 1000L);
339+
fifoOption.put(ClickHouseHttpOption.CONNECTION_REUSE_STRATEGY, "FIFO");
340+
return new Object[][] {
341+
{ Collections.singletonMap(ClickHouseClientOption.CONNECTION_TTL, 1000L), 2 },
342+
{ Collections.singletonMap(ClickHouseClientOption.CONNECTION_TTL, 2000L), 1 },
343+
{ Collections.singletonMap(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT, 2000L), 1 },
344+
{ Collections.singletonMap(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT, 500L), 2 },
345+
{ disabledKeepAlive, 2 },
346+
{ fifoOption, 2 }
347+
};
348+
}
349+
350+
private static class ConnectionCounterListener implements WiremockNetworkTrafficListener {
351+
352+
private AtomicInteger opened = new AtomicInteger(0);
353+
private AtomicInteger closed = new AtomicInteger(0);
354+
355+
@Override
356+
public void opened(Socket socket) {
357+
opened.incrementAndGet();
358+
System.out.println("Opened: " + socket);
359+
}
360+
361+
@Override
362+
public void incoming(Socket socket, ByteBuffer bytes) {
363+
// ignore
364+
}
365+
366+
@Override
367+
public void outgoing(Socket socket, ByteBuffer bytes) {
368+
// ignore
369+
}
370+
371+
@Override
372+
public void closed(Socket socket) {
373+
closed.incrementAndGet();
374+
System.out.println("Closed: " + socket);
375+
}
376+
}
270377
}

0 commit comments

Comments
 (0)