Skip to content

Commit 437f94b

Browse files
authored
Add rack awareness to default load balancing policy
The default load balancing policy will prefer replicas that are in the local datacenter and in the local rack. The rack awareness feature is optional, to enable it a local rack should be supplied through configuration by defining the option `DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK` `basic.load-balancing-policy.local-rack`, otherwise it will be disabled and unlike the local datacenter it will not be implicitly fetched from the provided contact points.
1 parent cd0ef41 commit 437f94b

File tree

11 files changed

+226
-10
lines changed

11 files changed

+226
-10
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public enum DefaultDriverOption implements DriverOption {
9393
* <p>Value-type: {@link String}
9494
*/
9595
LOAD_BALANCING_LOCAL_DATACENTER("basic.load-balancing-policy.local-datacenter"),
96+
/**
97+
* The rack that is considered "local".
98+
*
99+
* <p>Value-type: {@link String}
100+
*/
101+
LOAD_BALANCING_LOCAL_RACK("basic.load-balancing-policy.local-rack"),
96102
/**
97103
* A custom filter to include/exclude nodes.
98104
*

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ public String toString() {
137137
public static final TypedDriverOption<String> LOAD_BALANCING_LOCAL_DATACENTER =
138138
new TypedDriverOption<>(
139139
DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, GenericType.STRING);
140+
141+
public static final TypedDriverOption<String> LOAD_BALANCING_LOCAL_RACK =
142+
new TypedDriverOption<>(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK, GenericType.STRING);
140143
/**
141144
* A custom filter to include/exclude nodes.
142145
*

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
4141
import com.datastax.oss.driver.internal.core.loadbalancing.helper.DefaultNodeDistanceEvaluatorHelper;
4242
import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalDcHelper;
43+
import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalRackHelper;
4344
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.DcAgnosticNodeSet;
4445
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.MultiDcNodeSet;
4546
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet;
@@ -121,6 +122,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
121122
private volatile DistanceReporter distanceReporter;
122123
private volatile NodeDistanceEvaluator nodeDistanceEvaluator;
123124
private volatile String localDc;
125+
private volatile String localRack;
124126
private volatile NodeSet liveNodes;
125127

126128
public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
@@ -154,6 +156,11 @@ protected String getLocalDatacenter() {
154156
return localDc;
155157
}
156158

159+
@Nullable
160+
protected String getLocalRack() {
161+
return localRack;
162+
}
163+
157164
/** @return The nodes currently considered as live. */
158165
protected NodeSet getLiveNodes() {
159166
return liveNodes;
@@ -163,6 +170,8 @@ protected NodeSet getLiveNodes() {
163170
public void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter) {
164171
this.distanceReporter = distanceReporter;
165172
localDc = discoverLocalDc(nodes).orElse(null);
173+
// If local datacenter is not provided then the rack awareness is disabled
174+
localRack = localDc != null ? discoverLocalRack(nodes).orElse(null) : null;
166175
nodeDistanceEvaluator = createNodeDistanceEvaluator(localDc, nodes);
167176
liveNodes =
168177
localDc == null
@@ -207,6 +216,11 @@ protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
207216
return new OptionalLocalDcHelper(context, profile, logPrefix).discoverLocalDc(nodes);
208217
}
209218

219+
@NonNull
220+
protected Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes) {
221+
return new OptionalLocalRackHelper(profile, logPrefix).discoverLocalRack(nodes);
222+
}
223+
210224
/**
211225
* Creates a new node distance evaluator to use with this policy.
212226
*
@@ -356,6 +370,11 @@ protected void shuffleHead(Object[] currentNodes, int headLength) {
356370
ArrayUtils.shuffleHead(currentNodes, headLength);
357371
}
358372

373+
/** Exposed as a protected method so that it can be accessed by tests */
374+
protected void shuffleInRange(Object[] currentNodes, int startIndex, int endIndex) {
375+
ArrayUtils.shuffleInRange(currentNodes, startIndex, endIndex);
376+
}
377+
359378
@Override
360379
public void onAdd(@NonNull Node node) {
361380
NodeDistance distance = computeNodeDistance(node);

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import edu.umd.cs.findbugs.annotations.Nullable;
3636
import java.util.BitSet;
3737
import java.util.Map;
38+
import java.util.Objects;
3839
import java.util.Optional;
3940
import java.util.Queue;
4041
import java.util.Set;
@@ -132,21 +133,36 @@ public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session ses
132133

133134
Set<Node> allReplicas = getReplicas(request, session);
134135
int replicaCount = 0; // in currentNodes
136+
int localRackReplicaCount = 0; // in currentNodes
137+
String localRack = getLocalRack();
135138

136139
if (!allReplicas.isEmpty()) {
137140

138141
// Move replicas to the beginning of the plan
142+
// Replicas in local rack should precede other replicas
139143
for (int i = 0; i < currentNodes.length; i++) {
140144
Node node = (Node) currentNodes[i];
141145
if (allReplicas.contains(node)) {
142-
ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
146+
if (Objects.equals(node.getRack(), localRack)
147+
&& Objects.equals(node.getDatacenter(), getLocalDatacenter())) {
148+
ArrayUtils.bubbleUp(currentNodes, i, localRackReplicaCount);
149+
localRackReplicaCount++;
150+
} else {
151+
ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
152+
}
143153
replicaCount++;
144154
}
145155
}
146156

147157
if (replicaCount > 1) {
148-
149-
shuffleHead(currentNodes, replicaCount);
158+
if (localRack != null && localRackReplicaCount > 0) {
159+
// Shuffle only replicas that are in the local rack
160+
shuffleHead(currentNodes, localRackReplicaCount);
161+
// Shuffles only replicas that are not in local rack
162+
shuffleInRange(currentNodes, localRackReplicaCount, replicaCount - 1);
163+
} else {
164+
shuffleHead(currentNodes, replicaCount);
165+
}
150166

151167
if (replicaCount > 2) {
152168

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.datastax.oss.driver.internal.core.loadbalancing.helper;
2+
3+
import com.datastax.oss.driver.api.core.metadata.Node;
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import java.util.Map;
6+
import java.util.Optional;
7+
import java.util.UUID;
8+
9+
public interface LocalRackHelper {
10+
@NonNull
11+
Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes);
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.datastax.oss.driver.internal.core.loadbalancing.helper;
2+
3+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
4+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
5+
import com.datastax.oss.driver.api.core.metadata.Node;
6+
import edu.umd.cs.findbugs.annotations.NonNull;
7+
import java.util.Map;
8+
import java.util.Optional;
9+
import java.util.UUID;
10+
import net.jcip.annotations.ThreadSafe;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
/**
15+
* An implementation of {@link LocalRackHelper} that fetches the local rack from the driver
16+
* configuration. If no user-supplied rack can be retrieved, it returns {@link Optional#empty
17+
* empty}.
18+
*/
19+
@ThreadSafe
20+
public class OptionalLocalRackHelper implements LocalRackHelper {
21+
private static final Logger LOG = LoggerFactory.getLogger(OptionalLocalRackHelper.class);
22+
23+
@NonNull protected final DriverExecutionProfile profile;
24+
@NonNull protected final String logPrefix;
25+
26+
public OptionalLocalRackHelper(
27+
@NonNull DriverExecutionProfile profile, @NonNull String logPrefix) {
28+
this.profile = profile;
29+
this.logPrefix = logPrefix;
30+
}
31+
32+
@NonNull
33+
@Override
34+
public Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes) {
35+
if (profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)) {
36+
String localRack = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK);
37+
LOG.debug("[{}] Local rack set from configuration: {}", logPrefix, localRack);
38+
return Optional.of(localRack);
39+
} else {
40+
LOG.debug("[{}] Local rack not set, rack awareness will be disabled", logPrefix);
41+
return Optional.empty();
42+
}
43+
}
44+
}

core/src/main/java/com/datastax/oss/driver/internal/core/util/ArrayUtils.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,34 @@ public static <ElementT> void shuffleHead(
8989
}
9090
}
9191

92+
/**
93+
* Shuffles elements of the array in a range.
94+
*
95+
* @param elements the array to shuffle.
96+
* @param startIndex the start index of the range; must be {@code >= 0}.
97+
* @param endIndex the end index of the range; must be {@code <= elements.length}.
98+
* @see <a
99+
* href="https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_modern_algorithm">Modern
100+
* Fisher-Yates shuffle</a>
101+
*/
102+
public static <ElementT> void shuffleInRange(
103+
@NonNull ElementT[] elements, int startIndex, int endIndex) {
104+
assert startIndex >= 0 && startIndex < elements.length;
105+
assert endIndex >= 0 && endIndex < elements.length;
106+
107+
if (startIndex > endIndex) {
108+
throw new IllegalArgumentException(
109+
String.format(
110+
"Start index %d should be less than or equal to endIndex %d", startIndex, endIndex));
111+
}
112+
113+
final ThreadLocalRandom random = ThreadLocalRandom.current();
114+
for (int i = endIndex; i > startIndex; i--) {
115+
int j = random.nextInt(startIndex, i + 1);
116+
swap(elements, i, j);
117+
}
118+
}
119+
92120
/** Rotates the elements in the specified range by the specified amount (round-robin). */
93121
public static <ElementT> void rotate(
94122
@NonNull ElementT[] elements, int startIndex, int length, int amount) {

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DcInferringLoadBalancingPolicyQueryPlanTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ protected DcInferringLoadBalancingPolicy createAndInitPolicy() {
3232
@Override
3333
protected void shuffleHead(Object[] array, int n) {}
3434

35+
@Override
36+
protected void shuffleInRange(Object[] currentNodes, int startIndex, int endIndex) {}
37+
3538
@Override
3639
protected long nanoTime() {
3740
return nanoTime;

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyInitTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,23 @@ public void should_use_local_dc_if_provided_via_config() {
5151
assertThat(policy.getLocalDatacenter()).isEqualTo("dc1");
5252
}
5353

54+
@Test
55+
public void should_use_local_rack_if_provided_via_config() {
56+
// Given
57+
when(defaultProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)).thenReturn(true);
58+
when(defaultProfile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK))
59+
.thenReturn("rack1");
60+
when(metadataManager.getContactPoints()).thenReturn(ImmutableSet.of(node1));
61+
62+
DefaultLoadBalancingPolicy policy = createPolicy();
63+
64+
// When
65+
policy.init(ImmutableMap.of(UUID.randomUUID(), node1), distanceReporter);
66+
67+
// Then
68+
assertThat(policy.getLocalRack()).isEqualTo("rack1");
69+
}
70+
5471
@Test
5572
public void should_use_local_dc_if_provided_via_context() {
5673
// Given

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyQueryPlanTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,54 @@ public void should_reorder_first_two_replicas_when_first_has_more_in_flight_than
331331
then(dsePolicy).should(never()).diceRoll1d4();
332332
}
333333

334+
@Test
335+
public void should_not_shuffle_local_rack_replica_with_all_replicas_healthy() {
336+
// Given
337+
given(request.getRoutingKeyspace()).willReturn(KEYSPACE);
338+
given(request.getRoutingKey()).willReturn(ROUTING_KEY);
339+
given(tokenMap.getReplicas(KEYSPACE, null, ROUTING_KEY))
340+
.willReturn(ImmutableSet.of(node1, node3, node5));
341+
String localRack = "rack1";
342+
given(dsePolicy.getLocalRack()).willReturn(localRack);
343+
given(node1.getRack()).willReturn(localRack);
344+
given(pool1.getInFlight()).willReturn(0);
345+
given(pool3.getInFlight()).willReturn(0);
346+
given(pool5.getInFlight()).willReturn(0);
347+
348+
// When
349+
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);
350+
Queue<Node> plan2 = dsePolicy.newQueryPlan(request, session);
351+
352+
// Then
353+
// nodes 1, 3 and 5 always first, round-robin on the rest
354+
assertThat(plan1).containsExactly(node1, node3, node5, node2, node4);
355+
assertThat(plan2).containsExactly(node1, node3, node5, node4, node2);
356+
}
357+
358+
@Test
359+
public void should_prefer_local_rack_replica_with_less_inflight_requests() {
360+
// Given
361+
given(request.getRoutingKeyspace()).willReturn(KEYSPACE);
362+
given(request.getRoutingKey()).willReturn(ROUTING_KEY);
363+
given(tokenMap.getReplicas(KEYSPACE, null, ROUTING_KEY))
364+
.willReturn(ImmutableSet.of(node1, node3, node5));
365+
String localRack = "rack1";
366+
given(dsePolicy.getLocalRack()).willReturn(localRack);
367+
given(node3.getRack()).willReturn(localRack);
368+
given(node5.getRack()).willReturn(localRack);
369+
given(pool1.getInFlight()).willReturn(0);
370+
given(pool3.getInFlight()).willReturn(20);
371+
given(pool5.getInFlight()).willReturn(10);
372+
373+
// When
374+
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);
375+
Queue<Node> plan2 = dsePolicy.newQueryPlan(request, session);
376+
377+
// Then
378+
assertThat(plan1).containsExactly(node5, node3, node1, node2, node4);
379+
assertThat(plan2).containsExactly(node5, node3, node1, node4, node2);
380+
}
381+
334382
@Override
335383
protected DefaultLoadBalancingPolicy createAndInitPolicy() {
336384
DefaultLoadBalancingPolicy policy =
@@ -339,6 +387,9 @@ protected DefaultLoadBalancingPolicy createAndInitPolicy() {
339387
@Override
340388
protected void shuffleHead(Object[] array, int n) {}
341389

390+
@Override
391+
protected void shuffleInRange(Object[] array, int startIndex, int endIndex) {}
392+
342393
@Override
343394
protected long nanoTime() {
344395
return nanoTime;

0 commit comments

Comments
 (0)