Skip to content

Commit d742e20

Browse files
authored
Allow slow recovery of tokens in the token bucket (#5345)
* Allow slow recovery of tokens in the token bucket * Add changelog * Alternate solution, add on the retry strategy * Set the amount to release to on in the release call instead
1 parent c59a075 commit d742e20

File tree

3 files changed

+213
-1
lines changed

3 files changed

+213
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Fix a bug on the token bucket, after success we need to deposit back one token to allow it to slowly recover and allow more retries after seeing several successful responses."
6+
}

core/retries/src/main/java/software/amazon/awssdk/retries/internal/BaseRetryStrategy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,9 @@ private AcquireResponse requestAcquireCapacity(RefreshRetryTokenRequest request,
196196

197197
private ReleaseResponse releaseTokenBucketCapacity(DefaultRetryToken token) {
198198
TokenBucket bucket = tokenBucketStore.tokenBucketForScope(token.scope());
199-
int capacityReleased = token.capacityAcquired();
199+
// Make sure that we release at least one token to allow the token bucket
200+
// to replenish its tokens.
201+
int capacityReleased = Math.max(token.capacityAcquired(), 1);
200202
return bucket.release(capacityReleased);
201203
}
202204

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.retry;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
21+
import static com.github.tomakehurst.wiremock.client.WireMock.post;
22+
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
23+
24+
import com.github.tomakehurst.wiremock.WireMockServer;
25+
import com.github.tomakehurst.wiremock.common.FileSource;
26+
import com.github.tomakehurst.wiremock.extension.Parameters;
27+
import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
28+
import com.github.tomakehurst.wiremock.http.Request;
29+
import com.github.tomakehurst.wiremock.http.Response;
30+
import java.net.URI;
31+
import java.util.concurrent.CompletionException;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
37+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
38+
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
39+
import software.amazon.awssdk.core.SdkPlugin;
40+
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
41+
import software.amazon.awssdk.core.retry.RetryMode;
42+
import software.amazon.awssdk.core.retry.RetryPolicy;
43+
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
44+
import software.amazon.awssdk.regions.Region;
45+
import software.amazon.awssdk.retries.api.RetryStrategy;
46+
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient;
47+
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClientBuilder;
48+
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClient;
49+
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClientBuilder;
50+
import software.amazon.awssdk.services.protocolrestjson.model.AllTypesRequest;
51+
import software.amazon.awssdk.services.protocolrestjson.model.AllTypesResponse;
52+
53+
@SuppressWarnings("deprecation")
54+
public abstract class TokenBucketRecoversTest<ClientT, BuilderT extends AwsClientBuilder<BuilderT, ClientT>> {
55+
56+
protected WireMockServer wireMock;
57+
58+
protected abstract BuilderT newClientBuilder();
59+
60+
protected abstract AllTypesResponse callAllTypes(ClientT client, SdkPlugin... plugins);
61+
62+
private BuilderT clientBuilder() {
63+
StaticCredentialsProvider credentialsProvider =
64+
StaticCredentialsProvider.create(AwsBasicCredentials.create("akid", "skid"));
65+
return newClientBuilder()
66+
.credentialsProvider(credentialsProvider)
67+
.region(Region.US_EAST_1)
68+
.endpointOverride(URI.create("http://localhost:" + wireMock.port()));
69+
}
70+
71+
@Test
72+
public void retryPolicyTokenBucketRecovers() {
73+
RetryPolicy retryPolicy = RetryPolicy.forRetryMode(RetryMode.STANDARD)
74+
.toBuilder()
75+
.throttlingBackoffStrategy(BackoffStrategy.none())
76+
.backoffStrategy(BackoffStrategy.none())
77+
.build();
78+
ClientT client = clientBuilder()
79+
.overrideConfiguration(o -> o.retryPolicy(retryPolicy))
80+
.build();
81+
int exceptions = 0;
82+
for (int x = 0; x < 60; x++) {
83+
try {
84+
callAllTypes(client);
85+
} catch (Exception e) {
86+
exceptions += 1;
87+
}
88+
}
89+
verifyRequestCount(161);
90+
}
91+
92+
@Test
93+
public void retryStrategyTokenBucketRecovers() {
94+
RetryStrategy retryStrategy = SdkDefaultRetryStrategy
95+
.forRetryMode(RetryMode.STANDARD)
96+
.toBuilder()
97+
.backoffStrategy(software.amazon.awssdk.retries.api.BackoffStrategy.retryImmediately())
98+
.build();
99+
ClientT client = clientBuilder()
100+
.overrideConfiguration(o -> o.retryStrategy(retryStrategy))
101+
.build();
102+
int exceptions = 0;
103+
for (int x = 0; x < 60; x++) {
104+
try {
105+
callAllTypes(client);
106+
} catch (Exception e) {
107+
exceptions += 1;
108+
}
109+
}
110+
verifyRequestCount(161);
111+
}
112+
113+
@BeforeEach
114+
private void beforeEach() {
115+
wireMock = new WireMockServer(wireMockConfig()
116+
.extensions(ErrorSimulationResponseTransformer.class));
117+
wireMock.start();
118+
wireMock.stubFor(post(anyUrl())
119+
.willReturn(aResponse()
120+
.withTransformers("error-simulation-transformer")));
121+
}
122+
123+
@AfterEach
124+
private void afterEach() {
125+
wireMock.stop();
126+
}
127+
128+
private void verifyRequestCount(int count) {
129+
wireMock.verify(count, anyRequestedFor(anyUrl()));
130+
}
131+
132+
static class SyncCanOverrideStrategy extends TokenBucketRecoversTest<ProtocolRestJsonClient,
133+
ProtocolRestJsonClientBuilder> {
134+
@Override
135+
protected ProtocolRestJsonClientBuilder newClientBuilder() {
136+
return ProtocolRestJsonClient.builder();
137+
}
138+
139+
@Override
140+
protected AllTypesResponse callAllTypes(ProtocolRestJsonClient client, SdkPlugin... plugins) {
141+
AllTypesRequest.Builder requestBuilder = AllTypesRequest.builder();
142+
for (SdkPlugin plugin : plugins) {
143+
requestBuilder.overrideConfiguration(o -> o.addPlugin(plugin));
144+
}
145+
return client.allTypes(requestBuilder.build());
146+
}
147+
}
148+
149+
static class AsyncCanOverrideStrategy extends TokenBucketRecoversTest<ProtocolRestJsonAsyncClient,
150+
ProtocolRestJsonAsyncClientBuilder> {
151+
@Override
152+
protected ProtocolRestJsonAsyncClientBuilder newClientBuilder() {
153+
return ProtocolRestJsonAsyncClient.builder();
154+
}
155+
156+
@Override
157+
protected AllTypesResponse callAllTypes(ProtocolRestJsonAsyncClient client, SdkPlugin... plugins) {
158+
try {
159+
AllTypesRequest.Builder requestBuilder = AllTypesRequest.builder();
160+
for (SdkPlugin plugin : plugins) {
161+
requestBuilder.overrideConfiguration(o -> o.addPlugin(plugin));
162+
}
163+
return client.allTypes(requestBuilder.build()).join();
164+
} catch (CompletionException e) {
165+
if (e.getCause() instanceof RuntimeException) {
166+
throw (RuntimeException) e.getCause();
167+
}
168+
throw e;
169+
}
170+
}
171+
}
172+
173+
public static class ErrorSimulationResponseTransformer extends ResponseTransformer {
174+
private AtomicInteger requestCount = new AtomicInteger(0);
175+
176+
@Override
177+
public String getName() {
178+
return "error-simulation-transformer";
179+
}
180+
181+
@Override
182+
public Response transform(Request request, Response response, FileSource files, Parameters parameters) {
183+
if (shouldSucceed()) {
184+
return Response.Builder.like(response)
185+
.but().body("{}")
186+
.status(200)
187+
.build();
188+
}
189+
return Response.Builder.like(response)
190+
.but().body("{}")
191+
.status(429)
192+
.build();
193+
}
194+
195+
private boolean shouldSucceed() {
196+
int currentCount = requestCount.getAndIncrement();
197+
if (currentCount < 150 || currentCount >= 155) {
198+
// in between 5 successful calls will allow an additional retry as 5 tokens will be deposited to the store.
199+
return false;
200+
}
201+
return true;
202+
}
203+
}
204+
}

0 commit comments

Comments
 (0)