Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ public enum DefaultDriverOption implements DriverOption {
* <p>Value-type: {@link String}
*/
LOAD_BALANCING_LOCAL_DATACENTER("basic.load-balancing-policy.local-datacenter"),
/**
* The rack that is considered "local".
*
* <p>Value-type: {@link String}
*/
LOAD_BALANCING_LOCAL_RACK("basic.load-balancing-policy.local-rack"),
/**
* A custom filter to include/exclude nodes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ public String toString() {
public static final TypedDriverOption<String> LOAD_BALANCING_LOCAL_DATACENTER =
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, GenericType.STRING);

public static final TypedDriverOption<String> LOAD_BALANCING_LOCAL_RACK =
new TypedDriverOption<>(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK, GenericType.STRING);
/**
* A custom filter to include/exclude nodes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.loadbalancing.helper.DefaultNodeDistanceEvaluatorHelper;
import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalDcHelper;
import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalRackHelper;
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.DcAgnosticNodeSet;
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.MultiDcNodeSet;
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet;
Expand Down Expand Up @@ -121,6 +122,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
private volatile DistanceReporter distanceReporter;
private volatile NodeDistanceEvaluator nodeDistanceEvaluator;
private volatile String localDc;
private volatile String localRack;
private volatile NodeSet liveNodes;

public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
Expand Down Expand Up @@ -154,6 +156,11 @@ protected String getLocalDatacenter() {
return localDc;
}

@Nullable
protected String getLocalRack() {
return localRack;
}

/** @return The nodes currently considered as live. */
protected NodeSet getLiveNodes() {
return liveNodes;
Expand All @@ -163,6 +170,8 @@ protected NodeSet getLiveNodes() {
public void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter) {
this.distanceReporter = distanceReporter;
localDc = discoverLocalDc(nodes).orElse(null);
// If local datacenter is not provided then the rack awareness is disabled
localRack = localDc != null ? discoverLocalRack(nodes).orElse(null) : null;
nodeDistanceEvaluator = createNodeDistanceEvaluator(localDc, nodes);
liveNodes =
localDc == null
Expand Down Expand Up @@ -207,6 +216,11 @@ protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
return new OptionalLocalDcHelper(context, profile, logPrefix).discoverLocalDc(nodes);
}

@NonNull
protected Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes) {
return new OptionalLocalRackHelper(profile, logPrefix).discoverLocalRack(nodes);
}

/**
* Creates a new node distance evaluator to use with this policy.
*
Expand Down Expand Up @@ -356,6 +370,11 @@ protected void shuffleHead(Object[] currentNodes, int headLength) {
ArrayUtils.shuffleHead(currentNodes, headLength);
}

/** Exposed as a protected method so that it can be accessed by tests */
protected void shuffleInRange(Object[] currentNodes, int startIndex, int endIndex) {
ArrayUtils.shuffleInRange(currentNodes, startIndex, endIndex);
}

@Override
public void onAdd(@NonNull Node node) {
NodeDistance distance = computeNodeDistance(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.BitSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -132,21 +133,36 @@ public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session ses

Set<Node> allReplicas = getReplicas(request, session);
int replicaCount = 0; // in currentNodes
int localRackReplicaCount = 0; // in currentNodes
String localRack = getLocalRack();

if (!allReplicas.isEmpty()) {

// Move replicas to the beginning of the plan
// Replicas in local rack should precede other replicas
for (int i = 0; i < currentNodes.length; i++) {
Node node = (Node) currentNodes[i];
if (allReplicas.contains(node)) {
ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
if (Objects.equals(node.getRack(), localRack)
&& Objects.equals(node.getDatacenter(), getLocalDatacenter())) {
ArrayUtils.bubbleUp(currentNodes, i, localRackReplicaCount);
localRackReplicaCount++;
} else {
ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
}
replicaCount++;
}
}

if (replicaCount > 1) {

shuffleHead(currentNodes, replicaCount);
if (localRack != null && localRackReplicaCount > 0) {
// Shuffle only replicas that are in the local rack
shuffleHead(currentNodes, localRackReplicaCount);
// Shuffles only replicas that are not in local rack
shuffleInRange(currentNodes, localRackReplicaCount, replicaCount - 1);
} else {
shuffleHead(currentNodes, replicaCount);
}

if (replicaCount > 2) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.datastax.oss.driver.internal.core.loadbalancing.helper;

import com.datastax.oss.driver.api.core.metadata.Node;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public interface LocalRackHelper {
@NonNull
Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.datastax.oss.driver.internal.core.loadbalancing.helper;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An implementation of {@link LocalRackHelper} that fetches the local rack from the driver
* configuration. If no user-supplied rack can be retrieved, it returns {@link Optional#empty
* empty}.
*/
@ThreadSafe
public class OptionalLocalRackHelper implements LocalRackHelper {
private static final Logger LOG = LoggerFactory.getLogger(OptionalLocalRackHelper.class);

@NonNull protected final DriverExecutionProfile profile;
@NonNull protected final String logPrefix;

public OptionalLocalRackHelper(
@NonNull DriverExecutionProfile profile, @NonNull String logPrefix) {
this.profile = profile;
this.logPrefix = logPrefix;
}

@NonNull
@Override
public Optional<String> discoverLocalRack(@NonNull Map<UUID, Node> nodes) {
if (profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)) {
String localRack = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK);
LOG.debug("[{}] Local rack set from configuration: {}", logPrefix, localRack);
return Optional.of(localRack);
} else {
LOG.debug("[{}] Local rack not set, rack awareness will be disabled", logPrefix);
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ public static <ElementT> void shuffleHead(
}
}

/**
* Shuffles elements of the array in a range.
*
* @param elements the array to shuffle.
* @param startIndex the start index of the range; must be {@code >= 0}.
* @param endIndex the end index of the range; must be {@code <= elements.length}.
* @see <a
* href="https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_modern_algorithm">Modern
* Fisher-Yates shuffle</a>
*/
public static <ElementT> void shuffleInRange(
@NonNull ElementT[] elements, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < elements.length;
assert endIndex >= 0 && endIndex < elements.length;

if (startIndex > endIndex) {
throw new IllegalArgumentException(
String.format(
"Start index %d should be less than or equal to endIndex %d", startIndex, endIndex));
}

final ThreadLocalRandom random = ThreadLocalRandom.current();
for (int i = endIndex; i > startIndex; i--) {
int j = random.nextInt(startIndex, i + 1);
swap(elements, i, j);
}
}

/** Rotates the elements in the specified range by the specified amount (round-robin). */
public static <ElementT> void rotate(
@NonNull ElementT[] elements, int startIndex, int length, int amount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ protected DcInferringLoadBalancingPolicy createAndInitPolicy() {
@Override
protected void shuffleHead(Object[] array, int n) {}

@Override
protected void shuffleInRange(Object[] currentNodes, int startIndex, int endIndex) {}

@Override
protected long nanoTime() {
return nanoTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@ public void should_use_local_dc_if_provided_via_config() {
assertThat(policy.getLocalDatacenter()).isEqualTo("dc1");
}

@Test
public void should_use_local_rack_if_provided_via_config() {
// Given
when(defaultProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)).thenReturn(true);
when(defaultProfile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK))
.thenReturn("rack1");
when(metadataManager.getContactPoints()).thenReturn(ImmutableSet.of(node1));

DefaultLoadBalancingPolicy policy = createPolicy();

// When
policy.init(ImmutableMap.of(UUID.randomUUID(), node1), distanceReporter);

// Then
assertThat(policy.getLocalRack()).isEqualTo("rack1");
}

@Test
public void should_use_local_dc_if_provided_via_context() {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,54 @@ public void should_reorder_first_two_replicas_when_first_has_more_in_flight_than
then(dsePolicy).should(never()).diceRoll1d4();
}

@Test
public void should_not_shuffle_local_rack_replica_with_all_replicas_healthy() {
// Given
given(request.getRoutingKeyspace()).willReturn(KEYSPACE);
given(request.getRoutingKey()).willReturn(ROUTING_KEY);
given(tokenMap.getReplicas(KEYSPACE, null, ROUTING_KEY))
.willReturn(ImmutableSet.of(node1, node3, node5));
String localRack = "rack1";
given(dsePolicy.getLocalRack()).willReturn(localRack);
given(node1.getRack()).willReturn(localRack);
given(pool1.getInFlight()).willReturn(0);
given(pool3.getInFlight()).willReturn(0);
given(pool5.getInFlight()).willReturn(0);

// When
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);
Queue<Node> plan2 = dsePolicy.newQueryPlan(request, session);

// Then
// nodes 1, 3 and 5 always first, round-robin on the rest
assertThat(plan1).containsExactly(node1, node3, node5, node2, node4);
assertThat(plan2).containsExactly(node1, node3, node5, node4, node2);
}

@Test
public void should_prefer_local_rack_replica_with_less_inflight_requests() {
// Given
given(request.getRoutingKeyspace()).willReturn(KEYSPACE);
given(request.getRoutingKey()).willReturn(ROUTING_KEY);
given(tokenMap.getReplicas(KEYSPACE, null, ROUTING_KEY))
.willReturn(ImmutableSet.of(node1, node3, node5));
String localRack = "rack1";
given(dsePolicy.getLocalRack()).willReturn(localRack);
given(node3.getRack()).willReturn(localRack);
given(node5.getRack()).willReturn(localRack);
given(pool1.getInFlight()).willReturn(0);
given(pool3.getInFlight()).willReturn(20);
given(pool5.getInFlight()).willReturn(10);

// When
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);
Queue<Node> plan2 = dsePolicy.newQueryPlan(request, session);

// Then
assertThat(plan1).containsExactly(node5, node3, node1, node2, node4);
assertThat(plan2).containsExactly(node5, node3, node1, node4, node2);
}

@Override
protected DefaultLoadBalancingPolicy createAndInitPolicy() {
DefaultLoadBalancingPolicy policy =
Expand All @@ -339,6 +387,9 @@ protected DefaultLoadBalancingPolicy createAndInitPolicy() {
@Override
protected void shuffleHead(Object[] array, int n) {}

@Override
protected void shuffleInRange(Object[] array, int startIndex, int endIndex) {}

@Override
protected long nanoTime() {
return nanoTime;
Expand Down
31 changes: 24 additions & 7 deletions manual/core/load_balancing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,24 @@ that case, the driver will connect to 127.0.0.1:9042, and use that node's datace
for a better out-of-the-box experience for users who have just downloaded the driver; beyond that
initial development phase, you should provide explicit contact points and a local datacenter.

##### Finding the local datacenter
##### Rack awareness

The `DefaultLoadBalancingPolicy` and implicitly the `DcInferringLoadBalancingPolicy` prioritize replicas that are in the
local datacenter, however, sometimes there is a need to prioritize replicas that are in the local rack and to not send
queries to other replicas in the local datacenter. This will allow to avoid high network traffic between racks/availability zones
and thus will reduce data transfer costs. The rack-awareness feature is optional and to enable it the local rack should
be supplied through the configuration:

```
datastax-java-driver.basic.load-balancing-policy {
local-rack = rack1
}
```

The feature is disabled by default and unlike the local datacenter it will not be implicitly fetched from the provided
contact points.

##### Finding the local datacenter & local rack

To check which datacenters are defined in a given cluster, you can run [`nodetool status`]. It will
print information about each node in the cluster, grouped by datacenters. Here is an example:
Expand All @@ -165,17 +182,17 @@ UN <IP5> 1.5 TB 256 ? <ID5> rack2
UN <IP6> 1.5 TB 256 ? <ID6> rack3
```

To find out which datacenter should be considered local, you need to first determine which nodes the
driver is going to be co-located with, then choose their datacenter as local. In case of doubt, you
To find out which datacenter and rack(availability zone) should be considered local, you need to first determine which nodes the
driver is going to be co-located with, then choose their datacenter and rack as local. In case of doubt, you
can also use [cqlsh]; if cqlsh is co-located too in the same datacenter, simply run the command
below:

```
cqlsh> select data_center from system.local;
cqlsh> select data_center,rack from system.local;
data_center
-------------
DC1
data_center | rack
-------------+-------
datacenter1 | rack1
```

#### Cross-datacenter failover
Expand Down