Skip to content

Commit 5b3917f

Browse files
authored
Merge pull request #2046 from ClickHouse/fix-connection-leak
Adjusting cancel and ensure close
2 parents 753c989 + 0888d5c commit 5b3917f

File tree

4 files changed

+99
-23
lines changed

4 files changed

+99
-23
lines changed

jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.io.Reader;
77
import java.math.BigDecimal;
88
import java.net.MalformedURLException;
9+
import java.net.SocketException;
910
import java.net.URL;
1011
import java.nio.charset.StandardCharsets;
1112
import java.sql.*;
@@ -68,23 +69,34 @@ public boolean next() throws SQLException {
6869
@Override
6970
public void close() throws SQLException {
7071
closed = true;
71-
if (reader != null) {
72-
try {
73-
reader.close();
74-
} catch (Exception e) {
75-
throw ExceptionUtils.toSqlState(e);
76-
}
7772

78-
reader = null;
73+
Exception e = null;
74+
try {
75+
if (reader != null) {
76+
try {
77+
reader.close();
78+
} catch (Exception re) {
79+
log.debug("Error closing reader", re);
80+
e = re;
81+
} finally {
82+
reader = null;
83+
}
84+
}
85+
} finally {
86+
if (response != null) {
87+
try {
88+
response.close();
89+
} catch (Exception re) {
90+
log.debug("Error closing response", re);
91+
e = re;
92+
} finally {
93+
response = null;
94+
}
95+
}
7996
}
8097

81-
if (response != null) {
82-
try {
83-
response.close();
84-
} catch (Exception e) {
85-
throw ExceptionUtils.toSqlState(e);
86-
}
87-
response = null;
98+
if (e != null) {
99+
throw ExceptionUtils.toSqlState(e);
88100
}
89101
}
90102

jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Arrays;
2323
import java.util.Collections;
2424
import java.util.List;
25+
import java.util.UUID;
2526
import java.util.concurrent.TimeUnit;
2627

2728
public class StatementImpl implements Statement, JdbcV2Wrapper {
@@ -34,7 +35,7 @@ public class StatementImpl implements Statement, JdbcV2Wrapper {
3435
private OperationMetrics metrics;
3536
private List<String> batch;
3637
private String lastSql;
37-
private String lastQueryId;
38+
private volatile String lastQueryId;
3839
private String schema;
3940
private int maxRows;
4041

@@ -146,6 +147,14 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL
146147
checkClosed();
147148
QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings);
148149

150+
if (mergedSettings.getQueryId() != null) {
151+
lastQueryId = mergedSettings.getQueryId();
152+
} else {
153+
lastQueryId = UUID.randomUUID().toString();
154+
mergedSettings.setQueryId(lastQueryId);
155+
}
156+
LOG.debug("Query ID: {}", lastQueryId);
157+
149158
try {
150159
lastSql = parseJdbcEscapeSyntax(sql);
151160
QueryResponse response;
@@ -169,7 +178,6 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL
169178
}
170179
currentResultSet = new ResultSetImpl(this, response, reader);
171180
metrics = response.getMetrics();
172-
lastQueryId = response.getQueryId();
173181
} catch (Exception e) {
174182
throw ExceptionUtils.toSqlState(e);
175183
}
@@ -193,6 +201,13 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException
193201

194202
QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings);
195203

204+
if (mergedSettings.getQueryId() != null) {
205+
lastQueryId = mergedSettings.getQueryId();
206+
} else {
207+
lastQueryId = UUID.randomUUID().toString();
208+
mergedSettings.setQueryId(lastQueryId);
209+
}
210+
196211
lastSql = parseJdbcEscapeSyntax(sql);
197212
int updateCount = 0;
198213
try (QueryResponse response = queryTimeout == 0 ? connection.client.query(lastSql, mergedSettings).get()
@@ -212,8 +227,13 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException
212227
public void close() throws SQLException {
213228
closed = true;
214229
if (currentResultSet != null) {
215-
currentResultSet.close();
216-
currentResultSet = null;
230+
try {
231+
currentResultSet.close();
232+
} catch (Exception e) {
233+
LOG.debug("Failed to close current result set", e);
234+
} finally {
235+
currentResultSet = null;
236+
}
217237
}
218238
}
219239

@@ -267,10 +287,10 @@ public void cancel() throws SQLException {
267287
return;
268288
}
269289

270-
try {
271-
connection.client.query(String.format("KILL QUERY%sWHERE query_id = '%s'",
272-
connection.onCluster ? " ON CLUSTER " + connection.cluster + " " : " ",
273-
lastQueryId), connection.getDefaultQuerySettings()).get();
290+
try (QueryResponse response = connection.client.query(String.format("KILL QUERY%sWHERE query_id = '%s'",
291+
connection.onCluster ? " ON CLUSTER " + connection.cluster + " " : " ",
292+
lastQueryId), connection.getDefaultQuerySettings()).get()){
293+
LOG.debug("Query {} was killed by {}", lastQueryId, response.getQueryId());
274294
} catch (Exception e) {
275295
throw new SQLException(e);
276296
}
@@ -298,7 +318,7 @@ public boolean execute(String sql) throws SQLException {
298318
return execute(sql, new QuerySettings().setDatabase(schema));
299319
}
300320

301-
private boolean execute(String sql, QuerySettings settings) throws SQLException {
321+
public boolean execute(String sql, QuerySettings settings) throws SQLException {
302322
checkClosed();
303323
StatementType type = parseStatementType(sql);
304324

jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/ExceptionUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,12 @@ public static SQLException toSqlState(String message, Exception cause) {
6161

6262
return new SQLException(exceptionMessage, SQL_STATE_CLIENT_ERROR, cause);//Default
6363
}
64+
65+
public static Throwable getRootCause(Throwable throwable) {
66+
Throwable cause = throwable;
67+
while (cause.getCause() != null) {
68+
cause = cause.getCause();
69+
}
70+
return cause;
71+
}
6472
}

jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
import com.clickhouse.client.api.ClientConfigProperties;
44
import com.clickhouse.client.api.query.GenericRecord;
5+
import com.clickhouse.client.api.query.QuerySettings;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.testng.Assert;
59
import com.clickhouse.data.ClickHouseVersion;
610
import org.apache.commons.lang3.RandomStringUtils;
711
import org.testng.annotations.Test;
@@ -18,6 +22,7 @@
1822
import java.util.Arrays;
1923
import java.util.List;
2024
import java.util.Properties;
25+
import java.util.UUID;
2126

2227
import static org.testng.Assert.assertEquals;
2328
import static org.testng.Assert.assertFalse;
@@ -27,6 +32,8 @@
2732

2833

2934
public class StatementTest extends JdbcIntegrationTest {
35+
private static final Logger log = LoggerFactory.getLogger(StatementTest.class);
36+
3037
@Test(groups = { "integration" })
3138
public void testExecuteQuerySimpleNumbers() throws Exception {
3239
try (Connection conn = getJdbcConnection()) {
@@ -492,4 +499,33 @@ public void testConnectionExhaustion() throws Exception {
492499
}
493500
}
494501
}
502+
503+
@Test(groups = { "integration" })
504+
public void testConcurrentCancel() throws Exception {
505+
int maxNumConnections = 3;
506+
507+
try (Connection conn = getJdbcConnection()) {
508+
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
509+
stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000");
510+
stmt.cancel();
511+
}
512+
for (int i = 0; i < maxNumConnections; i++) {
513+
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
514+
final int threadNum = i;
515+
log.info("Starting thread {}", threadNum);
516+
new Thread(() -> {
517+
try {
518+
ResultSet rs = stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000");
519+
rs.next();
520+
log.info(rs.getObject(1).toString());
521+
} catch (SQLException e) {
522+
log.error("Error in thread {}", threadNum, e);
523+
}
524+
}).start();
525+
526+
stmt.cancel();
527+
}
528+
}
529+
}
530+
}
495531
}

0 commit comments

Comments
 (0)