Skip to content

Commit 779b7d2

Browse files
committed
fixed concurrent access to request inside ClickHouseStatementImpl
1 parent a386876 commit 779b7d2

File tree

3 files changed

+118
-64
lines changed

3 files changed

+118
-64
lines changed

clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -660,35 +660,4 @@ public void testMutation() {
660660
Assert.assertEquals(request.getQuery(), expectedSql);
661661
Assert.assertEquals(request.getStatements().get(0), expectedSql);
662662
}
663-
664-
@Test
665-
public void testConcurrentUse() {
666-
ClickHouseRequest<?> request = ClickHouseClient.newInstance().read(ClickHouseNode.builder().build());
667-
Assert.assertNotNull(request);
668-
669-
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
670-
671-
final List<ClickHouseRequest<?>> sealedRequests = new ArrayList<>();
672-
673-
for (int i = 0; i < 3; i++) {
674-
executor.scheduleWithFixedDelay(() -> {
675-
String queryID = UUID.randomUUID().toString();
676-
// System.out.println(System.currentTimeMillis() + " Thread " + Thread.currentThread().getId() + " qId: " + queryID);
677-
sealedRequests.add(request.query("select 1", queryID).seal());
678-
}, 100, 100, TimeUnit.MILLISECONDS);
679-
}
680-
681-
try {
682-
Thread.sleep(1000);
683-
} catch (InterruptedException e) {
684-
Assert.fail("Thread sleep interrupted");
685-
}
686-
executor.shutdown();
687-
final Set<String> queryIDs = new HashSet<>();
688-
for (ClickHouseRequest<?> r : sealedRequests) {
689-
// System.out.println(queryIDs);
690-
Assert.assertTrue(queryIDs.add(r.getQueryId().get()), "Query ID should be unique: " +
691-
r.getQueryId().get());
692-
}
693-
}
694663
}

clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,15 @@
11
package com.clickhouse.jdbc.internal;
22

3-
import java.io.IOException;
4-
import java.io.InputStream;
5-
import java.io.OutputStream;
6-
import java.io.Serializable;
7-
import java.nio.file.Path;
8-
import java.sql.ResultSet;
9-
import java.sql.SQLException;
10-
import java.sql.SQLFeatureNotSupportedException;
11-
import java.sql.SQLWarning;
12-
import java.sql.Statement;
13-
import java.util.*;
14-
import java.util.Map.Entry;
15-
import java.util.concurrent.TimeUnit;
16-
import java.util.concurrent.TimeoutException;
17-
183
import com.clickhouse.client.ClickHouseClient;
194
import com.clickhouse.client.ClickHouseConfig;
205
import com.clickhouse.client.ClickHouseException;
216
import com.clickhouse.client.ClickHouseNode;
227
import com.clickhouse.client.ClickHouseRequest;
8+
import com.clickhouse.client.ClickHouseRequest.Mutation;
239
import com.clickhouse.client.ClickHouseResponse;
2410
import com.clickhouse.client.ClickHouseResponseSummary;
2511
import com.clickhouse.client.ClickHouseSimpleResponse;
2612
import com.clickhouse.client.ClickHouseTransaction;
27-
import com.clickhouse.client.ClickHouseRequest.Mutation;
2813
import com.clickhouse.client.config.ClickHouseClientOption;
2914
import com.clickhouse.client.config.ClickHouseDefaults;
3015
import com.clickhouse.config.ClickHouseConfigChangeListener;
@@ -42,16 +27,37 @@
4227
import com.clickhouse.data.ClickHouseOutputStream;
4328
import com.clickhouse.data.ClickHouseUtils;
4429
import com.clickhouse.data.ClickHouseValues;
45-
import com.clickhouse.logging.Logger;
46-
import com.clickhouse.logging.LoggerFactory;
4730
import com.clickhouse.jdbc.ClickHouseConnection;
4831
import com.clickhouse.jdbc.ClickHouseResultSet;
4932
import com.clickhouse.jdbc.ClickHouseStatement;
5033
import com.clickhouse.jdbc.JdbcTypeMapping;
51-
import com.clickhouse.jdbc.SqlExceptionUtils;
5234
import com.clickhouse.jdbc.JdbcWrapper;
35+
import com.clickhouse.jdbc.SqlExceptionUtils;
5336
import com.clickhouse.jdbc.parser.ClickHouseSqlStatement;
5437
import com.clickhouse.jdbc.parser.StatementType;
38+
import com.clickhouse.logging.Logger;
39+
import com.clickhouse.logging.LoggerFactory;
40+
41+
import java.io.IOException;
42+
import java.io.InputStream;
43+
import java.io.OutputStream;
44+
import java.io.Serializable;
45+
import java.nio.file.Path;
46+
import java.sql.ResultSet;
47+
import java.sql.SQLException;
48+
import java.sql.SQLFeatureNotSupportedException;
49+
import java.sql.SQLWarning;
50+
import java.sql.Statement;
51+
import java.util.ArrayList;
52+
import java.util.Arrays;
53+
import java.util.HashSet;
54+
import java.util.LinkedList;
55+
import java.util.List;
56+
import java.util.Map;
57+
import java.util.Map.Entry;
58+
import java.util.concurrent.TimeUnit;
59+
import java.util.concurrent.TimeoutException;
60+
import java.util.function.Function;
5561

5662
public class ClickHouseStatementImpl extends JdbcWrapper
5763
implements ClickHouseConfigChangeListener<ClickHouseRequest<?>>, ClickHouseStatement {
@@ -130,11 +136,9 @@ private ClickHouseResponse getLastResponse(Map<ClickHouseOption, Serializable> o
130136
request.set("_set_roles_stmt", requestRoles);
131137
}
132138

133-
request.query(stmt.getSQL(), queryId = connection.newQueryId());
134139
// TODO skip useless queries to reduce network calls and server load
135140
try {
136-
response = autoTx ? request.executeWithinTransaction(connection.isImplicitTransactionSupported())
137-
: request.transaction(connection.getTransaction()).executeAndWait();
141+
response = sendRequest(stmt.getSQL(), r -> r);
138142
} catch (Exception e) {
139143
throw SqlExceptionUtils.handle(e);
140144
} finally {
@@ -272,7 +276,6 @@ protected ClickHouseResponse processSqlStatement(ClickHouseSqlStatement stmt) th
272276

273277
protected ClickHouseResponse executeStatement(String stmt, Map<ClickHouseOption, Serializable> options,
274278
List<ClickHouseExternalTable> tables, Map<String, String> settings) throws SQLException {
275-
boolean autoTx = connection.getAutoCommit() && connection.isTransactionSupported();
276279
try {
277280
if (options != null) {
278281
request.options(options);
@@ -310,9 +313,8 @@ protected ClickHouseResponse executeStatement(String stmt, Map<ClickHouseOption,
310313
}
311314
request.external(list);
312315
}
313-
request.query(stmt, queryId = connection.newQueryId());
314-
return autoTx ? request.executeWithinTransaction(connection.isImplicitTransactionSupported())
315-
: request.transaction(connection.getTransaction()).executeAndWait();
316+
317+
return sendRequest(stmt, r -> r);
316318
} catch (Exception e) {
317319
throw SqlExceptionUtils.handle(e);
318320
}
@@ -328,18 +330,61 @@ protected ClickHouseResponse executeStatement(ClickHouseSqlStatement stmt,
328330
return executeStatement(stmt.getSQL(), options, tables, settings);
329331
}
330332

331-
protected int executeInsert(String sql, InputStream input) throws SQLException {
333+
private ClickHouseResponse sendRequest(String sql, Function<ClickHouseRequest<?>, ClickHouseRequest<?>> preSeal) throws SQLException {
332334
boolean autoTx = connection.getAutoCommit() && connection.isTransactionSupported();
333-
Mutation req = request.write().query(sql, queryId = connection.newQueryId()).data(input);
334-
try (ClickHouseResponse resp = autoTx
335-
? req.executeWithinTransaction(connection.isImplicitTransactionSupported())
336-
: req.transaction(connection.getTransaction()).executeAndWait();
337-
ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp)) {
338-
// ignore
335+
336+
ClickHouseRequest<?> req;
337+
ClickHouseTransaction tx = null;
338+
synchronized (request) {
339+
try {
340+
if (autoTx) {
341+
if (connection.isImplicitTransactionSupported()) {
342+
request.set(ClickHouseTransaction.SETTING_IMPLICIT_TRANSACTION, 1).transaction(null);
343+
} else {
344+
tx = request.getManager().createImplicitTransaction(request);
345+
request.transaction(connection.getTransaction());
346+
}
347+
} else {
348+
try {
349+
request.transaction(connection.getTransaction());
350+
} catch (ClickHouseException e) {
351+
throw SqlExceptionUtils.handle(e);
352+
}
353+
}
354+
355+
req = preSeal.apply(request).query(sql, queryId = connection.newQueryId()).seal();
356+
} catch (Exception e) {
357+
throw SqlExceptionUtils.handle(e);
358+
}
359+
}
360+
361+
try {
362+
return req.executeAndWait();
339363
} catch (Exception e) {
364+
if (tx != null) {
365+
try {
366+
tx.rollback();
367+
} catch (Exception ex) {
368+
log.warn("Failed to rollback transaction", ex);
369+
}
370+
}
340371
throw SqlExceptionUtils.handle(e);
372+
} finally {
373+
try {
374+
request.transaction(null);
375+
} catch (Exception e) {
376+
throw SqlExceptionUtils.handle(ClickHouseException.of(e, req.getServer()));
377+
}
341378
}
379+
}
342380

381+
protected int executeInsert(String sql, InputStream input) throws SQLException {
382+
try (ClickHouseResponse response = sendRequest(sql, r -> r.write().data(input));
383+
ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), response)) {
384+
// no more actions needed
385+
} catch (Exception e) {
386+
throw SqlExceptionUtils.handle(e);
387+
}
343388
return (int) currentUpdateCount;
344389
}
345390

clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.time.OffsetDateTime;
2121
import java.time.ZoneId;
2222
import java.time.ZonedDateTime;
23+
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Calendar;
2526
import java.util.Collections;
@@ -30,9 +31,13 @@
3031
import java.util.TimeZone;
3132
import java.util.UUID;
3233
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.ScheduledExecutorService;
3336
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
3438

3539
import com.clickhouse.client.ClickHouseClient;
40+
import com.clickhouse.client.ClickHouseException;
3641
import com.clickhouse.client.ClickHouseParameterizedQuery;
3742
import com.clickhouse.client.ClickHouseProtocol;
3843
import com.clickhouse.client.ClickHouseRequest;
@@ -48,6 +53,7 @@
4853
import com.clickhouse.data.value.UnsignedShort;
4954

5055
import org.roaringbitmap.longlong.Roaring64NavigableMap;
56+
import org.testcontainers.shaded.org.checkerframework.checker.units.qual.A;
5157
import org.testng.Assert;
5258
import org.testng.SkipException;
5359
import org.testng.annotations.DataProvider;
@@ -1392,4 +1398,38 @@ public void testTimeZone(boolean useBinary) throws SQLException {
13921398
}
13931399
}
13941400
}
1401+
1402+
@Test(groups = "integration")
1403+
public void testMultiThreadedExecution() throws Exception {
1404+
Properties props = new Properties();
1405+
try (ClickHouseConnection conn = newConnection(props);
1406+
ClickHouseStatement stmt = conn.createStatement()) {
1407+
1408+
1409+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
1410+
1411+
final AtomicBoolean failed = new AtomicBoolean(false);
1412+
for (int i = 0; i < 3; i++) {
1413+
executor.scheduleWithFixedDelay(() -> {
1414+
try {
1415+
stmt.execute("select 1");
1416+
} catch (Exception e) {
1417+
e.printStackTrace();
1418+
failed.set(true);
1419+
}
1420+
}, 100, 100, TimeUnit.MILLISECONDS);
1421+
}
1422+
1423+
try {
1424+
Thread.sleep(1000);
1425+
} catch (Exception e) {
1426+
Assert.fail("Test interrupted", e);
1427+
}
1428+
1429+
executor.shutdown();
1430+
executor.awaitTermination(10, TimeUnit.SECONDS);
1431+
1432+
Assert.assertFalse(failed.get());
1433+
}
1434+
}
13951435
}

0 commit comments

Comments
 (0)