Skip to content

Commit e95fbbb

Browse files
committed
Load Balancer
Load Balancer for Amazon Keyspaces
1 parent 0a5ac3e commit e95fbbb

File tree

5 files changed

+299
-3
lines changed

5 files changed

+299
-3
lines changed

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,23 @@ To activate this throttler, modify the {@code advanced.throttler} section in the
5252
* `number-of-hosts` : The number of hosts in the system.peers table. Depending on the endpoint type and region the number of hosts in the system.peers table may be different. This number is Used to validate throughput based on the number of connections specified in:`advanced.connection.pool.local.size`
5353
* `register-timeout` timeout waiting for permits. Should be less than or equal to `basic.request.timeout'
5454

55+
## Load balancing policies
56+
57+
Load balancing policies for the Cassandra driver have two main functions. First is to help distribute load across all nodes in a cluster, and the second is to route request to nodes for optimized access. The policy does not have visibility across all client sessions, which typically are instantiated one session per jvm. For each request, the load balancer policy constructs a new "query plan" . A query plan decides which node to send a cql request. Additionally, if retries are needed, the query plan will decide the order of nodes to be attempted. Most cassandra driver load balancing policies are designed to randomize the request in a "round-robin" algorithm, but weighted by replica set, latency, least-busy connection, and node uptime. The weights are designed for routing, but sometimes the weights can result in more transactions headed to a fewer number of hosts.
58+
59+
With Amazon Keyspaces its important for the driver to load balance traffic across connections, but routing is a responsibility owned by the service. With this improvement, the driver can used a simplified load balancing policy. Thus, the most efficient load balancing policy is often one which evenly distributes request across available host and connections without considering additional weights.
60+
61+
62+
### AmazonKeyspacesLoadBalancingPolicy
63+
Is a roundrobin policy, for each request a random order of nodes is designated as the query plan. The order is created randomly without weights such as latency and token awareness. Customers scale throughput by creating more connections.
64+
65+
The AmazonKeyspacesLoadBalancingPolicy load balancing policy is configured in the following way. The ```local-datacenter``` should be the Amazon Keyspaces region name.
66+
```
67+
basic.load-balancing-policy {
68+
class = com.aws.ssa.keyspaces.loadbalancing.AmazonKeyspacesRoundRobinLoadBalancingPolicy
69+
local-datacenter = "us-east-1"
70+
}
71+
```
5572

5673
# Build this project
5774
To build and use this library execute the following mvn command and place on the classpath of your application.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package com.aws.ssa.keyspaces.loadbalancing;
2+
3+
import com.datastax.oss.driver.api.core.context.DriverContext;
4+
import com.datastax.oss.driver.api.core.metadata.Node;
5+
import com.datastax.oss.driver.api.core.session.Request;
6+
import com.datastax.oss.driver.api.core.session.Session;
7+
import com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy;
8+
import com.datastax.oss.driver.internal.core.loadbalancing.helper.MandatoryLocalDcHelper;
9+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
10+
import com.datastax.oss.driver.internal.core.session.DefaultSession;
11+
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
12+
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
13+
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
14+
import edu.umd.cs.findbugs.annotations.NonNull;
15+
import edu.umd.cs.findbugs.annotations.Nullable;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import java.util.*;
20+
import java.util.concurrent.ThreadLocalRandom;
21+
22+
/***
23+
* AmazonKeyspacesLoadBalancingPolicy is a round robin policy that randomizes host order. While you may see a three to nine
24+
* node cluster when connecting to Amazon Keyspaces, connections are loadbalanced service side to multiple request handlers. This
25+
* policy provides even distribution across the driver connection pool. Traditional token-aware policies and latency aware policies
26+
* are not necessary for good performance in Amazon Keyspaces.
27+
*/
28+
public class AmazonKeyspacesRoundRobinLoadBalancingPolicy extends BasicLoadBalancingPolicy {
29+
private static final Logger LOG = LoggerFactory.getLogger(AmazonKeyspacesRoundRobinLoadBalancingPolicy.class);
30+
31+
public AmazonKeyspacesRoundRobinLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
32+
super(context, profileName);
33+
}
34+
35+
public void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter) {
36+
super.init(nodes, distanceReporter);
37+
LOG.info("Total number of nodes visible to driver: " + ((nodes == null)?0:nodes.size()));
38+
}
39+
40+
@NonNull
41+
protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
42+
return (new MandatoryLocalDcHelper(this.context, this.profile, this.logPrefix)).discoverLocalDc(nodes);
43+
}
44+
45+
protected int getInFlight(@NonNull Node node, @NonNull Session session) {
46+
ChannelPool pool = (ChannelPool)((DefaultSession)session).getPools().get(node);
47+
return pool == null ? 0 : pool.getInFlight();
48+
}
49+
protected int getSize(@NonNull Node node, @NonNull Session session) {
50+
ChannelPool pool = (ChannelPool)((DefaultSession)session).getPools().get(node);
51+
return pool == null ? 0 : pool.size();
52+
}
53+
/***
54+
* Fisher–Yates or Richard Durstenfeld shuffle implemented from lowest index to highest
55+
* https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_modern_algorithm
56+
* @param currentNodes
57+
* @return shuffledNodes
58+
*/
59+
public static void reverseDurstenfeldShuffle(Object[] currentNodes, ThreadLocalRandom random){
60+
int totalNodes = currentNodes.length;
61+
62+
for(int currentNodeIndex = 0; currentNodeIndex < totalNodes-1; currentNodeIndex++) {
63+
64+
int swapNodeIndex = random.nextInt(currentNodeIndex, totalNodes);
65+
66+
ArrayUtils.swap(currentNodes, currentNodeIndex, swapNodeIndex);
67+
}
68+
}
69+
@NonNull
70+
public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
71+
72+
Object[] currentNodes = this.getLiveNodes().dc(this.getLocalDatacenter()).toArray();
73+
74+
Queue<Node> queryPlan = newQueryPlan(request, session, currentNodes);
75+
76+
int totalNodes = currentNodes.length;
77+
78+
if (LOG.isTraceEnabled()) {
79+
if (totalNodes > 0) {
80+
//int currentSize = getSize((Node) currentNodes[0], session);
81+
82+
int inflight = getInFlight((Node) currentNodes[0], session);
83+
84+
String firstNode = ((Node) currentNodes[0]).getEndPoint().toString();
85+
86+
int openConnections = ((Node) currentNodes[0]).getOpenConnections();
87+
88+
int requestPerMostUsedConnection = (openConnections > 0) ? (inflight / openConnections) : 0;
89+
90+
LOG.trace(" Total local nodes: [{}], First Node [{}], Number of Connections: [{}], Total inflight:[{}], Number of Request per connection: [{}]", totalNodes, firstNode, openConnections, inflight, requestPerMostUsedConnection);
91+
}
92+
}
93+
return queryPlan;
94+
}
95+
96+
@NonNull
97+
public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session, Object[] currentNodes ) {
98+
int totalNodes = currentNodes.length;
99+
100+
if(totalNodes == 0) {
101+
102+
LOG.trace(" Total local nodes is 0, returning empty query plan");
103+
104+
return this.maybeAddDcFailover(request, QueryPlan.EMPTY);
105+
}
106+
if(totalNodes > 1){
107+
108+
ThreadLocalRandom random = ThreadLocalRandom.current();
109+
110+
reverseDurstenfeldShuffle(currentNodes, random);
111+
}
112+
113+
QueryPlan plan = new SimpleQueryPlan(currentNodes);
114+
115+
return this.maybeAddDcFailover(request, (Queue) plan);
116+
}
117+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package com.aws.ssa.keyspaces.loadbalancing;
2+
3+
4+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
5+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
6+
import com.datastax.oss.driver.api.core.context.DriverContext;
7+
import com.datastax.oss.driver.api.core.metadata.Node;
8+
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
9+
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
10+
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
11+
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
12+
import org.junit.Assert;
13+
import org.junit.Test;
14+
15+
import java.util.Arrays;
16+
import java.util.Queue;
17+
import java.util.concurrent.ThreadLocalRandom;
18+
19+
public class AmazonKeyspacesLoadbalancingPolicyTest {
20+
21+
@Test
22+
public void emptyQueryPlan() {
23+
24+
DriverContext context = new DefaultDriverContext(new DefaultProgrammaticDriverConfigLoaderBuilder().build(), ProgrammaticArguments.builder().build());
25+
AmazonKeyspacesRoundRobinLoadBalancingPolicy st = new AmazonKeyspacesRoundRobinLoadBalancingPolicy(context, "default");
26+
27+
Assert.assertEquals(QueryPlan.EMPTY, st.newQueryPlan(null, null, new Node[0]));
28+
}
29+
@Test
30+
public void notEmptyQueryPlanExceed() {
31+
32+
DriverContext context = new DefaultDriverContext(new DefaultProgrammaticDriverConfigLoaderBuilder().build(), ProgrammaticArguments.builder().build());
33+
AmazonKeyspacesRoundRobinLoadBalancingPolicy st = new AmazonKeyspacesRoundRobinLoadBalancingPolicy(context, "default");
34+
35+
Assert.assertNotEquals(QueryPlan.EMPTY, st.newQueryPlan(null, null, new Node[1]));
36+
37+
}
38+
@Test
39+
public void twoNodeShuffle() {
40+
41+
DriverContext context = new DefaultDriverContext(new DefaultProgrammaticDriverConfigLoaderBuilder().build(), ProgrammaticArguments.builder().build());
42+
AmazonKeyspacesRoundRobinLoadBalancingPolicy st = new AmazonKeyspacesRoundRobinLoadBalancingPolicy(context, "default");
43+
44+
Integer[] original = new Integer[2];
45+
Integer[] clone = new Integer[2];
46+
47+
for(int i=0; i<original.length;i++){
48+
original[i] = i;
49+
clone[i] = i;
50+
}
51+
52+
Queue<Node> queryPlan = st.newQueryPlan(null, null, original);
53+
54+
Assert.assertTrue(original.length == clone.length);
55+
Assert.assertNotEquals(QueryPlan.EMPTY, queryPlan);
56+
57+
}
58+
@Test
59+
public void largeShuffle() {
60+
61+
DriverContext context = new DefaultDriverContext(new DefaultProgrammaticDriverConfigLoaderBuilder().build(), ProgrammaticArguments.builder().build());
62+
AmazonKeyspacesRoundRobinLoadBalancingPolicy st = new AmazonKeyspacesRoundRobinLoadBalancingPolicy(context, "default");
63+
64+
Integer[] original = new Integer[100];
65+
Integer[] clone = new Integer[100];
66+
67+
for(int i=0; i<original.length;i++){
68+
original[i] = i;
69+
clone[i] = i;
70+
}
71+
72+
Queue<Node> queryPlan = st.newQueryPlan(null, null, original);
73+
74+
Assert.assertFalse(Arrays.deepEquals(original,clone));
75+
Assert.assertNotEquals(QueryPlan.EMPTY, queryPlan);
76+
}
77+
@Test
78+
public void testShuffleAlgorithm() {
79+
80+
DriverContext context = new DefaultDriverContext(new DefaultProgrammaticDriverConfigLoaderBuilder().build(), ProgrammaticArguments.builder().build());
81+
AmazonKeyspacesRoundRobinLoadBalancingPolicy st = new AmazonKeyspacesRoundRobinLoadBalancingPolicy(context, "default");
82+
83+
Integer[] original = new Integer[100];
84+
Integer[] clone = new Integer[100];
85+
86+
for(int i=0; i<original.length;i++){
87+
original[i] = i;
88+
clone[i] = i;
89+
}
90+
91+
AmazonKeyspacesRoundRobinLoadBalancingPolicy.reverseDurstenfeldShuffle(original, ThreadLocalRandom.current());
92+
93+
Assert.assertFalse(Arrays.deepEquals(original,clone));
94+
}
95+
96+
97+
@Test
98+
public void testConfig() {
99+
Assert.assertEquals("us-east-1", DriverConfigLoader.fromClasspath("loadbalancer-example").getInitialConfig().getDefaultProfile().getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER));
100+
}
101+
102+
}
103+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
2+
datastax-java-driver {
3+
basic.request.consistency = "LOCAL_QUORUM"
4+
basic.contact-points = [ "cassandra.us-east-1.amazonaws.com:9142"]
5+
6+
advanced.reconnect-on-init = true
7+
8+
basic.load-balancing-policy {
9+
class = com.aws.ssa.keyspaces.loadbalancing.AmazonKeyspacesRoundRobinLoadBalancingPolicy
10+
local-datacenter = "us-east-1"
11+
}
12+
basic.request.timeout = 4000 milliseconds
13+
14+
basic.request.default-idempotence = false
15+
16+
advanced.retry-policy {
17+
class = com.aws.ssa.keyspaces.retry.AmazonKeyspacesRetryPolicy
18+
max-attempts = 3
19+
}
20+
21+
#advanced.auth-provider{
22+
# class = PlainTextAuthProvider
23+
# username = "alice-at-963740746376"
24+
# password = "XHi3boSMQMLpUd3NvmV6c5uJ1CT81ApS0QDMgGX"
25+
#}
26+
advanced.auth-provider = {
27+
class = software.aws.mcs.auth.SigV4AuthProvider
28+
aws-region = us-east-1
29+
}
30+
31+
#advanced.netty
32+
33+
advanced.ssl-engine-factory {
34+
class = DefaultSslEngineFactory
35+
#truststore-path = "/Users/mjraney/.cassandra/cassandra_truststore.jks"
36+
#truststore-password = "amazon"
37+
hostname-validation = false
38+
}
39+
40+
advanced.connection.max-requests-per-connection = 1024
41+
advanced.connection.pool.local.size = 3
42+
43+
}

src/test/resources/logback-test.xml

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,30 @@
1313
<appender-ref ref="STDOUT" />
1414
</root>
1515

16-
<logger name="com.aws.ssa.keyspaces.retry.AmazonKeyspacesRetryPolicy" level="TRACE" additivity="false">
16+
<logger name="com.aws.ssa.keyspaces.retry.AmazonKeyspacesRetryPolicy" level="INFO" additivity="false">
1717
<appender-ref ref="STDOUT" />
1818
</logger>
19-
<logger name="com.aws.ssa.keyspaces.throttler.AmazonKeyspacesFixedRateThrottler" level="DEBUG" additivity="false">
19+
<logger name="com.aws.ssa.keyspaces.throttler.AmazonKeyspacesFixedRateThrottler" level="INFO" additivity="false">
2020
<appender-ref ref="STDOUT" />
2121
</logger>
22-
<logger name="com.aws.ssa.keyspaces.throttler.AmazonKeyspacesBlockingFIxedRateThrottler" level="DEBUG" additivity="false">
22+
<logger name="com.aws.ssa.keyspaces.throttler.AmazonKeyspacesBlockingFixedRateThrottler" level="INFO" additivity="false">
2323
<appender-ref ref="STDOUT" />
2424
</logger>
25+
<logger name="com.datastax.driver.core.Connection" level="INFO" additivity="false">
26+
<appender-ref ref="STDOUT" />
27+
</logger>
28+
<logger name="com.datastax.driver.core.Cluster" level="INFO" additivity="false">
29+
<appender-ref ref="STDOUT" />
30+
</logger>
31+
<logger name="com.datastax.driver.core.Session" level="INFO" additivity="false">
32+
<appender-ref ref="STDOUT" />
33+
</logger>
34+
<logger name="com.aws.ssa.keyspaces.loadbalancing.AmazonKeyspacesRoundRobinLoadBalancingPolicy" level="TRACE" additivity="false">
35+
<appender-ref ref="STDOUT" />
36+
</logger>
37+
38+
39+
40+
2541

2642
</configuration>

0 commit comments

Comments
 (0)