diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index b2fba21d6a3..e870f7b6ea8 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -93,6 +93,12 @@ public enum DefaultDriverOption implements DriverOption { *

Value-type: {@link String} */ LOAD_BALANCING_LOCAL_DATACENTER("basic.load-balancing-policy.local-datacenter"), + /** + * The rack that is considered "local". + * + *

Value-type: {@link String} + */ + LOAD_BALANCING_LOCAL_RACK("basic.load-balancing-policy.local-rack"), /** * A custom filter to include/exclude nodes. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index bce8f923c77..4308e695665 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -137,6 +137,9 @@ public String toString() { public static final TypedDriverOption LOAD_BALANCING_LOCAL_DATACENTER = new TypedDriverOption<>( DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, GenericType.STRING); + + public static final TypedDriverOption LOAD_BALANCING_LOCAL_RACK = + new TypedDriverOption<>(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK, GenericType.STRING); /** * A custom filter to include/exclude nodes. * diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index fb813f8b58b..b3a71827edc 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -40,6 +40,7 @@ import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.loadbalancing.helper.DefaultNodeDistanceEvaluatorHelper; import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalDcHelper; +import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalRackHelper; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.DcAgnosticNodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.MultiDcNodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet; @@ -121,6 +122,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy { private volatile DistanceReporter distanceReporter; private volatile NodeDistanceEvaluator nodeDistanceEvaluator; private volatile String localDc; + private volatile String localRack; private volatile NodeSet liveNodes; public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) { @@ -154,6 +156,11 @@ protected String getLocalDatacenter() { return localDc; } + @Nullable + protected String getLocalRack() { + return localRack; + } + /** @return The nodes currently considered as live. */ protected NodeSet getLiveNodes() { return liveNodes; @@ -163,6 +170,8 @@ protected NodeSet getLiveNodes() { public void init(@NonNull Map nodes, @NonNull DistanceReporter distanceReporter) { this.distanceReporter = distanceReporter; localDc = discoverLocalDc(nodes).orElse(null); + // If local datacenter is not provided then the rack awareness is disabled + localRack = localDc != null ? discoverLocalRack(nodes).orElse(null) : null; nodeDistanceEvaluator = createNodeDistanceEvaluator(localDc, nodes); liveNodes = localDc == null @@ -207,6 +216,11 @@ protected Optional discoverLocalDc(@NonNull Map nodes) { return new OptionalLocalDcHelper(context, profile, logPrefix).discoverLocalDc(nodes); } + @NonNull + protected Optional discoverLocalRack(@NonNull Map nodes) { + return new OptionalLocalRackHelper(profile, logPrefix).discoverLocalRack(nodes); + } + /** * Creates a new node distance evaluator to use with this policy. * @@ -356,6 +370,11 @@ protected void shuffleHead(Object[] currentNodes, int headLength) { ArrayUtils.shuffleHead(currentNodes, headLength); } + /** Exposed as a protected method so that it can be accessed by tests */ + protected void shuffleInRange(Object[] currentNodes, int startIndex, int endIndex) { + ArrayUtils.shuffleInRange(currentNodes, startIndex, endIndex); + } + @Override public void onAdd(@NonNull Node node) { NodeDistance distance = computeNodeDistance(node); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index f79fa55b520..e818fdf9b82 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -35,6 +35,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; import java.util.BitSet; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.Set; @@ -132,21 +133,36 @@ public Queue newQueryPlan(@Nullable Request request, @Nullable Session ses Set allReplicas = getReplicas(request, session); int replicaCount = 0; // in currentNodes + int localRackReplicaCount = 0; // in currentNodes + String localRack = getLocalRack(); if (!allReplicas.isEmpty()) { // Move replicas to the beginning of the plan + // Replicas in local rack should precede other replicas for (int i = 0; i < currentNodes.length; i++) { Node node = (Node) currentNodes[i]; if (allReplicas.contains(node)) { - ArrayUtils.bubbleUp(currentNodes, i, replicaCount); + if (Objects.equals(node.getRack(), localRack) + && Objects.equals(node.getDatacenter(), getLocalDatacenter())) { + ArrayUtils.bubbleUp(currentNodes, i, localRackReplicaCount); + localRackReplicaCount++; + } else { + ArrayUtils.bubbleUp(currentNodes, i, replicaCount); + } replicaCount++; } } if (replicaCount > 1) { - - shuffleHead(currentNodes, replicaCount); + if (localRack != null && localRackReplicaCount > 0) { + // Shuffle only replicas that are in the local rack + shuffleHead(currentNodes, localRackReplicaCount); + // Shuffles only replicas that are not in local rack + shuffleInRange(currentNodes, localRackReplicaCount, replicaCount - 1); + } else { + shuffleHead(currentNodes, replicaCount); + } if (replicaCount > 2) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/helper/LocalRackHelper.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/helper/LocalRackHelper.java new file mode 100644 index 00000000000..0d3dc455589 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/helper/LocalRackHelper.java @@ -0,0 +1,12 @@ +package com.datastax.oss.driver.internal.core.loadbalancing.helper; + +import com.datastax.oss.driver.api.core.metadata.Node; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +public interface LocalRackHelper { + @NonNull + Optional discoverLocalRack(@NonNull Map nodes); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/helper/OptionalLocalRackHelper.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/helper/OptionalLocalRackHelper.java new file mode 100644 index 00000000000..7f4840a3461 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/helper/OptionalLocalRackHelper.java @@ -0,0 +1,44 @@ +package com.datastax.oss.driver.internal.core.loadbalancing.helper; + +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.metadata.Node; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of {@link LocalRackHelper} that fetches the local rack from the driver + * configuration. If no user-supplied rack can be retrieved, it returns {@link Optional#empty + * empty}. + */ +@ThreadSafe +public class OptionalLocalRackHelper implements LocalRackHelper { + private static final Logger LOG = LoggerFactory.getLogger(OptionalLocalRackHelper.class); + + @NonNull protected final DriverExecutionProfile profile; + @NonNull protected final String logPrefix; + + public OptionalLocalRackHelper( + @NonNull DriverExecutionProfile profile, @NonNull String logPrefix) { + this.profile = profile; + this.logPrefix = logPrefix; + } + + @NonNull + @Override + public Optional discoverLocalRack(@NonNull Map nodes) { + if (profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)) { + String localRack = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK); + LOG.debug("[{}] Local rack set from configuration: {}", logPrefix, localRack); + return Optional.of(localRack); + } else { + LOG.debug("[{}] Local rack not set, rack awareness will be disabled", logPrefix); + return Optional.empty(); + } + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/ArrayUtils.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/ArrayUtils.java index 25597e190c9..21bbd88713f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/util/ArrayUtils.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/ArrayUtils.java @@ -89,6 +89,34 @@ public static void shuffleHead( } } + /** + * Shuffles elements of the array in a range. + * + * @param elements the array to shuffle. + * @param startIndex the start index of the range; must be {@code >= 0}. + * @param endIndex the end index of the range; must be {@code <= elements.length}. + * @see Modern + * Fisher-Yates shuffle + */ + public static void shuffleInRange( + @NonNull ElementT[] elements, int startIndex, int endIndex) { + assert startIndex >= 0 && startIndex < elements.length; + assert endIndex >= 0 && endIndex < elements.length; + + if (startIndex > endIndex) { + throw new IllegalArgumentException( + String.format( + "Start index %d should be less than or equal to endIndex %d", startIndex, endIndex)); + } + + final ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = endIndex; i > startIndex; i--) { + int j = random.nextInt(startIndex, i + 1); + swap(elements, i, j); + } + } + /** Rotates the elements in the specified range by the specified amount (round-robin). */ public static void rotate( @NonNull ElementT[] elements, int startIndex, int length, int amount) { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DcInferringLoadBalancingPolicyQueryPlanTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DcInferringLoadBalancingPolicyQueryPlanTest.java index f60ed95697e..859d37af905 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DcInferringLoadBalancingPolicyQueryPlanTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DcInferringLoadBalancingPolicyQueryPlanTest.java @@ -32,6 +32,9 @@ protected DcInferringLoadBalancingPolicy createAndInitPolicy() { @Override protected void shuffleHead(Object[] array, int n) {} + @Override + protected void shuffleInRange(Object[] currentNodes, int startIndex, int endIndex) {} + @Override protected long nanoTime() { return nanoTime; diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyInitTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyInitTest.java index e9fd5c68944..07ee7ab5844 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyInitTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyInitTest.java @@ -51,6 +51,23 @@ public void should_use_local_dc_if_provided_via_config() { assertThat(policy.getLocalDatacenter()).isEqualTo("dc1"); } + @Test + public void should_use_local_rack_if_provided_via_config() { + // Given + when(defaultProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)).thenReturn(true); + when(defaultProfile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_RACK)) + .thenReturn("rack1"); + when(metadataManager.getContactPoints()).thenReturn(ImmutableSet.of(node1)); + + DefaultLoadBalancingPolicy policy = createPolicy(); + + // When + policy.init(ImmutableMap.of(UUID.randomUUID(), node1), distanceReporter); + + // Then + assertThat(policy.getLocalRack()).isEqualTo("rack1"); + } + @Test public void should_use_local_dc_if_provided_via_context() { // Given diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyQueryPlanTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyQueryPlanTest.java index 58912c163c3..6047f81cef6 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyQueryPlanTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyQueryPlanTest.java @@ -331,6 +331,54 @@ public void should_reorder_first_two_replicas_when_first_has_more_in_flight_than then(dsePolicy).should(never()).diceRoll1d4(); } + @Test + public void should_not_shuffle_local_rack_replica_with_all_replicas_healthy() { + // Given + given(request.getRoutingKeyspace()).willReturn(KEYSPACE); + given(request.getRoutingKey()).willReturn(ROUTING_KEY); + given(tokenMap.getReplicas(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableSet.of(node1, node3, node5)); + String localRack = "rack1"; + given(dsePolicy.getLocalRack()).willReturn(localRack); + given(node1.getRack()).willReturn(localRack); + given(pool1.getInFlight()).willReturn(0); + given(pool3.getInFlight()).willReturn(0); + given(pool5.getInFlight()).willReturn(0); + + // When + Queue plan1 = dsePolicy.newQueryPlan(request, session); + Queue plan2 = dsePolicy.newQueryPlan(request, session); + + // Then + // nodes 1, 3 and 5 always first, round-robin on the rest + assertThat(plan1).containsExactly(node1, node3, node5, node2, node4); + assertThat(plan2).containsExactly(node1, node3, node5, node4, node2); + } + + @Test + public void should_prefer_local_rack_replica_with_less_inflight_requests() { + // Given + given(request.getRoutingKeyspace()).willReturn(KEYSPACE); + given(request.getRoutingKey()).willReturn(ROUTING_KEY); + given(tokenMap.getReplicas(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableSet.of(node1, node3, node5)); + String localRack = "rack1"; + given(dsePolicy.getLocalRack()).willReturn(localRack); + given(node3.getRack()).willReturn(localRack); + given(node5.getRack()).willReturn(localRack); + given(pool1.getInFlight()).willReturn(0); + given(pool3.getInFlight()).willReturn(20); + given(pool5.getInFlight()).willReturn(10); + + // When + Queue plan1 = dsePolicy.newQueryPlan(request, session); + Queue plan2 = dsePolicy.newQueryPlan(request, session); + + // Then + assertThat(plan1).containsExactly(node5, node3, node1, node2, node4); + assertThat(plan2).containsExactly(node5, node3, node1, node4, node2); + } + @Override protected DefaultLoadBalancingPolicy createAndInitPolicy() { DefaultLoadBalancingPolicy policy = @@ -339,6 +387,9 @@ protected DefaultLoadBalancingPolicy createAndInitPolicy() { @Override protected void shuffleHead(Object[] array, int n) {} + @Override + protected void shuffleInRange(Object[] array, int startIndex, int endIndex) {} + @Override protected long nanoTime() { return nanoTime; diff --git a/manual/core/load_balancing/README.md b/manual/core/load_balancing/README.md index abc950fe378..0bd89f26049 100644 --- a/manual/core/load_balancing/README.md +++ b/manual/core/load_balancing/README.md @@ -139,7 +139,24 @@ that case, the driver will connect to 127.0.0.1:9042, and use that node's datace for a better out-of-the-box experience for users who have just downloaded the driver; beyond that initial development phase, you should provide explicit contact points and a local datacenter. -##### Finding the local datacenter +##### Rack awareness + +The `DefaultLoadBalancingPolicy` and implicitly the `DcInferringLoadBalancingPolicy` prioritize replicas that are in the +local datacenter, however, sometimes there is a need to prioritize replicas that are in the local rack and to not send +queries to other replicas in the local datacenter. This will allow to avoid high network traffic between racks/availability zones +and thus will reduce data transfer costs. The rack-awareness feature is optional and to enable it the local rack should +be supplied through the configuration: + +``` +datastax-java-driver.basic.load-balancing-policy { + local-rack = rack1 +} +``` + +The feature is disabled by default and unlike the local datacenter it will not be implicitly fetched from the provided +contact points. + +##### Finding the local datacenter & local rack To check which datacenters are defined in a given cluster, you can run [`nodetool status`]. It will print information about each node in the cluster, grouped by datacenters. Here is an example: @@ -165,17 +182,17 @@ UN 1.5 TB 256 ? rack2 UN 1.5 TB 256 ? rack3 ``` -To find out which datacenter should be considered local, you need to first determine which nodes the -driver is going to be co-located with, then choose their datacenter as local. In case of doubt, you +To find out which datacenter and rack(availability zone) should be considered local, you need to first determine which nodes the +driver is going to be co-located with, then choose their datacenter and rack as local. In case of doubt, you can also use [cqlsh]; if cqlsh is co-located too in the same datacenter, simply run the command below: ``` -cqlsh> select data_center from system.local; +cqlsh> select data_center,rack from system.local; -data_center -------------- -DC1 + data_center | rack +-------------+------- + datacenter1 | rack1 ``` #### Cross-datacenter failover