Skip to content

Commit bf76f28

Browse files
committed
implemented network timeout. no tests
1 parent 751ddb1 commit bf76f28

File tree

8 files changed

+147
-28
lines changed

8 files changed

+147
-28
lines changed

client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.clickhouse.client.api.ClientConfigProperties;
66
import com.clickhouse.client.api.enums.Protocol;
77
import com.clickhouse.client.api.internal.ValidationUtils;
8+
import com.clickhouse.client.api.query.QuerySettings;
89
import org.apache.hc.core5.http.HttpHeaders;
910

1011
import java.util.Collection;
@@ -264,4 +265,27 @@ public InsertSettings logComment(String logComment) {
264265
public String getLogComment() {
265266
return logComment;
266267
}
268+
269+
public static InsertSettings merge(InsertSettings source, InsertSettings override) {
270+
InsertSettings merged = new InsertSettings();
271+
if (source != null) {
272+
merged.rawSettings.putAll(source.rawSettings);
273+
}
274+
if (override != null && override != source) {// avoid copying the literally same object
275+
merged.rawSettings.putAll(override.rawSettings);
276+
}
277+
return merged;
278+
}
279+
280+
public void setNetworkTimeout(Long networkTimeout) {
281+
if (networkTimeout != null) {
282+
rawSettings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), networkTimeout.intValue());
283+
} else {
284+
rawSettings.remove(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey());
285+
}
286+
}
287+
288+
public Long getNetworkTimeout() {
289+
return (Long) rawSettings.get(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey());
290+
}
267291
}

client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,18 @@ public String getLogComment() {
267267
return logComment;
268268
}
269269

270+
public void setNetworkTimeout(Long networkTimeout) {
271+
if (networkTimeout != null) {
272+
rawSettings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), networkTimeout.intValue());
273+
} else {
274+
rawSettings.remove(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey());
275+
}
276+
}
277+
278+
public Long getNetworkTimeout() {
279+
return (Long) rawSettings.get(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey());
280+
}
281+
270282
public static QuerySettings merge(QuerySettings source, QuerySettings override) {
271283
QuerySettings merged = new QuerySettings();
272284
if (source != null) {

jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22

33
import com.clickhouse.client.api.Client;
44
import com.clickhouse.client.api.ClientConfigProperties;
5+
import com.clickhouse.client.api.ClientException;
6+
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
57
import com.clickhouse.client.api.internal.ServerSettings;
68
import com.clickhouse.client.api.metadata.TableSchema;
79
import com.clickhouse.client.api.query.GenericRecord;
10+
import com.clickhouse.client.api.query.QueryResponse;
811
import com.clickhouse.client.api.query.QuerySettings;
912
import com.clickhouse.data.ClickHouseDataType;
13+
import com.clickhouse.data.ClickHouseFormat;
1014
import com.clickhouse.jdbc.internal.ExceptionUtils;
1115
import com.clickhouse.jdbc.internal.JdbcConfiguration;
1216
import com.clickhouse.jdbc.internal.JdbcUtils;
@@ -16,6 +20,7 @@
1620
import org.slf4j.Logger;
1721
import org.slf4j.LoggerFactory;
1822

23+
import java.io.Closeable;
1924
import java.sql.Array;
2025
import java.sql.Blob;
2126
import java.sql.CallableStatement;
@@ -43,10 +48,11 @@
4348
import java.util.Properties;
4449
import java.util.Set;
4550
import java.util.concurrent.Executor;
51+
import java.util.concurrent.TimeUnit;
4652
import java.util.stream.Collectors;
4753

4854
public class ConnectionImpl implements Connection, JdbcV2Wrapper {
49-
private static final Logger log = LoggerFactory.getLogger(ConnectionImpl.class);
55+
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImpl.class);
5056

5157
protected final String url;
5258
private final Client client; // this member is private to force using getClient()
@@ -65,9 +71,11 @@ public class ConnectionImpl implements Connection, JdbcV2Wrapper {
6571

6672
private final SqlParser sqlParser;
6773

74+
private Executor networkTimeoutExecutor;
75+
6876
public ConnectionImpl(String url, Properties info) throws SQLException {
6977
try {
70-
log.debug("Creating connection to {}", url);
78+
LOG.debug("Creating connection to {}", url);
7179
this.url = url;//Raw URL
7280
this.config = new JdbcConfiguration(url, info);
7381
this.onCluster = false;
@@ -86,10 +94,10 @@ public ConnectionImpl(String url, Properties info) throws SQLException {
8694
}
8795

8896
if (this.config.isDisableFrameworkDetection()) {
89-
log.debug("Framework detection is disabled.");
97+
LOG.debug("Framework detection is disabled.");
9098
} else {
9199
String detectedFrameworks = Driver.FrameworksDetection.getFrameworksDetected();
92-
log.debug("Detected frameworks: {}", detectedFrameworks);
100+
LOG.debug("Detected frameworks: {}", detectedFrameworks);
93101
if (!detectedFrameworks.trim().isEmpty()) {
94102
clientName += " (" + detectedFrameworks + ")";
95103
}
@@ -210,9 +218,8 @@ public void close() throws SQLException {
210218
if (isClosed()) {
211219
return;
212220
}
213-
214-
client.close();
215-
closed = true;
221+
closed = true; // mark as closed to prevent further invocations
222+
client.close(); // this will disrupt pending requests.
216223
}
217224

218225
@Override
@@ -597,27 +604,59 @@ public String getSchema() throws SQLException {
597604

598605
@Override
599606
public void abort(Executor executor) throws SQLException {
600-
if (!config.isIgnoreUnsupportedRequests()) {
601-
throw new SQLFeatureNotSupportedException("abort not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
607+
if (executor == null) {
608+
throw new SQLException("Executor must be not null");
602609
}
610+
// This method should check permissions with SecurityManager but the one is deprecated.
611+
// There is no replacement for SecurityManger and it is marked for removal.
612+
this.close();
603613
}
604614

605615
@Override
606616
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
607-
//TODO: Should this be supported?
608-
if (!config.isIgnoreUnsupportedRequests()) {
609-
throw new SQLFeatureNotSupportedException("setNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
617+
ensureOpen();
618+
619+
// Very good mail thread about this method implementation. https://mail.openjdk.org/pipermail/jdbc-spec-discuss/2017-November/000236.html
620+
621+
// This method should check permissions with SecurityManager but the one is deprecated.
622+
// There is no replacement for SecurityManger and it is marked for removal.
623+
if (milliseconds > 0 && executor == null) {
624+
// we need executor only for positive timeout values.
625+
throw new SQLException("Executor must be not null");
626+
}
627+
if (milliseconds < 0) {
628+
throw new SQLException("Timeout must be >= 0");
629+
}
630+
631+
// How it should work:
632+
// if timeout is set with this method then any timeout exception should be reported to the connection
633+
// when connection get signal about timeout it uses executor to abort itself
634+
// Some connection pools set timeout before calling Connection#close() to ensure that this operation will not hang
635+
// Socket timeout is propagated with QuerySettings this connection has.
636+
networkTimeoutExecutor = executor;
637+
defaultQuerySettings.setOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), (long)milliseconds);
638+
}
639+
640+
641+
// Should be called by child object to notify about timeout.
642+
public void onNetworkTimeout() throws SQLException {
643+
if (isClosed() || networkTimeoutExecutor == null) {
644+
return; // we closed already so do nothing.
610645
}
646+
647+
networkTimeoutExecutor.execute(() -> {
648+
try {
649+
this.abort(networkTimeoutExecutor);
650+
} catch (SQLException e) {
651+
throw new RuntimeException("Failed to abort connection", e);
652+
}
653+
});
611654
}
612655

613656
@Override
614657
public int getNetworkTimeout() throws SQLException {
615-
//TODO: Should this be supported?
616-
if (!config.isIgnoreUnsupportedRequests()) {
617-
throw new SQLFeatureNotSupportedException("getNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
618-
}
619-
620-
return -1;
658+
Long networkTimeout = defaultQuerySettings.getNetworkTimeout();
659+
return networkTimeout == null ? 0 : networkTimeout.intValue();
621660
}
622661

623662
/**

jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.io.Reader;
1616
import java.io.StringReader;
1717
import java.math.BigDecimal;
18+
import java.net.SocketTimeoutException;
1819
import java.net.URL;
1920
import java.nio.charset.StandardCharsets;
2021
import java.sql.Blob;
@@ -99,6 +100,9 @@ public boolean next() throws SQLException {
99100
try {
100101
return reader.next() != null;
101102
} catch (Exception e) {
103+
if (e instanceof SocketTimeoutException) {
104+
this.parentStatement.onNetworkTimeout();
105+
}
102106
throw ExceptionUtils.toSqlState(e);
103107
}
104108
}

jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
import java.net.SocketTimeoutException;
1415
import java.sql.ResultSet;
1516
import java.sql.SQLException;
1617
import java.sql.SQLWarning;
@@ -168,6 +169,10 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr
168169
}
169170
}
170171
onResultSetClosed(null);
172+
173+
if (e instanceof SocketTimeoutException) {
174+
this.connection.onNetworkTimeout();
175+
}
171176
throw ExceptionUtils.toSqlState(e);
172177
}
173178
}
@@ -203,6 +208,9 @@ protected long executeUpdateImpl(String sql, QuerySettings settings) throws SQLE
203208
updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned.
204209
lastQueryId = response.getQueryId();
205210
} catch (Exception e) {
211+
if (e instanceof SocketTimeoutException) {
212+
this.connection.onNetworkTimeout();
213+
}
206214
throw ExceptionUtils.toSqlState(e);
207215
}
208216

@@ -610,4 +618,9 @@ public long executeLargeUpdate(String sql, String[] columnNames) throws SQLExcep
610618
public String getLastQueryId() {
611619
return lastQueryId;
612620
}
621+
622+
// Proxy method for child objects. Do not call.
623+
public void onNetworkTimeout() throws SQLException {
624+
this.connection.onNetworkTimeout();
625+
}
613626
}

jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.InputStream;
1717
import java.io.Reader;
1818
import java.math.BigDecimal;
19+
import java.net.SocketTimeoutException;
1920
import java.net.URL;
2021
import java.sql.Array;
2122
import java.sql.Blob;
@@ -109,6 +110,9 @@ public long executeLargeUpdate() throws SQLException {
109110
try {
110111
writer.commitRow();
111112
} catch (Exception e) {
113+
if (e instanceof SocketTimeoutException) {
114+
this.connection.onNetworkTimeout();
115+
}
112116
throw new SQLException(e);
113117
}
114118

@@ -121,6 +125,9 @@ public long executeLargeUpdate() throws SQLException {
121125
updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned.
122126
lastQueryId = response.getQueryId();
123127
} catch (Exception e) {
128+
if (e instanceof SocketTimeoutException) {
129+
this.connection.onNetworkTimeout();
130+
}
124131
throw ExceptionUtils.toSqlState(e);
125132
} finally {
126133
try {
@@ -298,6 +305,9 @@ public void addBatch() throws SQLException {
298305
try {
299306
writer.commitRow();
300307
} catch (Exception e) {
308+
if (e instanceof SocketTimeoutException) {
309+
this.connection.onNetworkTimeout();
310+
}
301311
throw new SQLException(e);
302312
}
303313
}

jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.clickhouse.data.Tuple;
77
import com.clickhouse.jdbc.types.Array;
88
import com.google.common.collect.ImmutableMap;
9+
import org.slf4j.Logger;
910

1011
import java.awt.*;
1112
import java.math.BigInteger;
@@ -297,4 +298,14 @@ public static List<Object> convertList(List<Object> values, Class<?> type) throw
297298
}
298299
return convertedValues;
299300
}
301+
302+
public static void safeClose(AutoCloseable closeable, Logger logger) {
303+
if (closeable != null) {
304+
try {
305+
closeable.close();
306+
} catch (Exception ex) {
307+
logger.warn("Failed to close closeable after exception", ex);
308+
}
309+
}
310+
}
300311
}

jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@
2828
import java.util.Base64;
2929
import java.util.Properties;
3030
import java.util.UUID;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.TimeUnit;
3133

3234
import static org.testng.Assert.assertEquals;
3335
import static org.testng.Assert.assertNull;
3436
import static org.testng.Assert.assertThrows;
37+
import static org.testng.Assert.assertTrue;
3538
import static org.testng.Assert.fail;
3639

3740
public class ConnectionTest extends JdbcIntegrationTest {
@@ -384,20 +387,23 @@ public void setSchemaTest() throws SQLException {
384387

385388
@Test(groups = { "integration" })
386389
public void abortTest() throws SQLException {
387-
Connection localConnection = this.getJdbcConnection();
388-
assertThrows(SQLFeatureNotSupportedException.class, () -> localConnection.abort(null));
390+
try (Connection conn = this.getJdbcConnection()) {
391+
conn.abort(Executors.newSingleThreadExecutor());
392+
assertTrue(conn.isClosed());
393+
}
389394
}
390395

391396
@Test(groups = { "integration" })
392-
public void setNetworkTimeoutTest() throws SQLException {
393-
Connection localConnection = this.getJdbcConnection();
394-
assertThrows(SQLFeatureNotSupportedException.class, () -> localConnection.setNetworkTimeout(null, 0));
395-
}
397+
public void testNetworkTimeout() throws SQLException {
398+
try {
399+
Connection conn = this.getJdbcConnection();
400+
int t1 = (int) TimeUnit.SECONDS.toMillis(20);
401+
conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), t1);
402+
Assert.assertEquals(t1, conn.getNetworkTimeout());
396403

397-
@Test(groups = { "integration" })
398-
public void getNetworkTimeoutTest() throws SQLException {
399-
Connection localConnection = this.getJdbcConnection();
400-
assertThrows(SQLFeatureNotSupportedException.class, localConnection::getNetworkTimeout);
404+
} catch (Exception e) {
405+
406+
}
401407
}
402408

403409
@Test(groups = { "integration" })

0 commit comments

Comments
 (0)