Skip to content

Commit 64dc1e0

Browse files
committed
Add rack awareness to default load balancing policy
The default load balancing policy will prefer replicas that are in the local datacenter and in the local rack. The rack awareness feature is optional, to enable it a local rack should be supplied through configuration by defining the option `DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK` `basic.load-balancing-policy.local-datacenter`, otherwise it will be disabled and unlike the local datacenter it will not be implicitly fetched from the provided contact points.
1 parent 834f231 commit 64dc1e0

File tree

9 files changed

+174
-9
lines changed

9 files changed

+174
-9
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public enum DefaultDriverOption implements DriverOption {
9393
* <p>Value-type: {@link String}
9494
*/
9595
LOAD_BALANCING_LOCAL_DATACENTER("basic.load-balancing-policy.local-datacenter"),
96+
/**
97+
* The rack that is considered "local".
98+
*
99+
* <p>Value-type: {@link String}
100+
*/
101+
LOAD_BALANCING_LOCAL_RACK("basic.load-balancing-policy.local-rack"),
96102
/**
97103
* A custom filter to include/exclude nodes.
98104
*

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ public String toString() {
137137
public static final TypedDriverOption<String> LOAD_BALANCING_LOCAL_DATACENTER =
138138
new TypedDriverOption<>(
139139
DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, GenericType.STRING);
140+
141+
public static final TypedDriverOption<String> LOAD_BALANCING_LOCAL_RACK =
142+
new TypedDriverOption<>(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK, GenericType.STRING);
140143
/**
141144
* A custom filter to include/exclude nodes.
142145
*

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
4141
import com.datastax.oss.driver.internal.core.loadbalancing.helper.DefaultNodeDistanceEvaluatorHelper;
4242
import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalDcHelper;
43+
import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalRackHelper;
4344
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.DcAgnosticNodeSet;
4445
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.MultiDcNodeSet;
4546
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet;
@@ -121,6 +122,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
121122
private volatile DistanceReporter distanceReporter;
122123
private volatile NodeDistanceEvaluator nodeDistanceEvaluator;
123124
private volatile String localDc;
125+
private volatile String localRack;
124126
private volatile NodeSet liveNodes;
125127

126128
public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
@@ -154,6 +156,11 @@ protected String getLocalDatacenter() {
154156
return localDc;
155157
}
156158

159+
@Nullable
160+
protected String getLocalRack() {
161+
return localRack;
162+
}
163+
157164
/** @return The nodes currently considered as live. */
158165
protected NodeSet getLiveNodes() {
159166
return liveNodes;
@@ -163,6 +170,8 @@ protected NodeSet getLiveNodes() {
163170
public void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter) {
164171
this.distanceReporter = distanceReporter;
165172
localDc = discoverLocalDc(nodes).orElse(null);
173+
// If local datacenter is not provided then the rack awareness is disabled
174+
localRack = localDc != null ? discoverLocalRack(nodes).orElse(null) : null;
166175
nodeDistanceEvaluator = createNodeDistanceEvaluator(localDc, nodes);
167176
liveNodes =
168177
localDc == null
@@ -207,6 +216,11 @@ protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
207216
return new OptionalLocalDcHelper(context, profile, logPrefix).discoverLocalDc(nodes);
208217
}
209218

219+
@NonNull
220+
protected Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes) {
221+
return new OptionalLocalRackHelper(profile, logPrefix).discoverLocalRack(nodes);
222+
}
223+
210224
/**
211225
* Creates a new node distance evaluator to use with this policy.
212226
*
@@ -356,6 +370,11 @@ protected void shuffleHead(Object[] currentNodes, int headLength) {
356370
ArrayUtils.shuffleHead(currentNodes, headLength);
357371
}
358372

373+
/** Exposed as a protected method so that it can be accessed by tests */
374+
protected void shuffleInRange(Object[] currentNodes, int startIndex, int endIndex) {
375+
ArrayUtils.shuffleInRange(currentNodes, startIndex, endIndex);
376+
}
377+
359378
@Override
360379
public void onAdd(@NonNull Node node) {
361380
NodeDistance distance = computeNodeDistance(node);

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,7 @@
3333
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
3434
import edu.umd.cs.findbugs.annotations.NonNull;
3535
import edu.umd.cs.findbugs.annotations.Nullable;
36-
import java.util.BitSet;
37-
import java.util.Map;
38-
import java.util.Optional;
39-
import java.util.Queue;
40-
import java.util.Set;
41-
import java.util.UUID;
36+
import java.util.*;
4237
import java.util.concurrent.ConcurrentHashMap;
4338
import java.util.concurrent.ThreadLocalRandom;
4439
import java.util.concurrent.atomic.AtomicLongArray;
@@ -132,21 +127,34 @@ public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session ses
132127

133128
Set<Node> allReplicas = getReplicas(request, session);
134129
int replicaCount = 0; // in currentNodes
130+
int localRackReplicaCount = 0; // in currentNodes
131+
String localRack = getLocalRack();
135132

136133
if (!allReplicas.isEmpty()) {
137134

138135
// Move replicas to the beginning of the plan
136+
// Replicas in local rack should precede other replicas
139137
for (int i = 0; i < currentNodes.length; i++) {
140138
Node node = (Node) currentNodes[i];
141139
if (allReplicas.contains(node)) {
142-
ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
140+
if (Objects.equals(node.getRack(), localRack)
141+
&& Objects.equals(node.getDatacenter(), getLocalDatacenter())) {
142+
ArrayUtils.bubbleUp(currentNodes, i, localRackReplicaCount);
143+
localRackReplicaCount++;
144+
} else {
145+
ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
146+
}
143147
replicaCount++;
144148
}
145149
}
146150

147151
if (replicaCount > 1) {
148-
149-
shuffleHead(currentNodes, replicaCount);
152+
// Shuffles only replicas that are not in local rack
153+
if (localRack != null && localRackReplicaCount > 0) {
154+
shuffleInRange(currentNodes, localRackReplicaCount, replicaCount - 1);
155+
} else {
156+
shuffleHead(currentNodes, replicaCount);
157+
}
150158

151159
if (replicaCount > 2) {
152160

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.datastax.oss.driver.internal.core.loadbalancing.helper;
2+
3+
import com.datastax.oss.driver.api.core.metadata.Node;
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import java.util.Map;
6+
import java.util.Optional;
7+
import java.util.UUID;
8+
9+
public interface LocalRackHelper {
10+
@NonNull
11+
Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes);
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.datastax.oss.driver.internal.core.loadbalancing.helper;
2+
3+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
4+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
5+
import com.datastax.oss.driver.api.core.metadata.Node;
6+
import edu.umd.cs.findbugs.annotations.NonNull;
7+
import java.util.Map;
8+
import java.util.Optional;
9+
import java.util.UUID;
10+
import net.jcip.annotations.ThreadSafe;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
/**
15+
* An implementation of {@link LocalRackHelper} that fetches the local rack from the driver
16+
* configuration. If no user-supplied rack can be retrieved, it returns {@link Optional#empty
17+
* empty}.
18+
*/
19+
@ThreadSafe
20+
public class OptionalLocalRackHelper implements LocalRackHelper {
21+
private static final Logger LOG = LoggerFactory.getLogger(OptionalLocalRackHelper.class);
22+
23+
@NonNull protected final DriverExecutionProfile profile;
24+
@NonNull protected final String logPrefix;
25+
26+
public OptionalLocalRackHelper(
27+
@NonNull DriverExecutionProfile profile, @NonNull String logPrefix) {
28+
this.profile = profile;
29+
this.logPrefix = logPrefix;
30+
}
31+
32+
@NonNull
33+
@Override
34+
public Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes) {
35+
if (profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)) {
36+
String localRack = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK);
37+
LOG.debug("[{}] Local rack set from configuration: {}", logPrefix, localRack);
38+
return Optional.of(localRack);
39+
} else {
40+
LOG.debug("[{}] Local rack not set, rack awareness will be disabled", logPrefix);
41+
}
42+
43+
return Optional.empty();
44+
}
45+
}

core/src/main/java/com/datastax/oss/driver/internal/core/util/ArrayUtils.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,34 @@ public static <ElementT> void shuffleHead(
8989
}
9090
}
9191

92+
/**
93+
* Shuffles elements of the array in a range.
94+
*
95+
* @param elements the array to shuffle.
96+
* @param startIndex the start index of the range; must be {@code >= 0}.
97+
* @param endIndex the end index of the range; must be {@code <= elements.length}.
98+
* @see <a
99+
* href="https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_modern_algorithm">Modern
100+
* Fisher-Yates shuffle</a>
101+
*/
102+
public static <ElementT> void shuffleInRange(
103+
@NonNull ElementT[] elements, int startIndex, int endIndex) {
104+
assert startIndex >= 0 && startIndex < elements.length;
105+
assert endIndex >= 0 && endIndex < elements.length;
106+
107+
if (startIndex > endIndex) {
108+
throw new IllegalArgumentException(
109+
String.format(
110+
"Start index %d should be less than or equal to endIndex %d", startIndex, endIndex));
111+
}
112+
113+
final ThreadLocalRandom random = ThreadLocalRandom.current();
114+
for (int i = endIndex; i > startIndex; i--) {
115+
int j = random.nextInt(startIndex, i + 1);
116+
swap(elements, i, j);
117+
}
118+
}
119+
92120
/** Rotates the elements in the specified range by the specified amount (round-robin). */
93121
public static <ElementT> void rotate(
94122
@NonNull ElementT[] elements, int startIndex, int length, int amount) {

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyInitTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,23 @@ public void should_use_local_dc_if_provided_via_config() {
5151
assertThat(policy.getLocalDatacenter()).isEqualTo("dc1");
5252
}
5353

54+
@Test
55+
public void should_use_local_rack_if_provided_via_config() {
56+
// Given
57+
when(defaultProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)).thenReturn(true);
58+
when(defaultProfile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK))
59+
.thenReturn("rack1");
60+
when(metadataManager.getContactPoints()).thenReturn(ImmutableSet.of(node1));
61+
62+
DefaultLoadBalancingPolicy policy = createPolicy();
63+
64+
// When
65+
policy.init(ImmutableMap.of(UUID.randomUUID(), node1), distanceReporter);
66+
67+
// Then
68+
assertThat(policy.getLocalRack()).isEqualTo("rack1");
69+
}
70+
5471
@Test
5572
public void should_use_local_dc_if_provided_via_context() {
5673
// Given

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyQueryPlanTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,30 @@ public void should_reorder_first_two_replicas_when_first_has_more_in_flight_than
331331
then(dsePolicy).should(never()).diceRoll1d4();
332332
}
333333

334+
@Test
335+
public void should_not_shuffle_local_rack_replica_with_all_replicas_healthy() {
336+
// Given
337+
given(request.getRoutingKeyspace()).willReturn(KEYSPACE);
338+
given(request.getRoutingKey()).willReturn(ROUTING_KEY);
339+
given(tokenMap.getReplicas(KEYSPACE, null, ROUTING_KEY))
340+
.willReturn(ImmutableSet.of(node1, node3, node5));
341+
String localRack = "rack1";
342+
given(dsePolicy.getLocalRack()).willReturn(localRack);
343+
given(node1.getRack()).willReturn(localRack);
344+
given(pool1.getInFlight()).willReturn(0);
345+
given(pool3.getInFlight()).willReturn(0);
346+
given(pool5.getInFlight()).willReturn(0);
347+
348+
// When
349+
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);
350+
Queue<Node> plan2 = dsePolicy.newQueryPlan(request, session);
351+
352+
// Then
353+
// nodes 1, 3 and 5 always first, round-robin on the rest
354+
assertThat(plan1).containsExactly(node1, node3, node5, node2, node4);
355+
assertThat(plan2).containsExactly(node1, node3, node5, node4, node2);
356+
}
357+
334358
@Override
335359
protected DefaultLoadBalancingPolicy createAndInitPolicy() {
336360
DefaultLoadBalancingPolicy policy =
@@ -339,6 +363,9 @@ protected DefaultLoadBalancingPolicy createAndInitPolicy() {
339363
@Override
340364
protected void shuffleHead(Object[] array, int n) {}
341365

366+
@Override
367+
protected void shuffleInRange(Object[] array, int startIndex, int endIndex) {}
368+
342369
@Override
343370
protected long nanoTime() {
344371
return nanoTime;

0 commit comments

Comments
 (0)