Skip to content

Commit c5c4ece

Browse files
authored
Merge pull request #2522 from ClickHouse/jdbc_connection_impl
[JDBC] NetworkTimeout
2 parents e2cdc0d + 2c7976b commit c5c4ece

File tree

11 files changed

+248
-57
lines changed

11 files changed

+248
-57
lines changed

client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.clickhouse.client.api.internal.CommonSettings;
66
import org.apache.hc.core5.http.HttpHeaders;
77

8+
import java.time.temporal.ChronoUnit;
89
import java.util.Collection;
910
import java.util.Map;
1011

@@ -27,6 +28,11 @@ public InsertSettings(Map<String, Object> settings) {
2728
}
2829
}
2930

31+
private InsertSettings(CommonSettings settings) {
32+
this.settings = settings;
33+
setDefaults();
34+
}
35+
3036
private void setDefaults() {// Default settings, for now a very small list
3137
this.setInputStreamCopyBufferSize(DEFAULT_INPUT_STREAM_BATCH_SIZE);
3238
}
@@ -274,4 +280,28 @@ public InsertSettings logComment(String logComment) {
274280
public String getLogComment() {
275281
return settings.getLogComment();
276282
}
283+
284+
public static InsertSettings merge(InsertSettings source, InsertSettings override) {
285+
CommonSettings mergedSettings = source.settings.copyAndMerge(override.settings);
286+
InsertSettings insertSettings = new InsertSettings(mergedSettings);
287+
insertSettings.setInputStreamCopyBufferSize(override.getInputStreamCopyBufferSize());
288+
return insertSettings;
289+
}
290+
291+
/**
292+
* Sets a network operation timeout.
293+
* @param timeout
294+
* @param unit
295+
*/
296+
public void setNetworkTimeout(long timeout, ChronoUnit unit) {
297+
settings.setNetworkTimeout(timeout, unit);
298+
}
299+
300+
/**
301+
* Returns network timeout. Zero value is returned if no timeout is set.
302+
* @return timeout in ms.
303+
*/
304+
public Long getNetworkTimeout() {
305+
return settings.getNetworkTimeout();
306+
}
277307
}

client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.clickhouse.client.api.Client;
44
import com.clickhouse.client.api.ClientConfigProperties;
55

6+
import java.time.Duration;
7+
import java.time.temporal.ChronoUnit;
68
import java.util.Collection;
79
import java.util.HashMap;
810
import java.util.Map;
@@ -217,6 +219,25 @@ public String getLogComment() {
217219
return logComment;
218220
}
219221

222+
/**
223+
* Sets a network operation timeout.
224+
* @param timeout
225+
* @param unit
226+
*/
227+
public void setNetworkTimeout(long timeout, ChronoUnit unit) {
228+
settings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), Duration.of(timeout, unit).toMillis());
229+
}
230+
231+
/**
232+
* Returns network timeout. Zero value is returned if no timeout is set.
233+
* @return timeout in ms.
234+
*/
235+
public Long getNetworkTimeout() {
236+
return (Long) getOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(),
237+
ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getDefaultValue());
238+
}
239+
240+
220241
public CommonSettings copyAndMerge(CommonSettings override) {
221242
CommonSettings copy = new CommonSettings();
222243
copy.settings.putAll(settings);

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,6 @@ public class HttpAPIClientHelper {
110110

111111
private final CloseableHttpClient httpClient;
112112

113-
private final RequestConfig baseRequestConfig;
114-
115113
private String proxyAuthHeaderValue;
116114

117115
private final Set<ClientFaultCause> defaultRetryCauses;
@@ -125,11 +123,6 @@ public HttpAPIClientHelper(Map<String, Object> configuration, Object metricsRegi
125123
this.metricsRegistry = metricsRegistry;
126124
this.httpClient = createHttpClient(initSslContext, configuration);
127125

128-
RequestConfig.Builder reqConfBuilder = RequestConfig.custom();
129-
reqConfBuilder.setConnectionRequestTimeout(ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(configuration), TimeUnit.MILLISECONDS);
130-
131-
this.baseRequestConfig = reqConfBuilder.build();
132-
133126
boolean usingClientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(configuration);
134127
boolean usingServerCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(configuration);
135128
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(configuration);
@@ -438,12 +431,19 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
438431
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);
439432
boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig);
440433

441-
req.setConfig(baseRequestConfig);
434+
442435
// setting entity. wrapping if compression is enabled
443436
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback),
444437
clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig));
445438

446439
HttpClientContext context = HttpClientContext.create();
440+
Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig);
441+
Number connectionReqTimeout = ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(requestConfig);
442+
RequestConfig reqHttpConf = RequestConfig.custom()
443+
.setResponseTimeout(responseTimeout.longValue(), TimeUnit.MILLISECONDS)
444+
.setConnectionRequestTimeout(connectionReqTimeout.longValue(), TimeUnit.MILLISECONDS)
445+
.build();
446+
context.setRequestConfig(reqHttpConf);
447447

448448
ClassicHttpResponse httpResponse = null;
449449
try {

client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import com.clickhouse.client.api.internal.ValidationUtils;
99
import com.clickhouse.data.ClickHouseFormat;
1010

11+
import java.time.temporal.ChronoUnit;
1112
import java.util.Collection;
12-
import java.util.HashMap;
1313
import java.util.Map;
1414
import java.util.TimeZone;
1515

@@ -269,6 +269,23 @@ public String getLogComment() {
269269
return settings.getLogComment();
270270
}
271271

272+
/**
273+
* Sets a network operation timeout.
274+
* @param timeout
275+
* @param unit
276+
*/
277+
public void setNetworkTimeout(long timeout, ChronoUnit unit) {
278+
settings.setNetworkTimeout(timeout, unit);
279+
}
280+
281+
/**
282+
* Returns network timeout. Zero value is returned if no timeout is set.
283+
* @return timeout in ms.
284+
*/
285+
public Long getNetworkTimeout() {
286+
return settings.getNetworkTimeout();
287+
}
288+
272289
public static QuerySettings merge(QuerySettings source, QuerySettings override) {
273290
CommonSettings mergedSettings = source.settings.copyAndMerge(override.settings);
274291
return new QuerySettings(mergedSettings);

client-v2/src/test/java/com/clickhouse/client/SettingsTests.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import org.testng.Assert;
77
import org.testng.annotations.Test;
88

9+
import java.time.temporal.ChronoUnit;
910
import java.util.Arrays;
1011
import java.util.Collections;
1112
import java.util.List;
13+
import java.util.concurrent.TimeUnit;
1214

1315
@Test(groups = {"unit"})
1416
public class SettingsTests {
@@ -22,15 +24,28 @@ void testClientSettings() {
2224
}
2325

2426
@Test
25-
void testMergeQuerySettings() {
26-
QuerySettings settings1 = new QuerySettings().setQueryId("test1").httpHeader("key1", "value1");
27-
QuerySettings settings2 = new QuerySettings().httpHeader("key1", "value2");
27+
void testMergeSettings() {
28+
{
29+
QuerySettings settings1 = new QuerySettings().setQueryId("test1").httpHeader("key1", "value1");
30+
QuerySettings settings2 = new QuerySettings().httpHeader("key1", "value2");
31+
32+
QuerySettings merged = QuerySettings.merge(settings1, settings2);
33+
Assert.assertNotSame(merged, settings1);
34+
Assert.assertNotSame(merged, settings2);
2835

29-
QuerySettings merged = QuerySettings.merge(settings1, settings2);
30-
Assert.assertNotSame(merged, settings1);
31-
Assert.assertNotSame(merged, settings2);
36+
Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2");
37+
}
38+
{
39+
InsertSettings settings1 = new InsertSettings().setQueryId("test1").httpHeader("key1", "value1");
40+
InsertSettings settings2 = new InsertSettings().httpHeader("key1", "value2").setInputStreamCopyBufferSize(200000);
3241

33-
Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2");
42+
InsertSettings merged = InsertSettings.merge(settings1, settings2);
43+
Assert.assertNotSame(merged, settings1);
44+
Assert.assertNotSame(merged, settings2);
45+
46+
Assert.assertEquals(merged.getInputStreamCopyBufferSize(), settings2.getInputStreamCopyBufferSize());
47+
Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2");
48+
}
3449
}
3550

3651
@Test
@@ -87,6 +102,12 @@ void testQuerySettingsSpecific() throws Exception {
87102
settings.logComment(null);
88103
Assert.assertNull(settings.getLogComment());
89104
}
105+
106+
{
107+
final QuerySettings settings = new QuerySettings();
108+
settings.setNetworkTimeout(10, ChronoUnit.SECONDS);
109+
Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10));
110+
}
90111
}
91112

92113
@Test
@@ -140,5 +161,11 @@ public void testInsertSettingsSpecific() throws Exception {
140161
settings.logComment(null);
141162
Assert.assertNull(settings.getLogComment());
142163
}
164+
165+
{
166+
final InsertSettings settings = new InsertSettings();
167+
settings.setNetworkTimeout(10, ChronoUnit.SECONDS);
168+
Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10));
169+
}
143170
}
144171
}

jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.sql.Statement;
3535
import java.sql.Struct;
3636
import java.time.Duration;
37+
import java.time.temporal.ChronoUnit;
3738
import java.util.Arrays;
3839
import java.util.Calendar;
3940
import java.util.Collections;
@@ -46,7 +47,7 @@
4647
import java.util.stream.Collectors;
4748

4849
public class ConnectionImpl implements Connection, JdbcV2Wrapper {
49-
private static final Logger log = LoggerFactory.getLogger(ConnectionImpl.class);
50+
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImpl.class);
5051

5152
protected final String url;
5253
private final Client client; // this member is private to force using getClient()
@@ -65,6 +66,8 @@ public class ConnectionImpl implements Connection, JdbcV2Wrapper {
6566

6667
private final SqlParser sqlParser;
6768

69+
private Executor networkTimeoutExecutor;
70+
6871
public ConnectionImpl(String url, Properties info) throws SQLException {
6972
try {
7073
this.url = url;//Raw URL
@@ -85,10 +88,10 @@ public ConnectionImpl(String url, Properties info) throws SQLException {
8588
}
8689

8790
if (this.config.isDisableFrameworkDetection()) {
88-
log.debug("Framework detection is disabled.");
91+
LOG.debug("Framework detection is disabled.");
8992
} else {
9093
String detectedFrameworks = Driver.FrameworksDetection.getFrameworksDetected();
91-
log.debug("Detected frameworks: {}", detectedFrameworks);
94+
LOG.debug("Detected frameworks: {}", detectedFrameworks);
9295
if (!detectedFrameworks.trim().isEmpty()) {
9396
clientName += " (" + detectedFrameworks + ")";
9497
}
@@ -213,9 +216,8 @@ public void close() throws SQLException {
213216
if (isClosed()) {
214217
return;
215218
}
216-
217-
client.close();
218-
closed = true;
219+
closed = true; // mark as closed to prevent further invocations
220+
client.close(); // this will disrupt pending requests.
219221
}
220222

221223
@Override
@@ -600,27 +602,58 @@ public String getSchema() throws SQLException {
600602

601603
@Override
602604
public void abort(Executor executor) throws SQLException {
603-
if (!config.isIgnoreUnsupportedRequests()) {
604-
throw new SQLFeatureNotSupportedException("abort not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
605+
if (executor == null) {
606+
throw new SQLException("Executor must be not null");
605607
}
608+
// This method should check permissions with SecurityManager but the one is deprecated.
609+
// There is no replacement for SecurityManger and it is marked for removal.
610+
this.close();
606611
}
607612

608613
@Override
609614
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
610-
//TODO: Should this be supported?
611-
if (!config.isIgnoreUnsupportedRequests()) {
612-
throw new SQLFeatureNotSupportedException("setNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
615+
ensureOpen();
616+
617+
// Very good mail thread about this method implementation. https://mail.openjdk.org/pipermail/jdbc-spec-discuss/2017-November/000236.html
618+
619+
// This method should check permissions with SecurityManager but the one is deprecated.
620+
// There is no replacement for SecurityManger and it is marked for removal.
621+
if (milliseconds > 0 && executor == null) {
622+
// we need executor only for positive timeout values.
623+
throw new SQLException("Executor must be not null");
624+
}
625+
if (milliseconds < 0) {
626+
throw new SQLException("Timeout must be >= 0");
627+
}
628+
629+
// How it should work:
630+
// if timeout is set with this method then any timeout exception should be reported to the connection
631+
// when connection get signal about timeout it uses executor to abort itself
632+
// Some connection pools set timeout before calling Connection#close() to ensure that this operation will not hang
633+
// Socket timeout is propagated with QuerySettings this connection has.
634+
networkTimeoutExecutor = executor;
635+
defaultQuerySettings.setNetworkTimeout(milliseconds, ChronoUnit.MILLIS);
636+
}
637+
638+
639+
// Should be called by child object to notify about timeout.
640+
public synchronized void onNetworkTimeout() {
641+
if (this.closed || networkTimeoutExecutor == null) {
642+
return; // we closed already or have not set network timeout so do nothing.
613643
}
644+
645+
networkTimeoutExecutor.execute(() -> {
646+
try {
647+
this.abort(networkTimeoutExecutor);
648+
} catch (SQLException e) {
649+
throw new RuntimeException("Failed to abort connection", e);
650+
}
651+
});
614652
}
615653

616654
@Override
617655
public int getNetworkTimeout() throws SQLException {
618-
//TODO: Should this be supported?
619-
if (!config.isIgnoreUnsupportedRequests()) {
620-
throw new SQLFeatureNotSupportedException("getNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
621-
}
622-
623-
return -1;
656+
return defaultQuerySettings.getNetworkTimeout().intValue();
624657
}
625658

626659
/**

0 commit comments

Comments
 (0)