Skip to content

Commit 1ebd326

Browse files
committed
Add optional fallback for ControlConnection#reconnect()
Adds an experimental option to allow `ControlConnection` to try reconnecting to the original contact points held by `MetadataManager`, in case of getting empty query plan from the load balancing policy. In order to separate this logic from query plans of other queries `LoadBalancingPolicyWrapper#newControlReconnectionQueryPlan()` was introduced and is called during reconnection in place of `newQueryPlan()`.
1 parent f319d3d commit 1ebd326

File tree

8 files changed

+60
-2
lines changed

8 files changed

+60
-2
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption {
668668
*/
669669
CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"),
670670

671+
/**
672+
* Whether to forcibly add original contact points held by MetadataManager to the reconnection
673+
* plan, in case there is no live nodes available according to LBP. Experimental.
674+
*
675+
* <p>Value-type: boolean
676+
*/
677+
CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS(
678+
"advanced.control-connection.reconnection.fallback-to-original-contacts"),
679+
671680
/**
672681
* Whether `Session.prepare` calls should be sent to all nodes in the cluster.
673682
*

core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
360360
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200));
361361
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10));
362362
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true);
363+
map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false);
363364
map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true);
364365
map.put(TypedDriverOption.REPREPARE_ENABLED, true);
365366
map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false);

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,10 @@ public String toString() {
566566
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_AGREEMENT_WARN =
567567
new TypedDriverOption<>(
568568
DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN);
569+
/** Whether to forcibly try original contacts if no live nodes are available */
570+
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS =
571+
new TypedDriverOption<>(
572+
DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN);
569573
/** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */
570574
public static final TypedDriverOption<Boolean> PREPARE_ON_ALL_NODES =
571575
new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN);

core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ private void init(
336336

337337
private CompletionStage<Boolean> reconnect() {
338338
assert adminExecutor.inEventLoop();
339-
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
339+
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();
340340
CompletableFuture<Boolean> result = new CompletableFuture<>();
341341
connect(
342342
nodes,

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package com.datastax.oss.driver.internal.core.metadata;
1919

20+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2021
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2122
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
2223
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
@@ -165,6 +166,24 @@ public Queue<Node> newQueryPlan() {
165166
return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
166167
}
167168

169+
@NonNull
170+
public Queue<Node> newControlReconnectionQueryPlan() {
171+
// First try the original way
172+
Queue<Node> regularQueryPlan = newQueryPlan();
173+
if (!regularQueryPlan.isEmpty()) return regularQueryPlan;
174+
175+
if (context
176+
.getConfig()
177+
.getDefaultProfile()
178+
.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) {
179+
List<Node> nodes = new ArrayList<>(context.getMetadataManager().getContactPoints());
180+
Collections.shuffle(nodes);
181+
return new ConcurrentLinkedQueue<>(nodes);
182+
} else {
183+
return regularQueryPlan;
184+
}
185+
}
186+
168187
// when it comes in from the outside
169188
private void onNodeStateEvent(NodeStateEvent event) {
170189
eventFilter.accept(event);

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private void completeOrReschedule(Set<UUID> uuids, Throwable error) {
174174
f -> {
175175
if (!f.isSuccess()) {
176176
LOG.debug(
177-
"[{}] Error while rescheduling schema agreement, completing now (false)",
177+
"[{}] listesner Error while rescheduling schema agreement, completing now (false)",
178178
logPrefix,
179179
f.cause());
180180
}

core/src/main/resources/reference.conf

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2113,6 +2113,17 @@ datastax-java-driver {
21132113
# Overridable in a profile: no
21142114
warn-on-failure = true
21152115
}
2116+
2117+
reconnection {
2118+
# Whether to forcibly add original contact points held by MetadataManager to the reconnection plan,
2119+
# in case there is no live nodes available according to LBP.
2120+
# Experimental.
2121+
#
2122+
# Required: yes
2123+
# Modifiable at runtime: yes, the new value will be used for checks issued after the change.
2124+
# Overridable in a profile: no
2125+
fallback-to-original-contacts = false
2126+
}
21162127
}
21172128

21182129
advanced.prepared-statements {

core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ public void setup() {
132132
when(defaultProfile.getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR))
133133
.thenReturn(false);
134134

135+
when(context.getConfig()).thenReturn(config);
136+
when(config.getDefaultProfile()).thenReturn(defaultProfile);
137+
when(defaultProfile.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS))
138+
.thenReturn(false);
139+
135140
controlConnection = new ControlConnection(context);
136141
}
137142

@@ -145,6 +150,15 @@ protected void mockQueryPlan(Node... nodes) {
145150
}
146151
return queryPlan;
147152
});
153+
when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
154+
.thenAnswer(
155+
i -> {
156+
ConcurrentLinkedQueue<Node> queryPlan = new ConcurrentLinkedQueue<>();
157+
for (Node node : nodes) {
158+
queryPlan.offer(node);
159+
}
160+
return queryPlan;
161+
});
148162
}
149163

150164
@After

0 commit comments

Comments
 (0)