Skip to content

Commit 80bfedf

Browse files
committed
Merge commit 'b9760b473b6e6e30f5da5f743e37e02150e13e39' of https://github.com/apache/cassandra-java-driver into pull-upstream-4.18.1-v3
Added extra stubs in BasicLoadBalancingPolicyPreferredRemoteDcsTest to match Scylla's modifications.
2 parents 8ba9a47 + b9760b4 commit 80bfedf

File tree

6 files changed

+278
-22
lines changed

6 files changed

+278
-22
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,13 @@ public enum DefaultDriverOption implements DriverOption {
10031003
* <p>Value-type: {@link java.time.Duration}
10041004
*/
10051005
SSL_KEYSTORE_RELOAD_INTERVAL("advanced.ssl-engine-factory.keystore-reload-interval"),
1006-
;
1006+
/**
1007+
* Ordered preference list of remote dcs optionally supplied for automatic failover.
1008+
*
1009+
* <p>Value type: {@link java.util.List List}&#60;{@link String}&#62;
1010+
*/
1011+
LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS(
1012+
"advanced.load-balancing-policy.dc-failover.preferred-remote-dcs");
10071013

10081014
private final String path;
10091015

core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,8 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
383383
map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC, 0);
384384
map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, false);
385385
map.put(TypedDriverOption.METRICS_GENERATE_AGGREGABLE_HISTOGRAMS, true);
386+
map.put(
387+
TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS, ImmutableList.of(""));
386388
}
387389

388390
@Immutable

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -903,6 +903,16 @@ public String toString() {
903903
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS,
904904
GenericType.BOOLEAN);
905905

906+
/**
907+
* Ordered preference list of remote dcs optionally supplied for automatic failover and included
908+
* in query plan. This feature is enabled only when max-nodes-per-remote-dc is greater than 0.
909+
*/
910+
public static final TypedDriverOption<List<String>>
911+
LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS =
912+
new TypedDriverOption<>(
913+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS,
914+
GenericType.listOf(String.class));
915+
906916
private static Iterable<TypedDriverOption<?>> introspectBuiltInValues() {
907917
try {
908918
ImmutableList.Builder<TypedDriverOption<?>> result = ImmutableList.builder();

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java

Lines changed: 63 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,14 @@
5757
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
5858
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
5959
import com.datastax.oss.driver.shaded.guava.common.base.Predicates;
60+
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
61+
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
6062
import edu.umd.cs.findbugs.annotations.NonNull;
6163
import edu.umd.cs.findbugs.annotations.Nullable;
6264
import java.nio.ByteBuffer;
6365
import java.util.Collections;
66+
import java.util.LinkedHashSet;
67+
import java.util.List;
6468
import java.util.Map;
6569
import java.util.Objects;
6670
import java.util.Optional;
@@ -130,6 +134,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
130134
private volatile String localDc;
131135
private volatile String localRack;
132136
private volatile NodeSet liveNodes;
137+
private final LinkedHashSet<String> preferredRemoteDcs;
133138

134139
public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
135140
this.context = (InternalDriverContext) context;
@@ -144,6 +149,11 @@ public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String
144149
this.context
145150
.getConsistencyLevelRegistry()
146151
.nameToLevel(profile.getString(DefaultDriverOption.REQUEST_CONSISTENCY));
152+
153+
preferredRemoteDcs =
154+
new LinkedHashSet<>(
155+
profile.getStringList(
156+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS));
147157
}
148158

149159
/**
@@ -373,27 +383,59 @@ protected Queue<Node> maybeAddDcFailover(@Nullable Request request, @NonNull Que
373383
return local;
374384
}
375385
}
376-
QueryPlan remote =
377-
new LazyQueryPlan() {
378-
379-
@Override
380-
protected Object[] computeNodes() {
381-
Object[] remoteNodes =
382-
liveNodes.dcs().stream()
383-
.filter(Predicates.not(Predicates.equalTo(localDc)))
384-
.flatMap(dc -> liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc))
385-
.toArray();
386-
387-
int remoteNodesLength = remoteNodes.length;
388-
if (remoteNodesLength == 0) {
389-
return EMPTY_NODES;
390-
}
391-
shuffleHead(remoteNodes, remoteNodesLength);
392-
return remoteNodes;
393-
}
394-
};
395-
396-
return new CompositeQueryPlan(local, remote);
386+
if (preferredRemoteDcs.isEmpty()) {
387+
return new CompositeQueryPlan(local, buildRemoteQueryPlanAll());
388+
}
389+
return new CompositeQueryPlan(local, buildRemoteQueryPlanPreferred());
390+
}
391+
392+
private QueryPlan buildRemoteQueryPlanAll() {
393+
394+
return new LazyQueryPlan() {
395+
@Override
396+
protected Object[] computeNodes() {
397+
398+
Object[] remoteNodes =
399+
liveNodes.dcs().stream()
400+
.filter(Predicates.not(Predicates.equalTo(localDc)))
401+
.flatMap(dc -> liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc))
402+
.toArray();
403+
if (remoteNodes.length == 0) {
404+
return EMPTY_NODES;
405+
}
406+
shuffleHead(remoteNodes, remoteNodes.length);
407+
return remoteNodes;
408+
}
409+
};
410+
}
411+
412+
private QueryPlan buildRemoteQueryPlanPreferred() {
413+
414+
Set<String> dcs = liveNodes.dcs();
415+
List<String> orderedDcs = Lists.newArrayListWithCapacity(dcs.size());
416+
orderedDcs.addAll(preferredRemoteDcs);
417+
orderedDcs.addAll(Sets.difference(dcs, preferredRemoteDcs));
418+
419+
QueryPlan[] queryPlans =
420+
orderedDcs.stream()
421+
.filter(Predicates.not(Predicates.equalTo(localDc)))
422+
.map(
423+
(dc) -> {
424+
return new LazyQueryPlan() {
425+
@Override
426+
protected Object[] computeNodes() {
427+
Object[] rv = liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc).toArray();
428+
if (rv.length == 0) {
429+
return EMPTY_NODES;
430+
}
431+
shuffleHead(rv, rv.length);
432+
return rv;
433+
}
434+
};
435+
})
436+
.toArray(QueryPlan[]::new);
437+
438+
return new CompositeQueryPlan(queryPlans);
397439
}
398440

399441
/** Exposed as a protected method so that it can be accessed by tests */

core/src/main/resources/reference.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,11 @@ datastax-java-driver {
574574
# Modifiable at runtime: no
575575
# Overridable in a profile: yes
576576
allow-for-local-consistency-levels = false
577+
# Ordered preference list of remote dc's (in order) optionally supplied for automatic failover. While building a query plan, the driver uses the DC's supplied in order together with max-nodes-per-remote-dc
578+
# Required: no
579+
# Modifiable at runtime: no
580+
# Overridable in a profile: no
581+
preferred-remote-dcs = [""]
577582
}
578583
}
579584

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.internal.core.loadbalancing;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.eq;
23+
import static org.mockito.ArgumentMatchers.isNull;
24+
import static org.mockito.Mockito.atLeast;
25+
import static org.mockito.Mockito.never;
26+
import static org.mockito.Mockito.spy;
27+
import static org.mockito.Mockito.times;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.when;
30+
31+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
32+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
33+
import com.datastax.oss.driver.api.core.metadata.Node;
34+
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
35+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
36+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
37+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
38+
import java.util.Map;
39+
import java.util.UUID;
40+
import org.junit.Test;
41+
import org.mockito.Mock;
42+
43+
public class BasicLoadBalancingPolicyPreferredRemoteDcsTest
44+
extends BasicLoadBalancingPolicyDcFailoverTest {
45+
@Mock protected DefaultNode node10;
46+
@Mock protected DefaultNode node11;
47+
@Mock protected DefaultNode node12;
48+
@Mock protected DefaultNode node13;
49+
@Mock protected DefaultNode node14;
50+
51+
@Override
52+
@Test
53+
public void should_prioritize_single_replica() {
54+
when(request.getRoutingKeyspace()).thenReturn(KEYSPACE);
55+
when(request.getRoutingKey()).thenReturn(ROUTING_KEY);
56+
when(tokenMap.getReplicas(KEYSPACE, ROUTING_KEY)).thenReturn(ImmutableSet.of(node3));
57+
// Additional mock. Implementation diverges from upstream enough to call this method instead.
58+
when(tokenMap.getReplicas(eq(KEYSPACE), isNull(), eq(ROUTING_KEY)))
59+
.thenReturn(ImmutableSet.of(node3));
60+
61+
// node3 always first, round-robin on the rest
62+
assertThat(policy.newQueryPlan(request, session))
63+
.containsExactly(
64+
node3, node1, node2, node4, node5, node9, node10, node6, node7, node12, node13);
65+
assertThat(policy.newQueryPlan(request, session))
66+
.containsExactly(
67+
node3, node2, node4, node5, node1, node9, node10, node6, node7, node12, node13);
68+
assertThat(policy.newQueryPlan(request, session))
69+
.containsExactly(
70+
node3, node4, node5, node1, node2, node9, node10, node6, node7, node12, node13);
71+
assertThat(policy.newQueryPlan(request, session))
72+
.containsExactly(
73+
node3, node5, node1, node2, node4, node9, node10, node6, node7, node12, node13);
74+
75+
// Should not shuffle replicas since there is only one
76+
verify(policy, never()).shuffleHead(any(), eq(1));
77+
// But should shuffle remote nodes
78+
verify(policy, times(12)).shuffleHead(any(), eq(2));
79+
}
80+
81+
@Override
82+
@Test
83+
public void should_prioritize_and_shuffle_replicas() {
84+
when(request.getRoutingKeyspace()).thenReturn(KEYSPACE);
85+
when(request.getRoutingKey()).thenReturn(ROUTING_KEY);
86+
when(tokenMap.getReplicas(KEYSPACE, ROUTING_KEY))
87+
.thenReturn(ImmutableSet.of(node1, node2, node3, node6, node9));
88+
// Additional mock. Implementation diverges from upstream enough to call this method instead.
89+
when(tokenMap.getReplicas(eq(KEYSPACE), isNull(), eq(ROUTING_KEY)))
90+
.thenReturn(ImmutableSet.of(node1, node2, node3, node6, node9));
91+
92+
// node 6 and 9 being in a remote DC, they don't get a boost for being a replica
93+
assertThat(policy.newQueryPlan(request, session))
94+
.containsExactly(
95+
node1, node2, node3, node4, node5, node9, node10, node6, node7, node12, node13);
96+
assertThat(policy.newQueryPlan(request, session))
97+
.containsExactly(
98+
node1, node2, node3, node5, node4, node9, node10, node6, node7, node12, node13);
99+
100+
// should shuffle replicas
101+
verify(policy, times(2)).shuffleHead(any(), eq(3));
102+
// should shuffle remote nodes
103+
verify(policy, times(6)).shuffleHead(any(), eq(2));
104+
// No power of two choices with only two replicas
105+
verify(session, never()).getPools();
106+
}
107+
108+
@Override
109+
protected void assertRoundRobinQueryPlans() {
110+
for (int i = 0; i < 3; i++) {
111+
assertThat(policy.newQueryPlan(request, session))
112+
.containsExactly(
113+
node1, node2, node3, node4, node5, node9, node10, node6, node7, node12, node13);
114+
assertThat(policy.newQueryPlan(request, session))
115+
.containsExactly(
116+
node2, node3, node4, node5, node1, node9, node10, node6, node7, node12, node13);
117+
assertThat(policy.newQueryPlan(request, session))
118+
.containsExactly(
119+
node3, node4, node5, node1, node2, node9, node10, node6, node7, node12, node13);
120+
assertThat(policy.newQueryPlan(request, session))
121+
.containsExactly(
122+
node4, node5, node1, node2, node3, node9, node10, node6, node7, node12, node13);
123+
assertThat(policy.newQueryPlan(request, session))
124+
.containsExactly(
125+
node5, node1, node2, node3, node4, node9, node10, node6, node7, node12, node13);
126+
}
127+
128+
verify(policy, atLeast(15)).shuffleHead(any(), eq(2));
129+
}
130+
131+
@Override
132+
protected BasicLoadBalancingPolicy createAndInitPolicy() {
133+
when(node4.getDatacenter()).thenReturn("dc1");
134+
when(node5.getDatacenter()).thenReturn("dc1");
135+
when(node6.getDatacenter()).thenReturn("dc2");
136+
when(node7.getDatacenter()).thenReturn("dc2");
137+
when(node8.getDatacenter()).thenReturn("dc2");
138+
when(node9.getDatacenter()).thenReturn("dc3");
139+
when(node10.getDatacenter()).thenReturn("dc3");
140+
when(node11.getDatacenter()).thenReturn("dc3");
141+
when(node12.getDatacenter()).thenReturn("dc4");
142+
when(node13.getDatacenter()).thenReturn("dc4");
143+
when(node14.getDatacenter()).thenReturn("dc4");
144+
145+
// Accept 2 nodes per remote DC
146+
when(defaultProfile.getInt(
147+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC))
148+
.thenReturn(2);
149+
when(defaultProfile.getBoolean(
150+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS))
151+
.thenReturn(false);
152+
153+
when(defaultProfile.getStringList(
154+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS))
155+
.thenReturn(ImmutableList.of("dc3", "dc2"));
156+
157+
// Use a subclass to disable shuffling, we just spy to make sure that the shuffling method was
158+
// called (makes tests easier)
159+
BasicLoadBalancingPolicy policy =
160+
spy(
161+
new BasicLoadBalancingPolicy(context, DriverExecutionProfile.DEFAULT_NAME) {
162+
@Override
163+
protected void shuffleHead(Object[] currentNodes, int headLength) {
164+
// nothing (keep in same order)
165+
}
166+
});
167+
Map<UUID, Node> nodes =
168+
ImmutableMap.<UUID, Node>builder()
169+
.put(UUID.randomUUID(), node1)
170+
.put(UUID.randomUUID(), node2)
171+
.put(UUID.randomUUID(), node3)
172+
.put(UUID.randomUUID(), node4)
173+
.put(UUID.randomUUID(), node5)
174+
.put(UUID.randomUUID(), node6)
175+
.put(UUID.randomUUID(), node7)
176+
.put(UUID.randomUUID(), node8)
177+
.put(UUID.randomUUID(), node9)
178+
.put(UUID.randomUUID(), node10)
179+
.put(UUID.randomUUID(), node11)
180+
.put(UUID.randomUUID(), node12)
181+
.put(UUID.randomUUID(), node13)
182+
.put(UUID.randomUUID(), node14)
183+
.build();
184+
policy.init(nodes, distanceReporter);
185+
assertThat(policy.getLiveNodes().dc("dc1")).containsExactly(node1, node2, node3, node4, node5);
186+
assertThat(policy.getLiveNodes().dc("dc2")).containsExactly(node6, node7); // only 2 allowed
187+
assertThat(policy.getLiveNodes().dc("dc3")).containsExactly(node9, node10); // only 2 allowed
188+
assertThat(policy.getLiveNodes().dc("dc4")).containsExactly(node12, node13); // only 2 allowed
189+
return policy;
190+
}
191+
}

0 commit comments

Comments
 (0)