Skip to content

Commit 253d93c

Browse files
authored
Merge pull request #1609 from mbaksheev/issue-1470-fix-failover
Fix failover for Apache HTTP client. Connect timeout error is not detected correctly
2 parents 32ccda5 + d21709c commit 253d93c

File tree

5 files changed

+106
-26
lines changed

5 files changed

+106
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### Bug Fixes
44
- Change RowBinaryWithDefaults settings. Output is changed from true to false
5+
- Fix failover for Apache HTTP client. Connect timeout error is not detected correctly
56

67
## 0.6.0-patch3
78

clickhouse-cli-client/src/test/java/com/clickhouse/client/cli/ClickHouseCommandLineClientTest.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,21 @@
33
import com.clickhouse.client.ClickHouseClient;
44
import com.clickhouse.client.ClickHouseClientBuilder;
55
import com.clickhouse.client.ClickHouseException;
6+
import com.clickhouse.client.ClickHouseLoadBalancingPolicy;
67
import com.clickhouse.client.ClickHouseNode;
78
import com.clickhouse.client.ClickHouseProtocol;
89
import com.clickhouse.client.ClickHouseServerForTest;
910
import com.clickhouse.client.ClientIntegrationTest;
1011
import com.clickhouse.client.cli.config.ClickHouseCommandLineOption;
1112
import com.clickhouse.data.ClickHouseCompression;
12-
13-
import java.io.IOException;
14-
import java.util.concurrent.ExecutionException;
15-
1613
import org.testcontainers.containers.GenericContainer;
17-
import org.testng.Assert;
1814
import org.testng.SkipException;
1915
import org.testng.annotations.BeforeClass;
2016
import org.testng.annotations.DataProvider;
2117
import org.testng.annotations.Test;
18+
19+
import java.io.IOException;
20+
import java.util.concurrent.ExecutionException;
2221
// deprecate from version 0.6.0
2322
@Deprecated
2423
public class ClickHouseCommandLineClientTest extends ClientIntegrationTest {
@@ -190,4 +189,14 @@ public void testTransactionTimeout() throws ClickHouseException {
190189
public void testRowBinaryWithDefaults() throws ClickHouseException, IOException, ExecutionException, InterruptedException {
191190
throw new SkipException("Skip due to supported");
192191
}
192+
193+
@Test(dataProvider = "loadBalancingPolicies", groups = {"unit"})
194+
public void testLoadBalancingPolicyFailover(ClickHouseLoadBalancingPolicy loadBalancingPolicy) {
195+
throw new SkipException("Skip due to failover is not supported");
196+
}
197+
198+
@Test(groups = {"integration"})
199+
public void testFailover() {
200+
throw new SkipException("Skip due to failover is not supported");
201+
}
193202
}

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseException.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.net.ConnectException;
55
import java.net.SocketTimeoutException;
66
import java.net.UnknownHostException;
7-
import java.util.Locale;
87
import java.util.concurrent.TimeoutException;
98
import java.util.regex.Matcher;
109
import java.util.regex.Pattern;
@@ -120,10 +119,7 @@ public static ClickHouseException forCancellation(Exception e, ClickHouseNode se
120119
public static boolean isConnectTimedOut(Throwable t) {
121120
if (t instanceof SocketTimeoutException || t instanceof TimeoutException) {
122121
String msg = t.getMessage();
123-
if (msg != null && msg.length() >= MSG_CONNECT_TIMED_OUT.length()) {
124-
msg = msg.substring(0, MSG_CONNECT_TIMED_OUT.length()).toLowerCase(Locale.ROOT);
125-
}
126-
return MSG_CONNECT_TIMED_OUT.equals(msg);
122+
return msg != null && msg.toLowerCase().contains(MSG_CONNECT_TIMED_OUT);
127123
}
128124

129125
return false;

clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,11 @@
4141
import com.clickhouse.data.value.UnsignedInteger;
4242
import com.clickhouse.data.value.UnsignedLong;
4343
import com.clickhouse.data.value.UnsignedShort;
44-
4544
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
4645
import org.testng.Assert;
4746
import org.testng.SkipException;
4847
import org.testng.annotations.DataProvider;
4948
import org.testng.annotations.Test;
50-
import org.testng.asserts.Assertion;
5149

5250
import java.io.BufferedReader;
5351
import java.io.ByteArrayInputStream;
@@ -75,9 +73,10 @@
7573
import java.util.Collections;
7674
import java.util.List;
7775
import java.util.Map;
76+
import java.util.Map.Entry;
77+
import java.util.Properties;
7878
import java.util.TimeZone;
7979
import java.util.UUID;
80-
import java.util.Map.Entry;
8180
import java.util.concurrent.CompletableFuture;
8281
import java.util.concurrent.ExecutionException;
8382
import java.util.concurrent.atomic.AtomicInteger;
@@ -150,13 +149,23 @@ protected ClickHouseResponseSummary execute(ClickHouseRequest<?> request, String
150149

151150
protected ClickHouseRequest<?> newRequest(ClickHouseClient client, ClickHouseNode server) {
152151
ClickHouseRequest<?> request = client.read(server);
152+
setClientOptions(request);
153+
return request;
154+
}
155+
156+
protected ClickHouseRequest<?> newRequest(ClickHouseClient client, ClickHouseNodes servers) {
157+
ClickHouseRequest<?> request = client.read(servers);
158+
setClientOptions(request);
159+
return request;
160+
}
161+
162+
private void setClientOptions(ClickHouseRequest<?> request) {
153163
Map<ClickHouseOption, Serializable> options = getClientOptions();
154164
if (options != null) {
155165
for (Entry<ClickHouseOption, Serializable> e : options.entrySet()) {
156166
request.option(e.getKey(), e.getValue());
157167
}
158168
}
159-
return request;
160169
}
161170

162171
protected abstract ClickHouseProtocol getProtocol();
@@ -398,6 +407,16 @@ protected Object[][] getSimpleTypes() {
398407
"00000000-0000-0000-ffff-ffffffffffff", "00000000-0000-0000-0000-000000000001" } };
399408
}
400409

410+
@DataProvider(name = "loadBalancingPolicies")
411+
protected Object[][] getLoadBalancingPolicies() {
412+
return new Object[][]{
413+
{ClickHouseLoadBalancingPolicy.of(null)},
414+
{ClickHouseLoadBalancingPolicy.of(ClickHouseLoadBalancingPolicy.FIRST_ALIVE)},
415+
{ClickHouseLoadBalancingPolicy.of(ClickHouseLoadBalancingPolicy.ROUND_ROBIN)},
416+
{ClickHouseLoadBalancingPolicy.of(ClickHouseLoadBalancingPolicy.RANDOM)},
417+
};
418+
}
419+
401420
@Test(groups = { "unit" })
402421
public void testInitialization() {
403422
Assert.assertNotNull(getProtocol(), "The client should support non-null protocol");
@@ -886,7 +905,7 @@ public void testQuery() {
886905
* while (response.hasError()) { int index = 0; for (ClickHouseColumn c :
887906
* columns) { // RawValue v = response.getRawValue(index++); // String v =
888907
* response.getValue(index++, String.class) }
889-
*
908+
*
890909
* } byte[] bytes = in.readAllBytes(); String str = new String(bytes);
891910
*/
892911
} catch (Exception e) {
@@ -2555,4 +2574,50 @@ public void testRowBinaryWithDefaults() throws ClickHouseException, IOException,
25552574
}
25562575
}
25572576
}
2577+
2578+
@Test(dataProvider = "loadBalancingPolicies", groups = {"unit"})
2579+
public void testLoadBalancingPolicyFailover(ClickHouseLoadBalancingPolicy loadBalancingPolicy) {
2580+
String firstEndpoint = "111.1.1.1";
2581+
String secondEndpoint = "222.2.2.2";
2582+
2583+
Properties props = new Properties();
2584+
props.setProperty("failover", "1");
2585+
2586+
// nodes where the first node is failed
2587+
ClickHouseNodes nodes = ClickHouseNodes.of(
2588+
getProtocol() + "://" + firstEndpoint + "," + secondEndpoint,
2589+
props
2590+
);
2591+
2592+
try (ClickHouseClient client = getClient();
2593+
ClickHouseResponse response = newRequest(client, nodes.nodes.getFirst())
2594+
.query("select 1")
2595+
.executeAndWait()) {
2596+
Assert.fail("Exception expected for query on failed node");
2597+
} catch (Exception failoverException) {
2598+
ClickHouseNode failoverNode = loadBalancingPolicy.suggest(nodes, nodes.nodes.getFirst(), failoverException);
2599+
Assert.assertEquals(failoverNode.getHost(), secondEndpoint, "The next node is expected to be suggested by the load balancing policy");
2600+
}
2601+
}
2602+
2603+
@Test(groups = {"integration"})
2604+
public void testFailover() throws ClickHouseException {
2605+
ClickHouseNode availableNode = getServer();
2606+
Properties props = new Properties();
2607+
props.setProperty("failover", "1");
2608+
2609+
// nodes with the first node is unavailable
2610+
ClickHouseNodes nodes = ClickHouseNodes.of(
2611+
getProtocol() + "://111.1.1.1," + availableNode.getBaseUri(),
2612+
props
2613+
);
2614+
2615+
// should fail over to next node and successfully perform request if the first node is failed
2616+
try (ClickHouseClient client = getClient();
2617+
ClickHouseResponse response = newRequest(client, nodes)
2618+
.query("select 1")
2619+
.executeAndWait()) {
2620+
Assert.assertEquals(response.firstRecord().getValue(0).asInteger(), 1);
2621+
}
2622+
}
25582623
}

clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,8 @@
11
package com.clickhouse.client.grpc;
22

3-
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
4-
import org.testng.Assert;
5-
import org.testng.SkipException;
6-
import org.testng.annotations.DataProvider;
7-
import org.testng.annotations.Test;
8-
9-
import java.io.ByteArrayInputStream;
10-
import java.io.IOException;
11-
import java.util.concurrent.ExecutionException;
12-
133
import com.clickhouse.client.ClickHouseClient;
144
import com.clickhouse.client.ClickHouseException;
5+
import com.clickhouse.client.ClickHouseLoadBalancingPolicy;
156
import com.clickhouse.client.ClickHouseNode;
167
import com.clickhouse.client.ClickHouseProtocol;
178
import com.clickhouse.client.ClickHouseResponse;
@@ -23,6 +14,15 @@
2314
import com.clickhouse.data.ClickHouseInputStream;
2415
import com.clickhouse.data.ClickHouseRecord;
2516
import com.clickhouse.data.ClickHouseVersion;
17+
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
18+
import org.testng.Assert;
19+
import org.testng.SkipException;
20+
import org.testng.annotations.DataProvider;
21+
import org.testng.annotations.Test;
22+
23+
import java.io.ByteArrayInputStream;
24+
import java.io.IOException;
25+
import java.util.concurrent.ExecutionException;
2626
@Deprecated
2727
public class ClickHouseGrpcClientTest extends ClientIntegrationTest {
2828
@DataProvider(name = "requestCompressionMatrix")
@@ -144,7 +144,7 @@ public void testErrorDuringInsert() throws ClickHouseException {
144144
public void testErrorDuringQuery() throws ClickHouseException {
145145
throw new SkipException("Skip due to grpc is too slow");
146146
}
147-
147+
148148
@Test(groups = { "integration" })
149149
@Override
150150
public void testSessionLock() {
@@ -205,4 +205,13 @@ public void testRowBinaryWithDefaults() throws ClickHouseException, IOException,
205205
throw new SkipException("Skip due to supported");
206206
}
207207

208+
@Test(dataProvider = "loadBalancingPolicies", groups = {"unit"})
209+
public void testLoadBalancingPolicyFailover(ClickHouseLoadBalancingPolicy loadBalancingPolicy) {
210+
throw new SkipException("Skip due to failover is not supported");
211+
}
212+
213+
@Test(groups = {"integration"})
214+
public void testFailover() {
215+
throw new SkipException("Skip due to failover is not supported");
216+
}
208217
}

0 commit comments

Comments
 (0)