Skip to content

Commit 1cfc2df

Browse files
authored
Add option to consider initial contact points during reconnection (#344)
When control connection tries to reconnect usually it considers only nodes provided by load balancing policy. Usually those do not include what was initially passed to the driver but the recently seen alive nodes. In some setups the IPs can keep changing so it may be useful to have an option to try initial contact points as one of the options during reconnection. Mainly if the contact point is a hostname. This change adds the option to the `QueryOptions` to control that behaviour and adds necessary logic to `ControlConnection` class. It is disabled by default, meaning that default behaviour remains unchanged. Additionally adds org.burningwave tools dependency. This dependency has features that allow for easier host resolution mocking. Adds MappedHostResolverProvider class for testing as a single entry point for controlling hostname resolution. Adds an option to CcmBridge Builder to specify cluster name. Driver checks the cluster name when reconnecting so it will refuse to reconnect to a different CcmBridge auto-generated name.
1 parent c044358 commit 1cfc2df

File tree

8 files changed

+201
-0
lines changed

8 files changed

+201
-0
lines changed

driver-core/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,13 @@
195195
<version>1.78.1</version>
196196
</dependency>
197197

198+
<!-- added for easier DNS hostname resolution mocking -->
199+
<dependency>
200+
<groupId>org.burningwave</groupId>
201+
<artifactId>tools</artifactId>
202+
<scope>test</scope>
203+
</dependency>
204+
198205

199206
</dependencies>
200207

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.datastax.driver.core.utils.MoreFutures;
3535
import com.datastax.driver.core.utils.MoreObjects;
3636
import com.google.common.annotations.VisibleForTesting;
37+
import com.google.common.collect.Iterators;
3738
import com.google.common.util.concurrent.FutureCallback;
3839
import com.google.common.util.concurrent.ListenableFuture;
3940
import com.google.common.util.concurrent.SettableFuture;
@@ -160,6 +161,15 @@ protected Connection tryReconnect() throws ConnectionException {
160161
if (isShutdown) throw new ConnectionException(null, "Control connection was shut down");
161162

162163
try {
164+
if (cluster
165+
.configuration
166+
.getQueryOptions()
167+
.shouldAddOriginalContactsToReconnectionPlan()) {
168+
List<Host> initialContacts = cluster.metadata.getContactPoints();
169+
Collections.shuffle(initialContacts);
170+
return reconnectInternal(
171+
Iterators.concat(queryPlan(), initialContacts.iterator()), false);
172+
}
163173
return reconnectInternal(queryPlan(), false);
164174
} catch (NoHostAvailableException e) {
165175
throw new ConnectionException(null, e.getMessage());

driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public class QueryOptions {
7272

7373
private volatile boolean schemaQueriesPaged = true;
7474

75+
private volatile boolean addOriginalContactsToReconnectionPlan = false;
76+
7577
/**
7678
* Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL},
7779
* {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}.
@@ -499,6 +501,26 @@ public int getMaxPendingRefreshNodeRequests() {
499501
return maxPendingRefreshNodeRequests;
500502
}
501503

504+
/**
505+
* Whether the driver should use original contact points when reconnecting to a control node. In
506+
* practice this forces driver to manually add original contact points to the end of the query
507+
* plan. It is possible that it may introduce duplicates (but under differnet Host class
508+
* instances) in the query plan. If this is set to false it does not mean that original contact
509+
* points will be excluded.
510+
*
511+
* <p>One use case of this feature is that if the original contact point is defined by hostname
512+
* and its IP address changes then setting this to {@code true} allows trying reconnecting to the
513+
* new IP if all connection was lost.
514+
*/
515+
public QueryOptions setAddOriginalContactsToReconnectionPlan(boolean enabled) {
516+
this.addOriginalContactsToReconnectionPlan = enabled;
517+
return this;
518+
}
519+
520+
public boolean shouldAddOriginalContactsToReconnectionPlan() {
521+
return this.addOriginalContactsToReconnectionPlan;
522+
}
523+
502524
@Override
503525
public boolean equals(Object that) {
504526
if (that == null || !(that instanceof QueryOptions)) {

driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,7 @@ public static class Builder {
949949
private static final Pattern RANDOM_PORT_PATTERN = Pattern.compile(RANDOM_PORT);
950950

951951
private String ipPrefix = TestUtils.IP_PREFIX;
952+
private String providedClusterName = null;
952953
int[] nodes = {1};
953954
private int[] jmxPorts = {};
954955
private boolean start = true;
@@ -991,6 +992,15 @@ public Builder withSniProxy() {
991992
return this;
992993
}
993994

995+
/**
996+
* Builder takes care of naming and numbering clusters on its own. Use if you really need a
997+
* specific name
998+
*/
999+
public Builder withClusterName(String clusterName) {
1000+
this.providedClusterName = clusterName;
1001+
return this;
1002+
}
1003+
9941004
/** Enables SSL encryption. */
9951005
public Builder withSSL() {
9961006
cassandraConfiguration.put("client_encryption_options.enabled", "true");
@@ -1115,6 +1125,8 @@ public CCMBridge build() {
11151125
// be careful NOT to alter internal state (hashCode/equals) during build!
11161126
String clusterName = TestUtils.generateIdentifier("ccm_");
11171127

1128+
if (providedClusterName != null) clusterName = providedClusterName;
1129+
11181130
VersionNumber dseVersion;
11191131
VersionNumber cassandraVersion;
11201132
boolean versionConfigured = this.version != null;
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package com.datastax.driver.core;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.testng.Assert.assertTrue;
5+
6+
import java.net.InetSocketAddress;
7+
import java.util.List;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import org.testng.annotations.Test;
11+
12+
public class DnsEndpointTests {
13+
14+
private static final Logger logger = LoggerFactory.getLogger(DnsEndpointTests.class);
15+
16+
@Test(groups = "long")
17+
public void replace_cluster_test() {
18+
// Configure host resolution
19+
MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.1.1.1");
20+
21+
Cluster cluster = null;
22+
Session session = null;
23+
CCMBridge bridgeA = null;
24+
try {
25+
bridgeA =
26+
CCMBridge.builder()
27+
.withNodes(1)
28+
.withIpPrefix("127.1.1.")
29+
.withBinaryPort(9042)
30+
.withClusterName("same_name")
31+
.build();
32+
bridgeA.start();
33+
34+
cluster =
35+
Cluster.builder()
36+
.addContactPointsWithPorts(
37+
InetSocketAddress.createUnresolved("control.reconnect.test", 9042))
38+
.withPort(9042)
39+
.withoutAdvancedShardAwareness()
40+
.withQueryOptions(new QueryOptions().setAddOriginalContactsToReconnectionPlan(true))
41+
.build();
42+
session = cluster.connect();
43+
44+
ResultSet rs = session.execute("select * from system.local");
45+
Row row = rs.one();
46+
String address = row.getInet("broadcast_address").toString();
47+
logger.info("Queried node has broadcast_address: {}}", address);
48+
System.out.flush();
49+
} finally {
50+
assert bridgeA != null;
51+
bridgeA.close();
52+
}
53+
54+
CCMBridge bridgeB = null;
55+
// Overwrite host resolution
56+
MappedHostResolverProvider.removeResolverEntry("control.reconnect.test");
57+
MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.2.2.1");
58+
try {
59+
bridgeB =
60+
CCMBridge.builder()
61+
.withNodes(1)
62+
.withIpPrefix("127.2.2.")
63+
.withBinaryPort(9042)
64+
.withClusterName("same_name")
65+
.build();
66+
bridgeB.start();
67+
Thread.sleep(1000 * 92);
68+
ResultSet rs = session.execute("select * from system.local");
69+
Row row = rs.one();
70+
String address = row.getInet("broadcast_address").toString();
71+
logger.info("Queried node has broadcast_address: {}}", address);
72+
} catch (InterruptedException e) {
73+
throw new RuntimeException(e);
74+
} finally {
75+
assert bridgeB != null;
76+
bridgeB.close();
77+
}
78+
}
79+
80+
@Test(groups = "long")
81+
public void should_connect_with_mocked_hostname() {
82+
MappedHostResolverProvider.addResolverEntry("mocked.hostname.test", "127.0.1.1");
83+
try (CCMBridge ccmBridge =
84+
CCMBridge.builder().withNodes(1).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
85+
Cluster cluster =
86+
Cluster.builder()
87+
.addContactPointsWithPorts(
88+
InetSocketAddress.createUnresolved("mocked.hostname.test", 9042))
89+
.withPort(9042)
90+
.withoutAdvancedShardAwareness()
91+
.build()) {
92+
ccmBridge.start();
93+
Session session = cluster.connect();
94+
ResultSet rs = session.execute("SELECT * FROM system.local");
95+
List<Row> rows = rs.all();
96+
assertThat(rows).hasSize(1);
97+
Row row = rows.get(0);
98+
assertThat(row.getInet("broadcast_address").toString()).contains("127.0.1.1");
99+
assertTrue(
100+
session.getCluster().getMetadata().getAllHosts().stream()
101+
.map(Host::toString)
102+
.anyMatch(hostString -> hostString.contains("mocked.hostname.test")));
103+
}
104+
}
105+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.datastax.driver.core;
2+
3+
import org.burningwave.tools.net.DefaultHostResolver;
4+
import org.burningwave.tools.net.HostResolutionRequestInterceptor;
5+
import org.burningwave.tools.net.MappedHostResolver;
6+
7+
public class MappedHostResolverProvider {
8+
private static volatile MappedHostResolver resolver = null;
9+
10+
private MappedHostResolverProvider() {}
11+
12+
public static synchronized boolean setResolver(MappedHostResolver newResolver) {
13+
if (resolver != null) {
14+
return false;
15+
}
16+
resolver = newResolver;
17+
HostResolutionRequestInterceptor.INSTANCE.install(resolver, DefaultHostResolver.INSTANCE);
18+
return true;
19+
}
20+
21+
public static synchronized void addResolverEntry(String hostname, String address) {
22+
if (resolver == null) {
23+
setResolver(new MappedHostResolver());
24+
}
25+
resolver.putHost(hostname, address);
26+
}
27+
28+
public static synchronized void removeResolverEntry(String hostname) {
29+
if (resolver == null) {
30+
return;
31+
}
32+
resolver.removeHost(hostname);
33+
}
34+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
managed-logger.repository=autodetect
2+
managed-logger.repository.enabled=false
3+
banner.hide=true
4+
priority-of-this-configuration=1000

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
<scassandra.version>1.1.2</scassandra.version>
8787
<logback.version>1.2.13</logback.version>
8888
<byteman.version>3.0.8</byteman.version>
89+
<burningwave.tools.version>0.26.2</burningwave.tools.version>
8990
<ipprefix>127.0.1.</ipprefix>
9091
<!-- defaults below are overridden by profiles and/or submodules -->
9192
<test.groups>unit</test.groups>
@@ -398,6 +399,12 @@
398399
<version>${groovy.version}</version>
399400
</dependency>
400401

402+
<dependency>
403+
<groupId>org.burningwave</groupId>
404+
<artifactId>tools</artifactId>
405+
<version>${burningwave.tools.version}</version>
406+
</dependency>
407+
401408
</dependencies>
402409

403410
</dependencyManagement>

0 commit comments

Comments
 (0)