Skip to content

Commit 9dfcc65

Browse files
committed
Connect to all DNS records of unresolved endpoint
netty bootstrap.connect uses only first address of unresolved InetSocketAddress. That causes 4.x to not even try to connect to other when it first one fails. This PR makes driver to resolve unresolved endpoint, instead of leaving to to netty. Making it possible to connect to any ip address from DNS contact endpoint.
1 parent d69bdd4 commit 9dfcc65

File tree

11 files changed

+269
-30
lines changed

11 files changed

+269
-30
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@
2121
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2222
import com.datastax.oss.driver.api.core.session.Session;
2323
import com.datastax.oss.driver.shaded.guava.common.base.Charsets;
24+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
2425
import edu.umd.cs.findbugs.annotations.NonNull;
2526
import edu.umd.cs.findbugs.annotations.Nullable;
2627
import java.net.InetSocketAddress;
2728
import java.net.SocketAddress;
29+
import java.net.UnknownHostException;
2830
import java.nio.ByteBuffer;
2931
import java.nio.CharBuffer;
3032
import java.nio.charset.StandardCharsets;
3133
import java.util.Arrays;
34+
import java.util.Collections;
35+
import java.util.List;
3236
import java.util.Objects;
3337
import net.jcip.annotations.ThreadSafe;
3438
import org.slf4j.Logger;
@@ -171,6 +175,12 @@ public SocketAddress resolve() {
171175
return new InetSocketAddress("127.0.0.1", 9042);
172176
}
173177

178+
@NonNull
179+
@Override
180+
public List<EndPoint> resolveAll() throws UnknownHostException {
181+
return ImmutableList.of(this);
182+
}
183+
174184
@NonNull
175185
@Override
176186
public String asMetricPrefix() {

core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import edu.umd.cs.findbugs.annotations.NonNull;
2121
import java.net.InetSocketAddress;
2222
import java.net.SocketAddress;
23+
import java.net.UnknownHostException;
24+
import java.util.List;
2325

2426
/**
2527
* Encapsulates the information needed to open connections to a node.
@@ -40,6 +42,14 @@ public interface EndPoint {
4042
@NonNull
4143
SocketAddress resolve();
4244

45+
/**
46+
* Resolves this instance to a socket address.
47+
*
48+
* <p>This will be called each time the driver opens a new connection to the node. The returned
49+
* address cannot be null.
50+
*/
51+
@NonNull
52+
List<EndPoint> resolveAll() throws UnknownHostException;
4353
/**
4454
* Returns an alternate string representation for use in node-level metric names.
4555
*

core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,27 @@
1919

2020
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2121
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
22+
import com.datastax.oss.driver.internal.core.metadata.UnresolvedEndPoint;
23+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
2224
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
2325
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
26+
2427
import java.net.InetAddress;
2528
import java.net.InetSocketAddress;
2629
import java.net.UnknownHostException;
30+
import java.util.ArrayList;
2731
import java.util.Arrays;
2832
import java.util.Collections;
2933
import java.util.HashSet;
3034
import java.util.List;
3135
import java.util.Set;
36+
3237
import org.slf4j.Logger;
3338
import org.slf4j.LoggerFactory;
3439

35-
/** Utility class to handle the initial contact points passed to the driver. */
40+
/**
41+
* Utility class to handle the initial contact points passed to the driver.
42+
*/
3643
public class ContactPoints {
3744
private static final Logger LOG = LoggerFactory.getLogger(ContactPoints.class);
3845

@@ -41,18 +48,17 @@ public static Set<EndPoint> merge(
4148

4249
Set<EndPoint> result = Sets.newHashSet(programmaticContactPoints);
4350
for (String spec : configContactPoints) {
44-
for (InetSocketAddress address : extract(spec, resolve)) {
45-
DefaultEndPoint endPoint = new DefaultEndPoint(address);
51+
for (EndPoint endPoint : extract(spec, resolve)) {
4652
boolean wasNew = result.add(endPoint);
4753
if (!wasNew) {
48-
LOG.warn("Duplicate contact point {}", address);
54+
LOG.warn("Duplicate contact point {}", spec);
4955
}
5056
}
5157
}
5258
return ImmutableSet.copyOf(result);
5359
}
5460

55-
private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
61+
private static Set<EndPoint> extract(String spec, boolean resolve) {
5662
int separator = spec.lastIndexOf(':');
5763
if (separator < 0) {
5864
LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec);
@@ -69,7 +75,7 @@ private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
6975
return Collections.emptySet();
7076
}
7177
if (!resolve) {
72-
return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port));
78+
return ImmutableSet.of(new UnresolvedEndPoint(host, port));
7379
} else {
7480
try {
7581
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
@@ -79,9 +85,9 @@ private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
7985
spec,
8086
Arrays.deepToString(inetAddresses));
8187
}
82-
Set<InetSocketAddress> result = new HashSet<>();
88+
Set<EndPoint> result = new HashSet<>();
8389
for (InetAddress inetAddress : inetAddresses) {
84-
result.add(new InetSocketAddress(inetAddress, port));
90+
result.add(new DefaultEndPoint(new InetSocketAddress(inetAddress, port)));
8591
}
8692
return result;
8793
} catch (UnknownHostException e) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
package com.datastax.oss.driver.internal.core.metadata;
1919

2020
import com.datastax.oss.driver.api.core.metadata.EndPoint;
21+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
2122
import edu.umd.cs.findbugs.annotations.NonNull;
2223
import java.io.Serializable;
2324
import java.net.InetSocketAddress;
25+
import java.util.Collections;
26+
import java.util.List;
2427
import java.util.Objects;
2528

2629
public class DefaultEndPoint implements EndPoint, Serializable {
@@ -41,6 +44,12 @@ public InetSocketAddress resolve() {
4144
return address;
4245
}
4346

47+
@NonNull
48+
@Override
49+
public List<EndPoint> resolveAll() {
50+
return ImmutableList.of(this);
51+
}
52+
4453
@Override
4554
public boolean equals(Object other) {
4655
if (other == this) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,14 @@
2525
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
2626
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
2727
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
28+
29+
import java.net.UnknownHostException;
2830
import java.util.HashMap;
31+
import java.util.HashSet;
2932
import java.util.Map;
3033
import java.util.Set;
3134
import java.util.UUID;
35+
3236
import net.jcip.annotations.ThreadSafe;
3337
import org.slf4j.Logger;
3438
import org.slf4j.LoggerFactory;
@@ -42,8 +46,10 @@ class InitialNodeListRefresh extends NodesRefresh {
4246

4347
private static final Logger LOG = LoggerFactory.getLogger(InitialNodeListRefresh.class);
4448

45-
@VisibleForTesting final Iterable<NodeInfo> nodeInfos;
46-
@VisibleForTesting final Set<DefaultNode> contactPoints;
49+
@VisibleForTesting
50+
final Iterable<NodeInfo> nodeInfos;
51+
@VisibleForTesting
52+
final Set<DefaultNode> contactPoints;
4753

4854
InitialNodeListRefresh(Iterable<NodeInfo> nodeInfos, Set<DefaultNode> contactPoints) {
4955
this.nodeInfos = nodeInfos;
@@ -72,21 +78,21 @@ public Result compute(
7278
+ "keeping only the first one",
7379
logPrefix,
7480
hostId);
81+
continue;
82+
}
83+
EndPoint endPoint = nodeInfo.getEndPoint();
84+
DefaultNode node = findIn(contactPoints, endPoint);
85+
if (node == null) {
86+
node = new DefaultNode(endPoint, context);
87+
LOG.debug("[{}] Adding new node {}", logPrefix, node);
7588
} else {
76-
EndPoint endPoint = nodeInfo.getEndPoint();
77-
DefaultNode node = findIn(contactPoints, endPoint);
78-
if (node == null) {
79-
node = new DefaultNode(endPoint, context);
80-
LOG.debug("[{}] Adding new node {}", logPrefix, node);
81-
} else {
82-
LOG.debug("[{}] Copying contact point {}", logPrefix, node);
83-
}
84-
if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) {
85-
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
86-
}
87-
copyInfos(nodeInfo, node, context);
88-
newNodes.put(hostId, node);
89+
LOG.debug("[{}] Copying contact point {}", logPrefix, node);
90+
}
91+
if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) {
92+
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
8993
}
94+
copyInfos(nodeInfo, node, context);
95+
newNodes.put(hostId, node);
9096
}
9197

9298
ImmutableList.Builder<Object> eventsBuilder = ImmutableList.builder();

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,22 @@
4444
import com.datastax.oss.driver.internal.core.util.concurrent.Debouncer;
4545
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
4646
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
47+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
4748
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
4849
import edu.umd.cs.findbugs.annotations.NonNull;
4950
import io.netty.util.concurrent.EventExecutor;
5051
import java.net.InetSocketAddress;
52+
import java.net.UnknownHostException;
5153
import java.nio.ByteBuffer;
5254
import java.util.Collections;
5355
import java.util.List;
5456
import java.util.Map;
5557
import java.util.Set;
5658
import java.util.concurrent.CompletableFuture;
5759
import java.util.concurrent.CompletionStage;
60+
import java.util.concurrent.CopyOnWriteArraySet;
61+
62+
import jnr.ffi.annotations.Synchronized;
5863
import net.jcip.annotations.ThreadSafe;
5964
import org.slf4j.Logger;
6065
import org.slf4j.LoggerFactory;
@@ -80,7 +85,8 @@ public class MetadataManager implements AsyncAutoCloseable {
8085
private volatile KeyspaceFilter keyspaceFilter;
8186
private volatile Boolean schemaEnabledProgrammatically;
8287
private volatile boolean tokenMapEnabled;
83-
private volatile Set<DefaultNode> contactPoints;
88+
private volatile Set<EndPoint> contactPoints;
89+
private volatile Set<DefaultNode> resolvedContactPoints;
8490
private volatile boolean wasImplicitContactPoint;
8591
private volatile TypeCodec<TupleValue> tabletPayloadCodec = null;
8692

@@ -102,7 +108,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial
102108
DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES, Collections.emptyList());
103109
this.keyspaceFilter = KeyspaceFilter.newInstance(logPrefix, refreshedKeyspaces);
104110
this.tokenMapEnabled = config.getBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED);
105-
111+
this.resolvedContactPoints = new CopyOnWriteArraySet<>();
106112
context.getEventBus().register(ConfigChangeEvent.class, this::onConfigChanged);
107113
}
108114

@@ -145,18 +151,19 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
145151
// Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we
146152
// don't know their host_id. So store them in a volatile field instead, they will get copied
147153
// during the first node refresh.
148-
ImmutableSet.Builder<DefaultNode> contactPointsBuilder = ImmutableSet.builder();
154+
ImmutableSet.Builder<EndPoint> contactPointsBuilder = ImmutableSet.builder();
149155
if (providedContactPoints == null || providedContactPoints.isEmpty()) {
150156
LOG.info(
151157
"[{}] No contact points provided, defaulting to {}", logPrefix, DEFAULT_CONTACT_POINT);
152158
this.wasImplicitContactPoint = true;
153-
contactPointsBuilder.add(new DefaultNode(DEFAULT_CONTACT_POINT, context));
159+
contactPointsBuilder.add(DEFAULT_CONTACT_POINT);
154160
} else {
155161
for (EndPoint endPoint : providedContactPoints) {
156-
contactPointsBuilder.add(new DefaultNode(endPoint, context));
162+
contactPointsBuilder.add(endPoint);
157163
}
158164
}
159165
this.contactPoints = contactPointsBuilder.build();
166+
this.resolveContactPoints();
160167
LOG.debug("[{}] Adding initial contact points {}", logPrefix, contactPoints);
161168
}
162169

@@ -167,7 +174,29 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
167174
* @see #wasImplicitContactPoint()
168175
*/
169176
public Set<DefaultNode> getContactPoints() {
170-
return contactPoints;
177+
return resolvedContactPoints;
178+
}
179+
180+
@Synchronized
181+
public void resolveContactPoints() {
182+
ImmutableSet.Builder<EndPoint> resultBuilder = ImmutableSet.builder();
183+
for (EndPoint endPoint : contactPoints) {
184+
try {
185+
resultBuilder.addAll(endPoint.resolveAll());
186+
} catch (UnknownHostException e) {
187+
LOG.error("failed to resolve contact endpoint {}", endPoint, e);
188+
}
189+
}
190+
191+
Set<EndPoint> result = resultBuilder.build();
192+
for (EndPoint endPoint : result) {
193+
if (resolvedContactPoints.stream().anyMatch(resolved -> resolved.getEndPoint().equals(endPoint))) {
194+
continue;
195+
}
196+
this.resolvedContactPoints.add(new DefaultNode(endPoint, context));
197+
}
198+
199+
this.resolvedContactPoints.removeIf(endPoint -> !result.contains(endPoint.getEndPoint()));
171200
}
172201

173202
/** Whether the default contact point was used (because none were provided explicitly). */
@@ -337,10 +366,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con
337366
}
338367

339368
private Void refreshNodes(Iterable<NodeInfo> nodeInfos) {
369+
if (!didFirstNodeListRefresh) {
370+
resolveContactPoints();
371+
}
340372
MetadataRefresh refresh =
341373
didFirstNodeListRefresh
342374
? new FullNodeListRefresh(nodeInfos)
343-
: new InitialNodeListRefresh(nodeInfos, contactPoints);
375+
: new InitialNodeListRefresh(nodeInfos, resolvedContactPoints);
344376
didFirstNodeListRefresh = true;
345377
return apply(refresh);
346378
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package com.datastax.oss.driver.internal.core.metadata;
1919

2020
import com.datastax.oss.driver.api.core.metadata.EndPoint;
21+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
2122
import com.datastax.oss.driver.shaded.guava.common.primitives.UnsignedBytes;
2223
import edu.umd.cs.findbugs.annotations.NonNull;
2324
import java.net.InetAddress;
2425
import java.net.InetSocketAddress;
2526
import java.net.UnknownHostException;
2627
import java.util.Arrays;
2728
import java.util.Comparator;
29+
import java.util.List;
2830
import java.util.Objects;
2931
import java.util.concurrent.atomic.AtomicLong;
3032

@@ -72,6 +74,12 @@ public InetSocketAddress resolve() {
7274
}
7375
}
7476

77+
@NonNull
78+
@Override
79+
public List<EndPoint> resolveAll() {
80+
return ImmutableList.of(this);
81+
}
82+
7583
@Override
7684
public boolean equals(Object other) {
7785
if (other == this) {

0 commit comments

Comments
 (0)