File tree Expand file tree Collapse file tree 7 files changed +63
-1
lines changed
java/com/datastax/oss/driver
test/java/com/datastax/oss/driver/internal/core/control Expand file tree Collapse file tree 7 files changed +63
-1
lines changed Original file line number Diff line number Diff line change @@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption {
668668 */
669669 CONTROL_CONNECTION_AGREEMENT_WARN ("advanced.control-connection.schema-agreement.warn-on-failure" ),
670670
671+ /**
672+ * Whether to forcibly add original contact points held by MetadataManager to the reconnection
673+ * plan, in case there is no live nodes available according to LBP. Experimental.
674+ *
675+ * <p>Value-type: boolean
676+ */
677+ CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS (
678+ "advanced.control-connection.reconnection.fallback-to-original-contacts" ),
679+
671680 /**
672681 * Whether `Session.prepare` calls should be sent to all nodes in the cluster.
673682 *
Original file line number Diff line number Diff line change @@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
360360 map .put (TypedDriverOption .CONTROL_CONNECTION_AGREEMENT_INTERVAL , Duration .ofMillis (200 ));
361361 map .put (TypedDriverOption .CONTROL_CONNECTION_AGREEMENT_TIMEOUT , Duration .ofSeconds (10 ));
362362 map .put (TypedDriverOption .CONTROL_CONNECTION_AGREEMENT_WARN , true );
363+ map .put (TypedDriverOption .CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS , false );
363364 map .put (TypedDriverOption .PREPARE_ON_ALL_NODES , true );
364365 map .put (TypedDriverOption .REPREPARE_ENABLED , true );
365366 map .put (TypedDriverOption .REPREPARE_CHECK_SYSTEM_TABLE , false );
Original file line number Diff line number Diff line change @@ -566,6 +566,10 @@ public String toString() {
566566 public static final TypedDriverOption <Boolean > CONTROL_CONNECTION_AGREEMENT_WARN =
567567 new TypedDriverOption <>(
568568 DefaultDriverOption .CONTROL_CONNECTION_AGREEMENT_WARN , GenericType .BOOLEAN );
569+ /** Whether to forcibly try original contacts if no live nodes are available */
570+ public static final TypedDriverOption <Boolean > CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS =
571+ new TypedDriverOption <>(
572+ DefaultDriverOption .CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS , GenericType .BOOLEAN );
569573 /** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */
570574 public static final TypedDriverOption <Boolean > PREPARE_ON_ALL_NODES =
571575 new TypedDriverOption <>(DefaultDriverOption .PREPARE_ON_ALL_NODES , GenericType .BOOLEAN );
Original file line number Diff line number Diff line change @@ -336,7 +336,7 @@ private void init(
336336
337337 private CompletionStage <Boolean > reconnect () {
338338 assert adminExecutor .inEventLoop ();
339- Queue <Node > nodes = context .getLoadBalancingPolicyWrapper ().newQueryPlan ();
339+ Queue <Node > nodes = context .getLoadBalancingPolicyWrapper ().newControlReconnectionQueryPlan ();
340340 CompletableFuture <Boolean > result = new CompletableFuture <>();
341341 connect (
342342 nodes ,
Original file line number Diff line number Diff line change 1717 */
1818package com .datastax .oss .driver .internal .core .metadata ;
1919
20+ import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
2021import com .datastax .oss .driver .api .core .config .DriverExecutionProfile ;
2122import com .datastax .oss .driver .api .core .loadbalancing .LoadBalancingPolicy ;
2223import com .datastax .oss .driver .api .core .loadbalancing .NodeDistance ;
@@ -165,6 +166,28 @@ public Queue<Node> newQueryPlan() {
165166 return newQueryPlan (null , DriverExecutionProfile .DEFAULT_NAME , null );
166167 }
167168
169+ @ NonNull
170+ public Queue <Node > newControlReconnectionQueryPlan () {
171+ // First try the original way
172+ Queue <Node > regularQueryPlan = newQueryPlan ();
173+ if (!regularQueryPlan .isEmpty ()) return regularQueryPlan ;
174+
175+ if (context
176+ .getConfig ()
177+ .getDefaultProfile ()
178+ .getBoolean (DefaultDriverOption .CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS )) {
179+ Set <DefaultNode > originalNodes = context .getMetadataManager ().getContactPoints ();
180+ List <Node > nodes = new ArrayList <>();
181+ for (DefaultNode node : originalNodes ) {
182+ nodes .add (new DefaultNode (node .getEndPoint (), context ));
183+ }
184+ Collections .shuffle (nodes );
185+ return new ConcurrentLinkedQueue <>(nodes );
186+ } else {
187+ return regularQueryPlan ;
188+ }
189+ }
190+
168191 // when it comes in from the outside
169192 private void onNodeStateEvent (NodeStateEvent event ) {
170193 eventFilter .accept (event );
Original file line number Diff line number Diff line change @@ -2113,6 +2113,17 @@ datastax-java-driver {
21132113 # Overridable in a profile: no
21142114 warn-on-failure = true
21152115 }
2116+
2117+ reconnection {
2118+ # Whether to forcibly add original contact points held by MetadataManager to the reconnection plan,
2119+ # in case there is no live nodes available according to LBP.
2120+ # Experimental.
2121+ #
2122+ # Required: yes
2123+ # Modifiable at runtime: yes, the new value will be used for checks issued after the change.
2124+ # Overridable in a profile: no
2125+ fallback-to-original-contacts = false
2126+ }
21162127 }
21172128
21182129 advanced.prepared-statements {
Original file line number Diff line number Diff line change @@ -132,6 +132,11 @@ public void setup() {
132132 when (defaultProfile .getBoolean (DefaultDriverOption .CONNECTION_WARN_INIT_ERROR ))
133133 .thenReturn (false );
134134
135+ when (context .getConfig ()).thenReturn (config );
136+ when (config .getDefaultProfile ()).thenReturn (defaultProfile );
137+ when (defaultProfile .getBoolean (DefaultDriverOption .CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS ))
138+ .thenReturn (false );
139+
135140 controlConnection = new ControlConnection (context );
136141 }
137142
@@ -145,6 +150,15 @@ protected void mockQueryPlan(Node... nodes) {
145150 }
146151 return queryPlan ;
147152 });
153+ when (loadBalancingPolicyWrapper .newControlReconnectionQueryPlan ())
154+ .thenAnswer (
155+ i -> {
156+ ConcurrentLinkedQueue <Node > queryPlan = new ConcurrentLinkedQueue <>();
157+ for (Node node : nodes ) {
158+ queryPlan .offer (node );
159+ }
160+ return queryPlan ;
161+ });
148162 }
149163
150164 @ After
You can’t perform that action at this time.
0 commit comments