Skip to content

Commit 22dd8d6

Browse files
committed
Merge branch 'main' into remove_old_client_usage
2 parents 09175c8 + 7a15ecb commit 22dd8d6

File tree

4 files changed

+94
-30
lines changed

4 files changed

+94
-30
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.lang.reflect.InvocationTargetException;
5656
import java.lang.reflect.Method;
5757
import java.net.ConnectException;
58+
import java.net.SocketTimeoutException;
5859
import java.net.URL;
5960
import java.nio.charset.StandardCharsets;
6061
import java.time.Duration;
@@ -1302,7 +1303,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
13021303
// Selecting some node
13031304
ClickHouseNode selectedNode = getNextAliveNode();
13041305

1305-
ClientException lastException = null;
1306+
RuntimeException lastException = null;
13061307
for (int i = 0; i <= maxRetries; i++) {
13071308
// Execute request
13081309
try (ClassicHttpResponse httpResponse =
@@ -1342,19 +1343,17 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
13421343
metrics.operationComplete();
13431344
metrics.setQueryId(queryId);
13441345
return new InsertResponse(metrics);
1345-
} catch (NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException | ConnectException e) {
1346-
lastException = httpClientHelper.wrapException("Insert request initiation failed", e);
1346+
} catch (Exception e) {
1347+
lastException = httpClientHelper.wrapException("Query request failed (Attempt " + (i + 1) + "/" + (maxRetries + 1) + ")", e);
13471348
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
1348-
LOG.warn("Retrying", e);
1349+
LOG.warn("Retrying.", e);
13491350
selectedNode = getNextAliveNode();
13501351
} else {
13511352
throw lastException;
13521353
}
1353-
} catch (IOException e) {
1354-
throw new ClientException("Insert request failed", e);
13551354
}
13561355
}
1357-
throw new ClientException("Insert request failed after retries", lastException);
1356+
throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1), lastException);
13581357
};
13591358

13601359
return runAsyncOperation(supplier, settings.getAllSettings());
@@ -1462,7 +1461,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
14621461
// Selecting some node
14631462
ClickHouseNode selectedNode = getNextAliveNode();
14641463

1465-
ClientException lastException = null;
1464+
RuntimeException lastException = null;
14661465
for (int i = 0; i <= maxRetries; i++) {
14671466
// Execute request
14681467
try (ClassicHttpResponse httpResponse =
@@ -1487,16 +1486,14 @@ public CompletableFuture<InsertResponse> insert(String tableName,
14871486
metrics.operationComplete();
14881487
metrics.setQueryId(queryId);
14891488
return new InsertResponse(metrics);
1490-
} catch (NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException | ConnectException e) {
1491-
lastException = httpClientHelper.wrapException("Insert request initiation failed", e);
1489+
} catch (Exception e) {
1490+
lastException = httpClientHelper.wrapException("Query request failed (Attempt " + (i + 1) + "/" + (maxRetries + 1) + ")", e);
14921491
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
1493-
LOG.warn("Retrying", e);
1492+
LOG.warn("Retrying.", e);
14941493
selectedNode = getNextAliveNode();
14951494
} else {
14961495
throw lastException;
14971496
}
1498-
} catch (IOException e) {
1499-
throw new ClientException("Insert request failed", e);
15001497
}
15011498

15021499
if (i < maxRetries) {
@@ -1507,7 +1504,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15071504
}
15081505
}
15091506
}
1510-
throw new ClientException("Insert request failed after retries", lastException);
1507+
throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1), lastException);
15111508
};
15121509

15131510
return runAsyncOperation(responseSupplier, settings.getAllSettings());
@@ -1588,7 +1585,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
15881585
responseSupplier = () -> {
15891586
// Selecting some node
15901587
ClickHouseNode selectedNode = getNextAliveNode();
1591-
ClientException lastException = null;
1588+
RuntimeException lastException = null;
15921589
for (int i = 0; i <= maxRetries; i++) {
15931590
try {
15941591
ClassicHttpResponse httpResponse =
@@ -1615,22 +1612,18 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
16151612

16161613
return new QueryResponse(httpResponse, finalSettings.getFormat(), finalSettings, metrics);
16171614

1618-
} catch (NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException | ConnectException e) {
1619-
lastException = httpClientHelper.wrapException("Query request initiation failed", e);
1615+
} catch (Exception e) {
1616+
lastException = httpClientHelper.wrapException("Query request failed (Attempt " + (i + 1) + "/" + (maxRetries + 1) + ")", e);
16201617
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
16211618
LOG.warn("Retrying.", e);
16221619
selectedNode = getNextAliveNode();
16231620
} else {
16241621
throw lastException;
16251622
}
1626-
} catch (ClientException | ServerException e) {
1627-
throw e;
1628-
} catch (Exception e) {
1629-
throw new ClientException("Query request failed", e);
16301623
}
16311624
}
16321625

1633-
throw new ClientException("Query request failed after retries", lastException);
1626+
throw new ClientException("Query request failed after attempts: " + (maxRetries + 1), lastException);
16341627
};
16351628

16361629
return runAsyncOperation(responseSupplier, settings.getAllSettings());

client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ public enum ClientFaultCause {
77
NoHttpResponse,
88
ConnectTimeout,
99
ConnectionRequestTimeout,
10+
SocketTimeout,
1011
}

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.net.ConnectException;
6161
import java.net.InetSocketAddress;
6262
import java.net.NoRouteToHostException;
63+
import java.net.SocketTimeoutException;
6364
import java.net.URI;
6465
import java.net.URISyntaxException;
6566
import java.net.URL;
@@ -108,7 +109,7 @@ public HttpAPIClientHelper(Map<String, String> configuration) {
108109
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
109110
LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression);
110111

111-
defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get("client_retry_on_failures"), ClientFaultCause.class);
112+
defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get(ClientConfigProperties.CLIENT_RETRY_ON_FAILURE.getKey()), ClientFaultCause.class);
112113
if (defaultRetryCauses.contains(ClientFaultCause.None)) {
113114
defaultRetryCauses.removeIf(c -> c != ClientFaultCause.None);
114115
}
@@ -398,7 +399,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
398399
} catch (ConnectException | NoRouteToHostException e) {
399400
LOG.warn("Failed to connect to '{}': {}", server.getHost(), e.getMessage());
400401
throw new ClientException("Failed to connect", e);
401-
} catch (ConnectionRequestTimeoutException | ServerException | NoHttpResponseException | ClientException e) {
402+
} catch (ConnectionRequestTimeoutException | ServerException | NoHttpResponseException | ClientException | SocketTimeoutException e) {
402403
throw e;
403404
} catch (Exception e) {
404405
throw new ClientException("Failed to execute request", e);
@@ -576,32 +577,46 @@ public static <T> T getHeaderVal(Header header, T defaultValue, Function<String,
576577
return converter.apply(header.getValue());
577578
}
578579

579-
public boolean shouldRetry(Exception ex, Map<String, Object> requestSettings) {
580+
public boolean shouldRetry(Throwable ex, Map<String, Object> requestSettings) {
580581
Set<ClientFaultCause> retryCauses = (Set<ClientFaultCause>)
581-
requestSettings.getOrDefault("retry_on_failures", defaultRetryCauses);
582+
requestSettings.getOrDefault(ClientConfigProperties.CLIENT_RETRY_ON_FAILURE.getKey(), defaultRetryCauses);
582583

583584
if (retryCauses.contains(ClientFaultCause.None)) {
584585
return false;
585586
}
586587

587-
if (ex instanceof NoHttpResponseException ) {
588+
if (ex instanceof NoHttpResponseException
589+
|| ex.getCause() instanceof NoHttpResponseException) {
588590
return retryCauses.contains(ClientFaultCause.NoHttpResponse);
589591
}
590592

591-
if (ex instanceof ConnectException || ex instanceof ConnectTimeoutException) {
593+
if (ex instanceof ConnectException
594+
|| ex instanceof ConnectTimeoutException
595+
|| ex.getCause() instanceof ConnectException
596+
|| ex.getCause() instanceof ConnectTimeoutException) {
592597
return retryCauses.contains(ClientFaultCause.ConnectTimeout);
593598
}
594599

595-
if (ex instanceof ConnectionRequestTimeoutException) {
600+
if (ex instanceof ConnectionRequestTimeoutException
601+
|| ex.getCause() instanceof ConnectionRequestTimeoutException) {
596602
return retryCauses.contains(ClientFaultCause.ConnectionRequestTimeout);
597603
}
598604

605+
if (ex instanceof SocketTimeoutException
606+
|| ex.getCause() instanceof SocketTimeoutException) {
607+
return retryCauses.contains(ClientFaultCause.SocketTimeout);
608+
}
609+
599610
return false;
600611
}
601612

602613
// This method wraps some client specific exceptions into specific ClientException or just ClientException
603614
// ClientException will be also wrapped
604-
public ClientException wrapException(String message, Exception cause) {
615+
public RuntimeException wrapException(String message, Exception cause) {
616+
if (cause instanceof ClientException || cause instanceof ServerException) {
617+
return (RuntimeException) cause;
618+
}
619+
605620
if (cause instanceof ConnectionRequestTimeoutException ||
606621
cause instanceof NoHttpResponseException ||
607622
cause instanceof ConnectTimeoutException ||

client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,8 @@ public void testServerSettings() {
607607
} catch (Exception e) {
608608
e.printStackTrace();
609609
Assert.fail("Unexpected exception", e);
610+
} finally {
611+
mockServer.stop();
610612
}
611613
}
612614
}
@@ -1062,6 +1064,59 @@ public void testWithDefaultTimeouts() {
10621064
}
10631065
}
10641066

1067+
1068+
@Test(groups = { "integration" })
1069+
public void testTimeoutsWithRetry() {
1070+
if (isCloud()) {
1071+
return; // mocked server
1072+
}
1073+
1074+
WireMockServer faultyServer = new WireMockServer( WireMockConfiguration
1075+
.options().port(9090).notifier(new ConsoleNotifier(false)));
1076+
faultyServer.start();
1077+
1078+
// First request gets no response
1079+
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
1080+
.inScenario("Timeout")
1081+
.withRequestBody(WireMock.containing("SELECT 1"))
1082+
.whenScenarioStateIs(STARTED)
1083+
.willSetStateTo("Failed")
1084+
.willReturn(WireMock.aResponse()
1085+
.withStatus(HttpStatus.SC_OK)
1086+
.withFixedDelay(5000)
1087+
.withHeader("X-ClickHouse-Summary",
1088+
"{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build());
1089+
1090+
// Second request gets a response (retry)
1091+
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
1092+
.inScenario("Timeout")
1093+
.withRequestBody(WireMock.containing("SELECT 1"))
1094+
.whenScenarioStateIs("Failed")
1095+
.willSetStateTo("Done")
1096+
.willReturn(WireMock.aResponse()
1097+
.withStatus(HttpStatus.SC_OK)
1098+
.withFixedDelay(1000)
1099+
.withHeader("X-ClickHouse-Summary",
1100+
"{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build());
1101+
1102+
try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false)
1103+
.setUsername("default")
1104+
.setPassword("")
1105+
.setSocketTimeout(3000)
1106+
.retryOnFailures(ClientFaultCause.SocketTimeout)
1107+
.build()) {
1108+
int startTime = (int) System.currentTimeMillis();
1109+
try {
1110+
client.query("SELECT 1").get();
1111+
} catch (Exception e) {
1112+
Assert.fail("Elapsed Time: " + (System.currentTimeMillis() - startTime), e);
1113+
}
1114+
} finally {
1115+
faultyServer.stop();
1116+
}
1117+
}
1118+
1119+
10651120
protected Client.Builder newClient() {
10661121
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
10671122
boolean isSecure = isCloud();

0 commit comments

Comments
 (0)