Skip to content

Commit 32eec04

Browse files
committed
connection managment tests
1 parent 4ac08b0 commit 32eec04

File tree

5 files changed

+177
-14
lines changed

5 files changed

+177
-14
lines changed

clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpClientConnectionImpl.java renamed to clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java

File renamed without changes.

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,17 +214,17 @@ public Builder() {
214214
public Builder addEndpoint(String endpoint) {
215215
try {
216216
URL endpointURL = new java.net.URL(endpoint);
217-
if (!(endpointURL.getProtocol().equalsIgnoreCase("https") ||
218-
endpointURL.getProtocol().equalsIgnoreCase("http"))) {
217+
218+
if (endpointURL.getProtocol().equalsIgnoreCase("https")) {
219+
addEndpoint(Protocol.HTTP, endpointURL.getHost(), endpointURL.getPort(), true);
220+
} else if (endpointURL.getProtocol().equalsIgnoreCase("http")) {
221+
addEndpoint(Protocol.HTTP, endpointURL.getHost(), endpointURL.getPort(), false);
222+
} else {
219223
throw new IllegalArgumentException("Only HTTP and HTTPS protocols are supported");
220224
}
221225
} catch (java.net.MalformedURLException e) {
222226
throw new IllegalArgumentException("Endpoint should be a valid URL string", e);
223227
}
224-
if (endpoint.startsWith("https://")) {
225-
this.configuration.put(ClickHouseClientOption.SSL.getKey(), "true");
226-
}
227-
this.endpoints.add(endpoint);
228228
return this;
229229
}
230230

@@ -246,7 +246,7 @@ public Builder addEndpoint(Protocol protocol, String host, int port, boolean sec
246246
this.configuration.put(ClickHouseClientOption.SSL.getKey(), "true");
247247
}
248248
String endpoint = String.format("%s%s://%s:%d", protocol.toString().toLowerCase(), secure ? "s": "", host, port);
249-
this.addEndpoint(endpoint);
249+
this.endpoints.add(endpoint);
250250
return this;
251251
}
252252

@@ -354,6 +354,7 @@ public Builder setMaxConnections(int maxConnections) {
354354
/**
355355
* Sets how long any connection would be considered as active and able for a lease.
356356
* After this time connection will be marked for sweep and will not be returned from a pool.
357+
* Has more effect than keep-alive timeout.
357358
* @param timeout - time in unit
358359
* @param unit - time unit
359360
* @return
@@ -363,6 +364,20 @@ public Builder setConnectionTTL(long timeout, ChronoUnit unit) {
363364
return this;
364365
}
365366

367+
/**
368+
* Sets keep alive timeout for a connection to override server value. If set to -1 then server value will be used.
369+
* Default is -1.
370+
* Doesn't override connection TTL value.
371+
* {@see Client#setConnectionTTL}
372+
* @param timeout - time in unit
373+
* @param unit - time unit
374+
* @return
375+
*/
376+
public Builder setKeepAliveTimeout(long timeout, ChronoUnit unit) {
377+
this.configuration.put("connection_keep_alive_timeout", String.valueOf(Duration.of(timeout, unit).toMillis()));
378+
return this;
379+
}
380+
366381
/**
367382
* Sets strategy of how connections are reuse.
368383
* Default is {@link ConnectionReuseStrategy#FIFO} to evenly distribute load between them.
@@ -756,6 +771,10 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
756771
userConfig.put("connection_pool_enabled", "true");
757772
}
758773

774+
if (!userConfig.containsKey("connection_ttl")) {
775+
userConfig.put("connection_ttl", "-1");
776+
}
777+
759778
return userConfig;
760779
}
761780
}

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hc.core5.net.URIBuilder;
4040
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
4141
import org.apache.hc.core5.pool.PoolReusePolicy;
42+
import org.apache.hc.core5.util.TimeValue;
4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
4445

@@ -129,6 +130,17 @@ public SSLContext createSSLContext() {
129130
return sslContext;
130131
}
131132

133+
private long CONNECTION_INACTIVITY_CHECK = 5000L;
134+
135+
private ConnectionConfig createConnectionConfig() {
136+
ConnectionConfig.Builder connConfig = ConnectionConfig.custom();
137+
connConfig.setTimeToLive(MapUtils.getLong(chConfiguration, "connection_ttl"), TimeUnit.MILLISECONDS);
138+
connConfig.setConnectTimeout(MapUtils.getLong(chConfiguration, ClickHouseClientOption.CONNECTION_TIMEOUT.getKey()),
139+
TimeUnit.MILLISECONDS);
140+
connConfig.setValidateAfterInactivity(CONNECTION_INACTIVITY_CHECK, TimeUnit.MILLISECONDS); // non-configurable for now
141+
142+
return connConfig.build();
143+
}
132144

133145
private HttpClientConnectionManager basicConnectionManager(SSLContext sslContext, SocketConfig socketConfig) {
134146
RegistryBuilder<ConnectionSocketFactory> registryBuilder = RegistryBuilder.create();
@@ -137,7 +149,9 @@ private HttpClientConnectionManager basicConnectionManager(SSLContext sslContext
137149

138150

139151
BasicHttpClientConnectionManager connManager = new BasicHttpClientConnectionManager(registryBuilder.build());
152+
connManager.setConnectionConfig(createConnectionConfig());
140153
connManager.setSocketConfig(socketConfig);
154+
141155
return connManager;
142156
}
143157

@@ -146,16 +160,12 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext,
146160
.setConnPoolPolicy(PoolReusePolicy.LIFO)
147161
.setPoolConcurrencyPolicy(PoolConcurrencyPolicy.STRICT);
148162

149-
ConnectionConfig.Builder connConfig = ConnectionConfig.custom();
150-
MapUtils.applyInt(chConfiguration, ClickHouseClientOption.CONNECTION_TIMEOUT.getKey(),
151-
(t) -> connConfig.setConnectTimeout(t, TimeUnit.MILLISECONDS));
152-
153-
154-
connMgrBuilder.setDefaultConnectionConfig(connConfig.build());
163+
connMgrBuilder.setDefaultConnectionConfig(createConnectionConfig());
155164
connMgrBuilder.setMaxConnTotal(Integer.MAX_VALUE); // as we do not know how many routes we will have
156165
MapUtils.applyInt(chConfiguration, ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(),
157166
connMgrBuilder::setMaxConnPerRoute);
158167

168+
159169
connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext));
160170
connMgrBuilder.setDefaultSocketConfig(socketConfig);
161171
return connMgrBuilder.build();
@@ -211,6 +221,10 @@ public CloseableHttpClient createHttpClient() {
211221
} else {
212222
clientBuilder.setConnectionManager(basicConnectionManager(sslContext, socketConfig));
213223
}
224+
long keepAliveTimeout = MapUtils.getLong(chConfiguration, "connection_keep_alive_timeout");
225+
if (keepAliveTimeout > 0) {
226+
clientBuilder.setKeepAliveStrategy((response, context) -> TimeValue.ofMilliseconds(keepAliveTimeout));
227+
}
214228

215229
return clientBuilder.build();
216230
}
@@ -238,7 +252,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
238252
try {
239253
URIBuilder uriBuilder = new URIBuilder(server.getBaseUri());
240254
addQueryParams(uriBuilder, chConfiguration, requestConfig);
241-
uri = uriBuilder.build();
255+
uri = uriBuilder.normalizeSyntax().build();
242256
} catch (URISyntaxException e) {
243257
throw new RuntimeException(e);
244258
}

client-v2/src/main/java/com/clickhouse/client/api/internal/MapUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@ public static int getInt(Map<String, String> map, String key) {
4242
return 0;
4343
}
4444

45+
public static long getLong(Map<String, String> map, String key) {
46+
String val = map.get(key);
47+
if (val != null) {
48+
try {
49+
return Long.parseLong(val);
50+
} catch (NumberFormatException e) {
51+
throw new RuntimeException("Invalid value for key " + key + ": " + val, e);
52+
}
53+
}
54+
return 0;
55+
}
56+
4557
public static boolean getFlag(Map<String, String> map, String key) {
4658
String val = map.get(key);
4759
if (val == null) {
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package com.clickhouse.client;
2+
3+
import com.clickhouse.client.api.Client;
4+
import com.clickhouse.client.api.enums.ProxyType;
5+
import com.clickhouse.client.api.query.GenericRecord;
6+
import com.github.tomakehurst.wiremock.WireMockServer;
7+
import com.github.tomakehurst.wiremock.client.WireMock;
8+
import com.github.tomakehurst.wiremock.common.Slf4jNotifier;
9+
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
10+
import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener;
11+
import org.apache.hc.core5.net.URIBuilder;
12+
import org.testng.Assert;
13+
import org.testng.annotations.DataProvider;
14+
import org.testng.annotations.Test;
15+
16+
import java.net.Socket;
17+
import java.nio.ByteBuffer;
18+
import java.time.temporal.ChronoUnit;
19+
import java.util.List;
20+
import java.util.Random;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
public class ConnectionManagementTests extends BaseIntegrationTest{
24+
25+
26+
@Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider")
27+
public void testConnectionTTL(Long connectionTtl, Long keepAlive, int openSockets) throws Exception {
28+
ClickHouseNode server = getServer(ClickHouseProtocol.HTTP);
29+
30+
int proxyPort = new Random().nextInt(1000) + 10000;
31+
System.out.println("proxyPort: " + proxyPort);
32+
ConnectionCounterListener connectionCounter = new ConnectionCounterListener();
33+
WireMockServer proxy = new WireMockServer(WireMockConfiguration
34+
.options().port(proxyPort)
35+
.networkTrafficListener(connectionCounter)
36+
.notifier(new Slf4jNotifier(true)));
37+
proxy.start();
38+
URIBuilder targetURI = new URIBuilder(server.getBaseUri())
39+
.setPath("");
40+
proxy.addStubMapping(WireMock.post(WireMock.anyUrl())
41+
.willReturn(WireMock.aResponse().proxiedFrom(targetURI.build().toString())).build());
42+
43+
Client.Builder clientBuilder = new Client.Builder()
44+
.addEndpoint(server.getBaseUri())
45+
.setUsername("default")
46+
.setPassword(getPassword())
47+
.useNewImplementation(true)
48+
// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
49+
.addProxy(ProxyType.HTTP, "localhost", proxyPort);
50+
if (connectionTtl != null) {
51+
clientBuilder.setConnectionTTL(connectionTtl, ChronoUnit.MILLIS);
52+
}
53+
if (keepAlive != null) {
54+
clientBuilder.setKeepAliveTimeout(keepAlive, ChronoUnit.MILLIS);
55+
}
56+
57+
try (Client client = clientBuilder.build()) {
58+
List<GenericRecord> resp = client.queryAll("select 1");
59+
Assert.assertEquals(resp.stream().findFirst().get().getString(1), "1");
60+
61+
try {
62+
Thread.sleep(1000L);
63+
} catch (InterruptedException e) {
64+
Assert.fail("Unexpected exception", e);
65+
}
66+
67+
resp = client.queryAll("select 1");
68+
Assert.assertEquals(resp.stream().findFirst().get().getString(1), "1");
69+
} catch (Exception e) {
70+
e.printStackTrace();
71+
Assert.fail("Unexpected exception", e);
72+
} finally {
73+
Assert.assertEquals(connectionCounter.opened.get(), openSockets);
74+
proxy.stop();
75+
}
76+
}
77+
78+
@DataProvider(name = "testConnectionTTLProvider")
79+
public static Object[][] testConnectionTTLProvider() {
80+
return new Object[][] {
81+
{ 1000L, null, 2 },
82+
{ 2000L, null, 1 },
83+
{ null, 2000L, 1 },
84+
{ null, 500L, 2 },
85+
{ 1000L, 0L, 2 },
86+
{ 1000L, 3000L, 2}
87+
};
88+
}
89+
90+
private static class ConnectionCounterListener implements WiremockNetworkTrafficListener {
91+
92+
private AtomicInteger opened = new AtomicInteger(0);
93+
private AtomicInteger closed = new AtomicInteger(0);
94+
95+
@Override
96+
public void opened(Socket socket) {
97+
opened.incrementAndGet();
98+
System.out.println("Opened: " + socket);
99+
}
100+
101+
@Override
102+
public void incoming(Socket socket, ByteBuffer bytes) {
103+
104+
}
105+
106+
@Override
107+
public void outgoing(Socket socket, ByteBuffer bytes) {
108+
109+
}
110+
111+
@Override
112+
public void closed(Socket socket) {
113+
closed.incrementAndGet();
114+
System.out.println("Closed: " + socket);
115+
}
116+
}
117+
118+
}

0 commit comments

Comments
 (0)