Skip to content

Commit 66f8bce

Browse files
authored
Merge pull request #1706 from ClickHouse/fix_duplicate_queryId
Fix duplicate query ID error
2 parents b38866e + 2ba1a7f commit 66f8bce

File tree

4 files changed

+129
-48
lines changed

4 files changed

+129
-48
lines changed

clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,17 @@
1111
import java.util.ArrayList;
1212
import java.util.Collections;
1313
import java.util.HashMap;
14+
import java.util.HashSet;
1415
import java.util.List;
1516
import java.util.Map;
1617
import java.util.Optional;
1718
import java.util.Properties;
19+
import java.util.Set;
1820
import java.util.UUID;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.TimeUnit;
1925

2026
import org.testng.Assert;
2127
import org.testng.annotations.Test;

clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ public List<Proxy> select(URI uri) {
8181

8282
private static final String USER_AGENT = ClickHouseClientOption.buildUserAgent(null, "HttpClient");
8383

84-
private final AtomicBoolean busy;
8584
private final HttpClient httpClient;
8685
private final HttpRequest pingRequest;
8786

@@ -197,15 +196,13 @@ protected HttpClientConnectionImpl(ClickHouseNode server, ClickHouseRequest<?> r
197196
builder.sslContext(ClickHouseSslContextProvider.getProvider().getSslContext(SSLContext.class, config)
198197
.orElse(null));
199198
}
200-
201-
busy = new AtomicBoolean(false);
202199
httpClient = builder.build();
203200
pingRequest = newRequest(getBaseUrl() + "ping");
204201
}
205202

206203
@Override
207204
protected boolean isReusable() {
208-
return busy.get();
205+
return true; // httpClient is stateless and can be reused
209206
}
210207

211208
private CompletableFuture<HttpResponse<InputStream>> postRequest(HttpRequest request) {
@@ -218,7 +215,7 @@ private CompletableFuture<HttpResponse<InputStream>> postRequest(HttpRequest req
218215
private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.Builder reqBuilder, byte[] boundary,
219216
String sql, ClickHouseInputStream data, List<ClickHouseExternalTable> tables, ClickHouseOutputStream output,
220217
Runnable postAction) throws IOException {
221-
try {
218+
222219
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
223220
.createPipedOutputStream(config);
224221
reqBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(stream::getInputStream));
@@ -243,14 +240,11 @@ private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.B
243240
}
244241

245242
return buildResponse(config, r, output, postAction);
246-
} finally {
247-
busy.set(false);
248-
}
249243
}
250244

251245
private ClickHouseHttpResponse postString(ClickHouseConfig config, HttpRequest.Builder reqBuilder, String sql,
252246
ClickHouseOutputStream output, Runnable postAction) throws IOException {
253-
try {
247+
254248
reqBuilder.POST(HttpRequest.BodyPublishers.ofString(sql));
255249
HttpResponse<InputStream> r;
256250
try {
@@ -267,9 +261,6 @@ private ClickHouseHttpResponse postString(ClickHouseConfig config, HttpRequest.B
267261
}
268262
}
269263
return buildResponse(config, r, output, postAction);
270-
} finally {
271-
busy.set(false);
272-
}
273264
}
274265

275266
@Override
@@ -281,9 +272,7 @@ protected final String getDefaultUserAgent() {
281272
protected ClickHouseHttpResponse post(ClickHouseConfig config, String sql, ClickHouseInputStream data,
282273
List<ClickHouseExternalTable> tables, ClickHouseOutputStream output, String url,
283274
Map<String, String> headers, Runnable postAction) throws IOException {
284-
if (!busy.compareAndSet(false, true)) {
285-
throw new ConnectException("Connection is busy");
286-
}
275+
287276
ClickHouseConfig c = config == null ? this.config : config;
288277
HttpRequest.Builder reqBuilder = HttpRequest.newBuilder()
289278
.uri(URI.create(ClickHouseChecker.isNullOrEmpty(url) ? this.url : url))

clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,15 @@
11
package com.clickhouse.jdbc.internal;
22

3-
import java.io.IOException;
4-
import java.io.InputStream;
5-
import java.io.OutputStream;
6-
import java.io.Serializable;
7-
import java.nio.file.Path;
8-
import java.sql.ResultSet;
9-
import java.sql.SQLException;
10-
import java.sql.SQLFeatureNotSupportedException;
11-
import java.sql.SQLWarning;
12-
import java.sql.Statement;
13-
import java.util.*;
14-
import java.util.Map.Entry;
15-
import java.util.concurrent.TimeUnit;
16-
import java.util.concurrent.TimeoutException;
17-
183
import com.clickhouse.client.ClickHouseClient;
194
import com.clickhouse.client.ClickHouseConfig;
205
import com.clickhouse.client.ClickHouseException;
216
import com.clickhouse.client.ClickHouseNode;
227
import com.clickhouse.client.ClickHouseRequest;
8+
import com.clickhouse.client.ClickHouseRequest.Mutation;
239
import com.clickhouse.client.ClickHouseResponse;
2410
import com.clickhouse.client.ClickHouseResponseSummary;
2511
import com.clickhouse.client.ClickHouseSimpleResponse;
2612
import com.clickhouse.client.ClickHouseTransaction;
27-
import com.clickhouse.client.ClickHouseRequest.Mutation;
2813
import com.clickhouse.client.config.ClickHouseClientOption;
2914
import com.clickhouse.client.config.ClickHouseDefaults;
3015
import com.clickhouse.config.ClickHouseConfigChangeListener;
@@ -42,16 +27,37 @@
4227
import com.clickhouse.data.ClickHouseOutputStream;
4328
import com.clickhouse.data.ClickHouseUtils;
4429
import com.clickhouse.data.ClickHouseValues;
45-
import com.clickhouse.logging.Logger;
46-
import com.clickhouse.logging.LoggerFactory;
4730
import com.clickhouse.jdbc.ClickHouseConnection;
4831
import com.clickhouse.jdbc.ClickHouseResultSet;
4932
import com.clickhouse.jdbc.ClickHouseStatement;
5033
import com.clickhouse.jdbc.JdbcTypeMapping;
51-
import com.clickhouse.jdbc.SqlExceptionUtils;
5234
import com.clickhouse.jdbc.JdbcWrapper;
35+
import com.clickhouse.jdbc.SqlExceptionUtils;
5336
import com.clickhouse.jdbc.parser.ClickHouseSqlStatement;
5437
import com.clickhouse.jdbc.parser.StatementType;
38+
import com.clickhouse.logging.Logger;
39+
import com.clickhouse.logging.LoggerFactory;
40+
41+
import java.io.IOException;
42+
import java.io.InputStream;
43+
import java.io.OutputStream;
44+
import java.io.Serializable;
45+
import java.nio.file.Path;
46+
import java.sql.ResultSet;
47+
import java.sql.SQLException;
48+
import java.sql.SQLFeatureNotSupportedException;
49+
import java.sql.SQLWarning;
50+
import java.sql.Statement;
51+
import java.util.ArrayList;
52+
import java.util.Arrays;
53+
import java.util.HashSet;
54+
import java.util.LinkedList;
55+
import java.util.List;
56+
import java.util.Map;
57+
import java.util.Map.Entry;
58+
import java.util.concurrent.TimeUnit;
59+
import java.util.concurrent.TimeoutException;
60+
import java.util.function.Function;
5561

5662
public class ClickHouseStatementImpl extends JdbcWrapper
5763
implements ClickHouseConfigChangeListener<ClickHouseRequest<?>>, ClickHouseStatement {
@@ -130,11 +136,9 @@ private ClickHouseResponse getLastResponse(Map<ClickHouseOption, Serializable> o
130136
request.set("_set_roles_stmt", requestRoles);
131137
}
132138

133-
request.query(stmt.getSQL(), queryId = connection.newQueryId());
134139
// TODO skip useless queries to reduce network calls and server load
135140
try {
136-
response = autoTx ? request.executeWithinTransaction(connection.isImplicitTransactionSupported())
137-
: request.transaction(connection.getTransaction()).executeAndWait();
141+
response = sendRequest(stmt.getSQL(), r -> r);
138142
} catch (Exception e) {
139143
throw SqlExceptionUtils.handle(e);
140144
} finally {
@@ -272,7 +276,6 @@ protected ClickHouseResponse processSqlStatement(ClickHouseSqlStatement stmt) th
272276

273277
protected ClickHouseResponse executeStatement(String stmt, Map<ClickHouseOption, Serializable> options,
274278
List<ClickHouseExternalTable> tables, Map<String, String> settings) throws SQLException {
275-
boolean autoTx = connection.getAutoCommit() && connection.isTransactionSupported();
276279
try {
277280
if (options != null) {
278281
request.options(options);
@@ -310,9 +313,8 @@ protected ClickHouseResponse executeStatement(String stmt, Map<ClickHouseOption,
310313
}
311314
request.external(list);
312315
}
313-
request.query(stmt, queryId = connection.newQueryId());
314-
return autoTx ? request.executeWithinTransaction(connection.isImplicitTransactionSupported())
315-
: request.transaction(connection.getTransaction()).executeAndWait();
316+
317+
return sendRequest(stmt, r -> r);
316318
} catch (Exception e) {
317319
throw SqlExceptionUtils.handle(e);
318320
}
@@ -328,18 +330,61 @@ protected ClickHouseResponse executeStatement(ClickHouseSqlStatement stmt,
328330
return executeStatement(stmt.getSQL(), options, tables, settings);
329331
}
330332

331-
protected int executeInsert(String sql, InputStream input) throws SQLException {
333+
private ClickHouseResponse sendRequest(String sql, Function<ClickHouseRequest<?>, ClickHouseRequest<?>> preSeal) throws SQLException {
332334
boolean autoTx = connection.getAutoCommit() && connection.isTransactionSupported();
333-
Mutation req = request.write().query(sql, queryId = connection.newQueryId()).data(input);
334-
try (ClickHouseResponse resp = autoTx
335-
? req.executeWithinTransaction(connection.isImplicitTransactionSupported())
336-
: req.transaction(connection.getTransaction()).executeAndWait();
337-
ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp)) {
338-
// ignore
335+
336+
ClickHouseRequest<?> req;
337+
ClickHouseTransaction tx = null;
338+
synchronized (request) {
339+
try {
340+
if (autoTx) {
341+
if (connection.isImplicitTransactionSupported()) {
342+
request.set(ClickHouseTransaction.SETTING_IMPLICIT_TRANSACTION, 1).transaction(null);
343+
} else {
344+
tx = request.getManager().createImplicitTransaction(request);
345+
request.transaction(connection.getTransaction());
346+
}
347+
} else {
348+
try {
349+
request.transaction(connection.getTransaction());
350+
} catch (ClickHouseException e) {
351+
throw SqlExceptionUtils.handle(e);
352+
}
353+
}
354+
355+
req = preSeal.apply(request).query(sql, queryId = connection.newQueryId()).seal();
356+
} catch (Exception e) {
357+
throw SqlExceptionUtils.handle(e);
358+
}
359+
}
360+
361+
try {
362+
return req.executeAndWait();
339363
} catch (Exception e) {
364+
if (tx != null) {
365+
try {
366+
tx.rollback();
367+
} catch (Exception ex) {
368+
log.warn("Failed to rollback transaction", ex);
369+
}
370+
}
340371
throw SqlExceptionUtils.handle(e);
372+
} finally {
373+
try {
374+
request.transaction(null);
375+
} catch (Exception e) {
376+
throw SqlExceptionUtils.handle(ClickHouseException.of(e, req.getServer()));
377+
}
341378
}
379+
}
342380

381+
protected int executeInsert(String sql, InputStream input) throws SQLException {
382+
try (ClickHouseResponse response = sendRequest(sql, r -> r.write().data(input));
383+
ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), response)) {
384+
// no more actions needed
385+
} catch (Exception e) {
386+
throw SqlExceptionUtils.handle(e);
387+
}
343388
return (int) currentUpdateCount;
344389
}
345390

clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.File;
44
import java.io.IOException;
5+
import java.lang.ref.WeakReference;
56
import java.math.BigDecimal;
67
import java.sql.Array;
78
import java.sql.BatchUpdateException;
@@ -20,6 +21,7 @@
2021
import java.time.OffsetDateTime;
2122
import java.time.ZoneId;
2223
import java.time.ZonedDateTime;
24+
import java.util.ArrayList;
2325
import java.util.Arrays;
2426
import java.util.Calendar;
2527
import java.util.Collections;
@@ -30,9 +32,14 @@
3032
import java.util.TimeZone;
3133
import java.util.UUID;
3234
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.Executors;
36+
import java.util.concurrent.ScheduledExecutorService;
3337
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.concurrent.atomic.AtomicReference;
3440

3541
import com.clickhouse.client.ClickHouseClient;
42+
import com.clickhouse.client.ClickHouseException;
3643
import com.clickhouse.client.ClickHouseParameterizedQuery;
3744
import com.clickhouse.client.ClickHouseProtocol;
3845
import com.clickhouse.client.ClickHouseRequest;
@@ -48,6 +55,7 @@
4855
import com.clickhouse.data.value.UnsignedShort;
4956

5057
import org.roaringbitmap.longlong.Roaring64NavigableMap;
58+
import org.testcontainers.shaded.org.checkerframework.checker.units.qual.A;
5159
import org.testng.Assert;
5260
import org.testng.SkipException;
5361
import org.testng.annotations.DataProvider;
@@ -1392,4 +1400,37 @@ public void testTimeZone(boolean useBinary) throws SQLException {
13921400
}
13931401
}
13941402
}
1403+
1404+
@Test(groups = "integration")
1405+
public void testMultiThreadedExecution() throws Exception {
1406+
Properties props = new Properties();
1407+
try (ClickHouseConnection conn = newConnection(props);
1408+
ClickHouseStatement stmt = conn.createStatement()) {
1409+
1410+
1411+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
1412+
1413+
final AtomicReference<Exception> failedException = new AtomicReference<>(null);
1414+
for (int i = 0; i < 3; i++) {
1415+
executor.scheduleWithFixedDelay(() -> {
1416+
try {
1417+
stmt.execute("select 1");
1418+
} catch (Exception e) {
1419+
failedException.set(e);
1420+
}
1421+
}, 100, 100, TimeUnit.MILLISECONDS);
1422+
}
1423+
1424+
try {
1425+
Thread.sleep(1000);
1426+
} catch (Exception e) {
1427+
Assert.fail("Test interrupted", e);
1428+
}
1429+
1430+
executor.shutdown();
1431+
executor.awaitTermination(10, TimeUnit.SECONDS);
1432+
1433+
Assert.assertNull(failedException.get(), "Failed because of exception: " + failedException.get());
1434+
}
1435+
}
13951436
}

0 commit comments

Comments
 (0)