Skip to content

Commit 0a5ac3e

Browse files
committed
New Exponential Retry Policy
New Exponential Retry Policy
1 parent dab553a commit 0a5ac3e

File tree

11 files changed

+833
-19
lines changed

11 files changed

+833
-19
lines changed

pom.xml

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,33 @@
1616

1717

1818
<dependencies>
19+
<!--
20+
<dependency>
21+
<groupId>com.datastax.cassandra</groupId>
22+
<artifactId>cassandra-driver-core</artifactId>
23+
<version>3.11.0</version>
24+
</dependency>
25+
-->
26+
<dependency>
27+
<groupId>com.datastax.oss</groupId>
28+
<artifactId>java-driver-core</artifactId>
29+
<version>4.11.3</version>
30+
<scope>provided</scope>
31+
</dependency>
1932
<dependency>
20-
<groupId>com.datastax.oss</groupId>
21-
<artifactId>java-driver-core</artifactId>
22-
<version>4.2.0</version>
23-
<scope>provided</scope>
33+
<groupId>software.aws.mcs</groupId>
34+
<artifactId>aws-sigv4-auth-cassandra-java-driver-plugin</artifactId>
35+
<version>4.0.4</version>
36+
<scope>test</scope>
2437
</dependency>
2538
<dependency>
2639
<groupId>junit</groupId>
2740
<artifactId>junit</artifactId>
2841
<version>4.13.2</version>
2942
<scope>test</scope>
3043
</dependency>
31-
<dependency>
32-
<groupId>software.aws.mcs</groupId>
33-
<artifactId>aws-sigv4-auth-cassandra-java-driver-plugin</artifactId>
34-
<version>4.0.4</version>
35-
<scope>test</scope>
36-
</dependency>
44+
45+
3746
<dependency>
3847
<groupId>ch.qos.logback</groupId>
3948
<artifactId>logback-classic</artifactId>
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package com.aws.ssa.keyspaces.retry;
2+
3+
import com.datastax.oss.driver.api.core.ConsistencyLevel;
4+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
5+
import com.datastax.oss.driver.api.core.context.DriverContext;
6+
import com.datastax.oss.driver.api.core.retry.RetryDecision;
7+
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
8+
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
9+
import com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException;
10+
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
11+
import com.datastax.oss.driver.api.core.servererrors.WriteType;
12+
import com.datastax.oss.driver.api.core.session.Request;
13+
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
14+
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
15+
import edu.umd.cs.findbugs.annotations.NonNull;
16+
import net.jcip.annotations.ThreadSafe;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
import java.util.concurrent.ThreadLocalRandom;
21+
import java.util.concurrent.TimeUnit;
22+
23+
24+
/**
25+
* This is a conservative retry policy adapted for the Amazon Keyspaces Service.
26+
* It allows for a configurable number of attempts, but by default the number of attempts is {@value KeyspacesRetryOption#DEFAULT_KEYSPACES_RETRY_MAX_ATTEMPTS}
27+
* <p>
28+
* This policy will either reattempt request on the same host or rethrow the exception to the calling thread. The main difference between
29+
* this policy from the original {@link com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy} is that the {@link AmazonKeyspacesExponentialRetryPolicy} will call {@link RetryDecision#RETRY_SAME} instead of {@link RetryDecision#RETRY_NEXT}
30+
* <p>
31+
* In Amazon Keyspaces, it's likely that {@link WriteTimeoutException} or {@link ReadTimeoutException} is the result of exceeding current table
32+
* capacity. Learn more about Amazon Keyspaces capacity here: @see <a href="https://docs.aws.amazon.com/keyspaces/latest/devguide/ReadWriteCapacityMode.html">Amazon Keyspaces CapacityModes</a>.
33+
* In most cases you should allow for small number of retries, and handle the exception in your application threads.
34+
*
35+
* <p>To activate this policy, modify the {@code advanced.retry-policy} section in the driver
36+
* configuration, for example:
37+
*
38+
* <pre>
39+
* datastax-java-driver {
40+
* advanced.retry-policy {
41+
* class = com.aws.ssa.keyspaces.retry.AmazonKeyspacesRetryPolicy
42+
* max-attempts = 2
43+
* }
44+
* }
45+
* </pre>
46+
*/
47+
48+
@ThreadSafe
49+
public class AmazonKeyspacesExponentialRetryPolicy implements RetryPolicy {
50+
51+
52+
private static final Logger LOG = LoggerFactory.getLogger(AmazonKeyspacesExponentialRetryPolicy.class);
53+
@VisibleForTesting
54+
public static final String RETRYING_ON_READ_TIMEOUT = "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, received responses: {}, data retrieved: {}, retries: {})";
55+
@VisibleForTesting
56+
public static final String RETRYING_ON_WRITE_TIMEOUT = "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, required acknowledgments: {}, received acknowledgments: {}, retries: {})";
57+
@VisibleForTesting
58+
public static final String RETRYING_ON_UNAVAILABLE = "[{}] Retrying on unavailable exception on next host (consistency: {}, required replica: {}, alive replica: {}, retries: {})";
59+
@VisibleForTesting
60+
public static final String RETRYING_ON_ABORTED = "[{}] Retrying on aborted request on next host (retries: {})";
61+
@VisibleForTesting
62+
public static final String RETRYING_ON_ERROR = "[{}] Retrying on node error on next host (retries: {})";
63+
64+
private final String logPrefix;
65+
66+
private final Integer maxRetryCount;
67+
68+
//private final Integer maxTimeToWait;
69+
70+
public AmazonKeyspacesExponentialRetryPolicy(DriverContext context) {
71+
this(context, context.getConfig().getDefaultProfile().getName());
72+
}
73+
74+
public AmazonKeyspacesExponentialRetryPolicy(DriverContext context, Integer maxRetryCount) {
75+
76+
String profileName = context.getConfig().getDefaultProfile().getName();
77+
78+
this.maxRetryCount = maxRetryCount;
79+
80+
this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName;
81+
}
82+
83+
public AmazonKeyspacesExponentialRetryPolicy(DriverContext context, String profileName) {
84+
DriverExecutionProfile retryExecutionProfile = context.getConfig().getProfile(profileName);
85+
86+
maxRetryCount = retryExecutionProfile.getInt(KeyspacesRetryOption.KEYSPACES_RETRY_MAX_ATTEMPTS, KeyspacesRetryOption.DEFAULT_KEYSPACES_RETRY_MAX_ATTEMPTS);
87+
88+
this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName;
89+
}
90+
91+
92+
protected RetryDecision determineRetryDecision(int retryCount) {
93+
94+
if (retryCount < maxRetryCount) {
95+
timeToWait(retryCount);
96+
97+
return RetryDecision.RETRY_SAME;
98+
} else {
99+
return RetryDecision.RETHROW;
100+
101+
102+
}
103+
}
104+
protected void timeToWait(int retryCount){
105+
106+
int timeToWaitCalculation = (retryCount + 1) * ThreadLocalRandom.current().nextInt(1, 20);
107+
108+
int timeToWaitFinal = Math.min(500, timeToWaitCalculation);
109+
110+
Uninterruptibles.sleepUninterruptibly(timeToWaitFinal, TimeUnit.MILLISECONDS);
111+
}
112+
113+
114+
/**
115+
* {@inheritDoc}
116+
*
117+
* <p>This implementation triggers a maximum of configured retry (to the same connection)
118+
*
119+
* <p>Otherwise, the exception is rethrown.
120+
*/
121+
@Override
122+
public RetryDecision onReadTimeout(
123+
@NonNull Request request,
124+
@NonNull ConsistencyLevel cl,
125+
int blockFor,
126+
int received,
127+
boolean dataPresent,
128+
int retryCount) {
129+
130+
RetryDecision decision = determineRetryDecision(retryCount);
131+
132+
LOG.trace(RETRYING_ON_READ_TIMEOUT, logPrefix, cl, blockFor, received, false, retryCount);
133+
134+
return decision;
135+
136+
}
137+
138+
139+
/**
140+
* {@inheritDoc}
141+
*
142+
* <p>This implementation triggers a maximum of configured retry (to the same connection)
143+
*
144+
* <p>Otherwise, the exception is rethrown.
145+
*/
146+
@Override
147+
public RetryDecision onWriteTimeout(
148+
@NonNull Request request,
149+
@NonNull ConsistencyLevel cl,
150+
@NonNull WriteType writeType,
151+
int blockFor,
152+
int received,
153+
int retryCount) {
154+
155+
RetryDecision decision = determineRetryDecision(retryCount);
156+
157+
LOG.trace(RETRYING_ON_WRITE_TIMEOUT, logPrefix, cl, blockFor, received, false, retryCount);
158+
159+
return decision;
160+
}
161+
162+
/**
163+
* {@inheritDoc}
164+
*
165+
* <p>This implementation triggers a maximum of configured retry (to the same connection)
166+
*
167+
* <p>Otherwise, the exception is rethrown.
168+
*/
169+
@Override
170+
public RetryDecision onUnavailable(
171+
@NonNull Request request,
172+
@NonNull ConsistencyLevel cl,
173+
int required,
174+
int alive,
175+
int retryCount) {
176+
177+
RetryDecision decision = determineRetryDecision(retryCount);
178+
179+
LOG.trace(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount);
180+
181+
return decision;
182+
}
183+
184+
/**
185+
* {@inheritDoc}
186+
*
187+
* <p>This implementation triggers a maximum of configured retry (to the same connection)
188+
*/
189+
@Override
190+
public RetryDecision onRequestAborted(
191+
@NonNull Request request, @NonNull Throwable error, int retryCount) {
192+
193+
RetryDecision decision = determineRetryDecision(retryCount);
194+
195+
LOG.trace(RETRYING_ON_ABORTED, logPrefix, retryCount, error);
196+
197+
return decision;
198+
}
199+
200+
/**
201+
* {@inheritDoc}
202+
*
203+
* <p>This implementation triggers a maximum of configured retry (to the same connection)
204+
*/
205+
@Override
206+
public RetryDecision onErrorResponse(
207+
@NonNull Request request, @NonNull CoordinatorException error, int retryCount) {
208+
209+
RetryDecision decision = determineRetryDecision(retryCount);
210+
211+
LOG.trace(RETRYING_ON_ERROR, logPrefix, retryCount, error);
212+
213+
return decision;
214+
}
215+
216+
@Override
217+
public void close() {
218+
// nothing to do
219+
}
220+
}

src/main/java/com/aws/ssa/keyspaces/retry/AmazonKeyspacesRetryPolicy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ public AmazonKeyspacesRetryPolicy(DriverContext context, String profileName) {
8282

8383

8484
protected RetryDecision determineRetryDecision(int retryCount) {
85-
if (retryCount > maxRetryCount) {
86-
return RetryDecision.RETHROW;
87-
} else {
85+
if (retryCount < maxRetryCount) {
8886
return RetryDecision.RETRY_SAME;
87+
} else {
88+
return RetryDecision.RETHROW;
8989
}
9090
}
9191

src/main/java/com/aws/ssa/keyspaces/retry/AmazonKeyspacesRetryPolicy_driver_v3.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1-
/*
1+
/***
2+
23
package com.aws.ssa.keyspaces.retry;
34
45
import com.datastax.driver.core.*;
56
import com.datastax.driver.core.exceptions.DriverException;
67
import com.datastax.driver.core.policies.RetryPolicy;
8+
import com.google.common.util.concurrent.Uninterruptibles;
9+
10+
import java.util.concurrent.ThreadLocalRandom;
11+
import java.util.concurrent.TimeUnit;
712
813
// ** Commented out so that it will not break compile. For v3 you should use the appropriate maven dependency.
914
// Taken from https://github.com/aws-samples/amazon-keyspaces-examples/
@@ -30,11 +35,24 @@ public AmazonKeyspacesRetryPolicy(int numberOfRetries) {
3035
}
3136
3237
protected RetryDecision makeDecisionBasedOnNumberOfConfiguredRetries(int nbRetry, ConsistencyLevel cl){
33-
if(nbRetry > maxNumberOfRetries){
38+
if(nbRetry < maxNumberOfRetries){
39+
timeToWait(nbRetry);
40+
41+
return RetryDecision.retry(cl);
42+
43+
}else{
3444
return RetryDecision.rethrow();
3545
}
3646
37-
return RetryDecision.retry(cl);
47+
48+
}
49+
protected void timeToWait(int retryCount){
50+
51+
int timeToWaitCalculation = (retryCount + 1) * ThreadLocalRandom.current().nextInt(1, 20);
52+
53+
int timeToWaitFinal = Math.min(1000, timeToWaitCalculation);
54+
55+
Uninterruptibles.sleepUninterruptibly(timeToWaitFinal, TimeUnit.MILLISECONDS);
3856
}
3957
4058
public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
@@ -58,6 +76,4 @@ public void init(Cluster cluster) {
5876
5977
public void close() {
6078
}
61-
62-
63-
}*/
79+
}***/

0 commit comments

Comments
 (0)