Skip to content

Commit ca48b77

Browse files
committed
adjusted endpoint type to number of host
VPCE have a number of endpoints depending on the number of availablity zones for a region.
1 parent f8c1406 commit ca48b77

File tree

12 files changed

+259
-105
lines changed

12 files changed

+259
-105
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ To activate this throttler, modify the {@code advanced.throttler} section in the
4040
advanced.throttler = {
4141
class = com.aws.ssa.keyspaces.throttler.AmazonKeyspacesFixedRateThrottler
4242
max-requests-per-second = 1000
43-
endpoint-type = VPC
43+
number-of-hosts = 3
4444
register-timeout = 1 seconds
4545
}
4646
}
@@ -49,7 +49,7 @@ To activate this throttler, modify the {@code advanced.throttler} section in the
4949

5050

5151
* `max-requests-per-second` : controls the request rate. Blocks until available permits or timeout it reached
52-
* `endpoint-type` : if you are connected to the VPC endpoint or the public endpoint. Used to determine throughput based on the number of connections specified in:`advanced.connection.pool.local.size`
52+
* `number-of-hosts` : The number of hosts in the system.peers table. Depending on the endpoint type and region the number of hosts in the system.peers table may be different. This number is Used to validate throughput based on the number of connections specified in:`advanced.connection.pool.local.size`
5353
* `register-timeout` timeout waiting for permits. Should be less than or equal to `basic.request.timeout'
5454

5555

src/main/java/com/aws/ssa/keyspaces/core/EndpointType.java

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/main/java/com/aws/ssa/keyspaces/retry/AmazonKeyspacesRetryPolicy.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ public RetryDecision onWriteTimeout(
134134

135135
LOG.trace(RETRYING_ON_WRITE_TIMEOUT, logPrefix, cl, blockFor, received, false, retryCount);
136136

137-
138137
return decision;
139138
}
140139

@@ -157,7 +156,6 @@ public RetryDecision onUnavailable(
157156

158157
LOG.trace(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount);
159158

160-
161159
return decision;
162160
}
163161

@@ -174,7 +172,6 @@ public RetryDecision onRequestAborted(
174172

175173
LOG.trace(RETRYING_ON_ABORTED, logPrefix, retryCount, error);
176174

177-
178175
return decision;
179176
}
180177

src/main/java/com/aws/ssa/keyspaces/throttler/AmazonKeyspacesFixedRateThrottler.java

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.aws.ssa.keyspaces.throttler;
22

3-
import com.aws.ssa.keyspaces.core.EndpointType;
43
import com.datastax.oss.driver.api.core.RequestThrottlingException;
54
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
65
import com.datastax.oss.driver.api.core.context.DriverContext;
@@ -69,10 +68,10 @@ public class AmazonKeyspacesFixedRateThrottler implements RequestThrottler {
6968
public static int PUBLIC_ENDPOINT_DEFAULT_HOST = 9;
7069

7170
/***
72-
* Amazon Keyspaces Virtual Private Cloud Endpoint (VPCE) exposes single host to drivers. The default behavior will establish one connection for every conection specified in
71+
* Amazon Keyspaces Virtual Private Cloud Endpoint (VPCE) exposes host per availability zone. The default behavior will establish one connection to each peer IP address.
7372
* the driver.
7473
*/
75-
public static int VPC_ENDPOINT_DEFAULT_HOST = 1;
74+
public static int VPC_ENDPOINT_DEFAULT_HOST = 2;
7675

7776
/***
7877
* Rate limiter used to meter the CQL Request Per Second up to maxRequestsPerSecond
@@ -85,9 +84,9 @@ public class AmazonKeyspacesFixedRateThrottler implements RequestThrottler {
8584
private final RateLimiter maxConnectionsLimiter;
8685

8786
/***
88-
* Type of endpoint used which dictate the number of connections generated when creating a new session
87+
* Number of hosts available when creating to a new session
8988
*/
90-
private final EndpointType endpointType;
89+
private final Integer numberOfHosts;
9190

9291
/***
9392
* Configured Rate of desired throughput
@@ -116,9 +115,9 @@ public AmazonKeyspacesFixedRateThrottler(DriverContext context) {
116115
.getDuration(KeyspacesThrottleOption.KEYSPACES_THROTTLE_TIMEOUT,context.getConfig()
117116
.getDefaultProfile()
118117
.getDuration(DefaultDriverOption.REQUEST_TIMEOUT)).toMillis(),
119-
EndpointType.valueOf(context.getConfig()
118+
context.getConfig()
120119
.getDefaultProfile()
121-
.getString(KeyspacesThrottleOption.KEYSPACES_THROTTLE_ENDPOINT_TYPE, KeyspacesThrottleOption.DEFAULT_ENDPOINT_TYPE.toString())),
120+
.getInt(KeyspacesThrottleOption.KEYSPACES_THROTTLE_NUMBER_OF_HOSTS, KeyspacesThrottleOption.DEFAULT_NUMBER_OF_HOSTS),
122121
context.getConfig()
123122
.getDefaultProfile()
124123
.getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE));
@@ -127,14 +126,14 @@ public AmazonKeyspacesFixedRateThrottler(DriverContext context) {
127126

128127
/*** Initialization of the Throttler ***/
129128
public AmazonKeyspacesFixedRateThrottler(DriverContext context, long maxRequestsPerSecond, long registerTimeoutInMs
130-
, EndpointType endpointType, int numberOfConnectionsPerHost) {
129+
, int numberOfHosts, int numberOfConnectionsPerHost) {
131130
this.logPrefix = context.getSessionName();
132131

133132
this.maxRequestsPerSecond = maxRequestsPerSecond;
134133

135134
this.registerTimeoutInMs = registerTimeoutInMs;
136135

137-
this.endpointType = endpointType;
136+
this.numberOfHosts = numberOfHosts;
138137

139138
this.numberOfConnectionsPerHost = numberOfConnectionsPerHost;
140139

@@ -179,29 +178,29 @@ public AmazonKeyspacesFixedRateThrottler(DriverContext context, long maxRequests
179178
registerTimeoutInMs);
180179

181180
LOG.info(
182-
"[{}] Based on Throttler max of {} request per second, the recommended number of connections for VPC Endpoint is: {}, currently {}",
181+
"[{}] Based on Throttler max of {} request per second, the recommended number of connections for number of hosts: {}, currently {}",
183182
logPrefix,
184183
maxRequestsPerSecond,
185-
simpleConnectionRecommendation(maxRequestsPerSecond, EndpointType.VPC),
184+
simpleConnectionRecommendation(maxRequestsPerSecond, numberOfHosts),
186185
numberOfConnectionsPerHost);
187186

188187
LOG.info(
189188
"[{}] Based on Throttler max of {} request per second, the recommended number of connections for the Public Endpoint is: {}, currently {}",
190189
logPrefix,
191190
maxRequestsPerSecond,
192-
simpleConnectionRecommendation(maxRequestsPerSecond, EndpointType.PUBLIC),
191+
simpleConnectionRecommendation(maxRequestsPerSecond, numberOfHosts),
193192
numberOfConnectionsPerHost);
194193

195194

196195
//The rate for the number of request per second based on the number of connections. Should be greater than maxRequestsPerSecond
197-
int maxRequestPerSecondByForConnections = calculateConnectionMaxRequestPerSecond(endpointType, numberOfConnectionsPerHost);
196+
int maxRequestPerSecondByForConnections = calculateConnectionMaxRequestPerSecond(numberOfHosts, numberOfConnectionsPerHost);
198197

199198
if(maxRequestPerSecondByForConnections < maxRequestsPerSecond){
200199
LOG.warn(
201-
"[{}] Cannot reach Max Request Per Second of {}. Specified Endpoint type of {}, and number of connections {} will provide at most {} request per second. Try increasing advanced.connection.pool.local.size ",
200+
"[{}] Cannot reach Max Request Per Second of {}. Specified number of hosts {}, and number of connections {} will provide at most {} request per second. Try increasing advanced.connection.pool.local.size or check system.peers table fo the number of hosts ",
202201
logPrefix,
203202
maxRequestsPerSecond,
204-
endpointType,
203+
numberOfHosts,
205204
numberOfConnectionsPerHost,
206205
maxRequestPerSecondByForConnections);
207206

@@ -218,27 +217,23 @@ public AmazonKeyspacesFixedRateThrottler(DriverContext context, long maxRequests
218217

219218
/***
220219
* Calculate the number of request per second based on the number of connections, number of hosts, and 2000 request per second.
221-
* @param endpointType VPC or Public Endpoint
220+
* @param numberOfHosts Number of hosts in the peers table. Depends on region and end point
222221
* @param numberOfConnectionsPerHost Number of connections for each host ip
223222
* @return max rate per second based on the number of connections
224223
*/
225-
public static int calculateConnectionMaxRequestPerSecond(EndpointType endpointType, int numberOfConnectionsPerHost){
226-
return (endpointType == EndpointType.VPC)
227-
? numberOfConnectionsPerHost * VPC_ENDPOINT_DEFAULT_HOST * REQUEST_PER_CONNECTION_DEFAULT
228-
: numberOfConnectionsPerHost * PUBLIC_ENDPOINT_DEFAULT_HOST * REQUEST_PER_CONNECTION_DEFAULT;
224+
public static int calculateConnectionMaxRequestPerSecond(Integer numberOfHosts, int numberOfConnectionsPerHost){
225+
return numberOfConnectionsPerHost * numberOfHosts * REQUEST_PER_CONNECTION_DEFAULT;
229226
}
230227

231228
/***
232229
* Calculate recommended connections based on the current maxRequestRate specified. Max Connections rate should be greater than configured max request rate
233230
* @param maxRequestsPerSecond number of configured request per second
234-
* @param endpointType type of endpoint that will be used to identify the number of hosts
231+
* @param numberOfHosts type of endpoint that will be used to identify the number of hosts
235232
* @return
236233
*/
237-
public static double simpleConnectionRecommendation(long maxRequestsPerSecond, EndpointType endpointType){
238-
int hosts = (endpointType == EndpointType.VPC)
239-
? VPC_ENDPOINT_DEFAULT_HOST:PUBLIC_ENDPOINT_DEFAULT_HOST;
234+
public static double simpleConnectionRecommendation(long maxRequestsPerSecond, Integer numberOfHosts){
240235

241-
return Math.max(1.0, Math.ceil(maxRequestsPerSecond/(double)(hosts * REQUEST_PER_CONNECTION_DEFAULT)));
236+
return Math.max(1.0, Math.ceil(maxRequestsPerSecond/(double)(numberOfHosts * REQUEST_PER_CONNECTION_DEFAULT)));
242237
}
243238

244239
/***

src/main/java/com/aws/ssa/keyspaces/throttler/KeyspacesThrottleOption.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package com.aws.ssa.keyspaces.throttler;
22

3-
import com.aws.ssa.keyspaces.core.EndpointType;
43
import com.datastax.oss.driver.api.core.config.DriverOption;
54

65
public enum KeyspacesThrottleOption implements DriverOption {
76

8-
KEYSPACES_THROTTLE_ENDPOINT_TYPE("advanced.throttler.endpoint-type"),
7+
KEYSPACES_THROTTLE_NUMBER_OF_HOSTS("advanced.throttler.number-of-hosts"),
98
KEYSPACES_THROTTLE_TIMEOUT("advanced.throttler.register-timeout");;
109

11-
public static final EndpointType DEFAULT_ENDPOINT_TYPE = EndpointType.VPC;
12-
10+
public static final int DEFAULT_NUMBER_OF_HOSTS = 1;
1311

1412
private final String path;
1513

src/test/java/com/aws/ssa/keyspaces/core/EndpointTest.java

Lines changed: 0 additions & 45 deletions
This file was deleted.
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package com.aws.ssa.keyspaces.export;
2+
3+
import com.datastax.oss.driver.api.core.ConsistencyLevel;
4+
import com.datastax.oss.driver.api.core.CqlSession;
5+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
6+
import com.datastax.oss.driver.api.core.cql.*;
7+
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
8+
9+
import java.nio.ByteBuffer;
10+
import java.util.*;
11+
import java.util.concurrent.*;
12+
import java.util.concurrent.atomic.AtomicLong;
13+
14+
15+
public class Exporter {
16+
17+
private static BlockingQueue<Map<ByteBuffer, List<Row>>> blockingQueue = new LinkedBlockingDeque<>(2000);
18+
19+
private static int PAGE_SIZE = 5000;
20+
21+
private static String query = "";
22+
23+
private static ByteBuffer pagingState = null;
24+
25+
private static AtomicLong count = new AtomicLong(0L);
26+
27+
private static AtomicLong queueCount = new AtomicLong(0L);
28+
29+
private RateLimiter limiter = RateLimiter.create(10000);
30+
31+
public static void main(String[] args){
32+
33+
34+
//query = args[0];
35+
36+
query = "SELECT * FROM tlp_stress.counter_wide";
37+
38+
SimpleStatement t = new SimpleStatementBuilder(query).build();
39+
40+
t.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
41+
t.setIdempotent(true);
42+
t.setPageSize(PAGE_SIZE);
43+
44+
45+
CqlSession session = CqlSession.builder().withConfigLoader(DriverConfigLoader.fromClasspath("throttler-example")).build();
46+
47+
new Exporter().saveAllRows(session, t);
48+
49+
ExecutorService executor = Executors.newSingleThreadExecutor();
50+
Future task = executor.submit(() -> {
51+
52+
String threadName = Thread.currentThread().getName();
53+
System.out.println("Hello " + threadName);
54+
55+
do{
56+
try {
57+
Map<ByteBuffer, List<Row>> results = blockingQueue.poll(10000, TimeUnit.MILLISECONDS);
58+
//results.entrySet().forEach();
59+
if (results != null) {
60+
results.entrySet().forEach(x -> {
61+
x.getValue().forEach(y -> {
62+
queueCount.incrementAndGet();
63+
});
64+
});
65+
//System.out.println(results);
66+
}
67+
}catch (Exception ex){
68+
69+
}
70+
System.out.println("Fin " + count + " " + queueCount);
71+
}while(true);
72+
});
73+
74+
while(task.isDone()!=true){
75+
try{
76+
Thread.sleep(100);
77+
}catch (Exception ex){
78+
79+
}
80+
}
81+
System.out.println("Fin " + count + " " + queueCount);
82+
83+
}
84+
private void saveAllRows(CqlSession session, SimpleStatement statement){
85+
86+
CompletionStage<AsyncResultSet> resultSetFuture =
87+
session.executeAsync(statement);
88+
89+
CompletionStage<Void> fetchRows = resultSetFuture.thenCompose(this::queueUpPages);
90+
91+
fetchRows.whenComplete(
92+
(resultSet, error) -> {
93+
if (error != null) {
94+
System.out.printf("Error: %s%n", error.getMessage());
95+
96+
SimpleStatement t = new SimpleStatementBuilder(query).build();
97+
98+
t.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
99+
t.setIdempotent(true);
100+
t.setPageSize(PAGE_SIZE);
101+
t.setPagingState(pagingState);
102+
103+
saveAllRows(session, t);
104+
105+
} else {
106+
//String version = resultSet.one().getString(0);
107+
System.out.printf("Finished");
108+
}
109+
});
110+
111+
112+
113+
}
114+
private CompletionStage<Void> queueUpPages(AsyncResultSet resultSet) {
115+
116+
List<Row> rowsPerPage = new ArrayList<Row>(PAGE_SIZE+50);
117+
118+
int totalCount = 0;
119+
120+
for (Row row : resultSet.currentPage()) {
121+
count.incrementAndGet();
122+
rowsPerPage.add(row);
123+
totalCount++;
124+
}
125+
126+
Map<ByteBuffer, List<Row>> oneState = new HashMap<>(1);
127+
128+
pagingState = resultSet.getExecutionInfo().getPagingState();
129+
130+
oneState.put(pagingState, rowsPerPage);
131+
132+
try {
133+
blockingQueue.put(oneState);
134+
}catch (Exception ex){
135+
throw new CompletionException(ex);
136+
}
137+
138+
if (resultSet.hasMorePages()) {
139+
limiter.acquire(totalCount);
140+
return resultSet.fetchNextPage().thenCompose(this::queueUpPages);
141+
} else {
142+
return CompletableFuture.completedFuture(null);
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)