19
19
20
20
import java .io .IOException ;
21
21
import java .time .Duration ;
22
- import java .util .OptionalDouble ;
22
+ import java .util .Optional ;
23
23
import java .util .concurrent .CompletableFuture ;
24
24
import java .util .concurrent .ScheduledExecutorService ;
25
25
import software .amazon .awssdk .annotations .SdkInternalApi ;
26
- import software .amazon .awssdk .annotations .SdkTestInternalApi ;
27
26
import software .amazon .awssdk .core .Response ;
28
27
import software .amazon .awssdk .core .async .AsyncRequestBody ;
29
28
import software .amazon .awssdk .core .client .config .SdkClientOption ;
30
- import software .amazon .awssdk .core .exception .SdkClientException ;
31
29
import software .amazon .awssdk .core .exception .SdkException ;
32
30
import software .amazon .awssdk .core .internal .http .HttpClientDependencies ;
33
31
import software .amazon .awssdk .core .internal .http .RequestExecutionContext ;
34
32
import software .amazon .awssdk .core .internal .http .TransformingAsyncResponseHandler ;
35
33
import software .amazon .awssdk .core .internal .http .pipeline .RequestPipeline ;
36
34
import software .amazon .awssdk .core .internal .http .pipeline .stages .utils .RetryableStageHelper ;
37
- import software .amazon .awssdk .core .internal .retry .RateLimitingTokenBucket ;
38
35
import software .amazon .awssdk .http .SdkHttpFullRequest ;
39
36
import software .amazon .awssdk .utils .CompletableFutureUtils ;
40
37
@@ -49,97 +46,54 @@ public final class AsyncRetryableStage<OutputT> implements RequestPipeline<SdkHt
49
46
private final RequestPipeline <SdkHttpFullRequest , CompletableFuture <Response <OutputT >>> requestPipeline ;
50
47
private final ScheduledExecutorService scheduledExecutor ;
51
48
private final HttpClientDependencies dependencies ;
52
- private final RateLimitingTokenBucket rateLimitingTokenBucket ;
53
49
54
50
public AsyncRetryableStage (TransformingAsyncResponseHandler <Response <OutputT >> responseHandler ,
55
51
HttpClientDependencies dependencies ,
56
52
RequestPipeline <SdkHttpFullRequest , CompletableFuture <Response <OutputT >>> requestPipeline ) {
57
53
this .responseHandler = responseHandler ;
58
54
this .dependencies = dependencies ;
59
55
this .scheduledExecutor = dependencies .clientConfiguration ().option (SdkClientOption .SCHEDULED_EXECUTOR_SERVICE );
60
- this .rateLimitingTokenBucket = new RateLimitingTokenBucket ();
61
56
this .requestPipeline = requestPipeline ;
62
57
}
63
58
64
- @ SdkTestInternalApi
65
- public AsyncRetryableStage (TransformingAsyncResponseHandler <Response <OutputT >> responseHandler ,
66
- HttpClientDependencies dependencies ,
67
- RequestPipeline <SdkHttpFullRequest , CompletableFuture <Response <OutputT >>> requestPipeline ,
68
- RateLimitingTokenBucket rateLimitingTokenBucket ) {
69
- this .responseHandler = responseHandler ;
70
- this .dependencies = dependencies ;
71
- this .scheduledExecutor = dependencies .clientConfiguration ().option (SdkClientOption .SCHEDULED_EXECUTOR_SERVICE );
72
- this .requestPipeline = requestPipeline ;
73
- this .rateLimitingTokenBucket = rateLimitingTokenBucket ;
74
- }
75
-
76
59
@ Override
77
60
public CompletableFuture <Response <OutputT >> execute (SdkHttpFullRequest request ,
78
61
RequestExecutionContext context ) throws Exception {
79
62
return new RetryingExecutor (request , context ).execute ();
80
63
}
81
64
82
- private class RetryingExecutor {
65
+ private final class RetryingExecutor {
83
66
private final AsyncRequestBody originalRequestBody ;
84
67
private final RequestExecutionContext context ;
85
68
private final RetryableStageHelper retryableStageHelper ;
86
69
87
70
private RetryingExecutor (SdkHttpFullRequest request , RequestExecutionContext context ) {
88
71
this .originalRequestBody = context .requestProvider ();
89
72
this .context = context ;
90
- this .retryableStageHelper = new RetryableStageHelper (request , context , rateLimitingTokenBucket , dependencies );
73
+ this .retryableStageHelper = new RetryableStageHelper (request , context , dependencies );
91
74
}
92
75
93
- public CompletableFuture <Response <OutputT >> execute () throws Exception {
76
+ public CompletableFuture <Response <OutputT >> execute () {
94
77
CompletableFuture <Response <OutputT >> future = new CompletableFuture <>();
95
- maybeAttemptExecute (future );
78
+ attemptFirstExecute (future );
96
79
return future ;
97
80
}
98
81
99
- public void maybeAttemptExecute (CompletableFuture <Response <OutputT >> future ) {
100
- retryableStageHelper .startingAttempt ();
101
-
102
- if (!retryableStageHelper .retryPolicyAllowsRetry ()) {
103
- future .completeExceptionally (retryableStageHelper .retryPolicyDisallowedRetryException ());
104
- return ;
105
- }
106
-
107
- if (retryableStageHelper .getAttemptNumber () > 1 ) {
108
- // We failed the last attempt, but will retry. The response handler wants to know when that happens.
109
- responseHandler .onError (retryableStageHelper .getLastException ());
110
-
111
- // Reset the request provider to the original one before retries, in case it was modified downstream.
112
- context .requestProvider (originalRequestBody );
113
- }
114
-
115
- Duration backoffDelay = retryableStageHelper .getBackoffDelay ();
116
-
117
- OptionalDouble tokenAcquireTimeSeconds = retryableStageHelper .getSendTokenNonBlocking ();
118
- if (!tokenAcquireTimeSeconds .isPresent ()) {
119
- String errorMessage = "Unable to acquire a send token immediately without waiting. This indicates that ADAPTIVE "
120
- + "retry mode is enabled, fast fail rate limiting is enabled, and that rate limiting is "
121
- + "engaged because of prior throttled requests. The request will not be executed." ;
122
- future .completeExceptionally (SdkClientException .create (errorMessage ));
123
- return ;
124
- }
125
- long tokenAcquireTimeMillis = (long ) (tokenAcquireTimeSeconds .getAsDouble () * 1000 );
126
-
127
- if (!backoffDelay .isZero ()) {
82
+ public void attemptFirstExecute (CompletableFuture <Response <OutputT >> future ) {
83
+ Duration backoffDelay = retryableStageHelper .acquireInitialToken ();
84
+ if (backoffDelay .isZero ()) {
85
+ attemptExecute (future );
86
+ } else {
128
87
retryableStageHelper .logBackingOff (backoffDelay );
129
- }
130
-
131
- long totalDelayMillis = backoffDelay .toMillis () + tokenAcquireTimeMillis ;
132
-
133
- if (totalDelayMillis > 0 ) {
88
+ long totalDelayMillis = backoffDelay .toMillis ();
134
89
scheduledExecutor .schedule (() -> attemptExecute (future ), totalDelayMillis , MILLISECONDS );
135
- } else {
136
- attemptExecute (future );
137
90
}
138
91
}
139
92
140
93
private void attemptExecute (CompletableFuture <Response <OutputT >> future ) {
141
94
CompletableFuture <Response <OutputT >> responseFuture ;
142
95
try {
96
+ retryableStageHelper .startingAttempt ();
143
97
retryableStageHelper .logSendingRequest ();
144
98
responseFuture = requestPipeline .execute (retryableStageHelper .requestToSend (), context );
145
99
@@ -164,23 +118,37 @@ private void attemptExecute(CompletableFuture<Response<OutputT>> future) {
164
118
}
165
119
166
120
retryableStageHelper .setLastResponse (response .httpResponse ());
167
-
168
121
if (!response .isSuccess ()) {
169
122
retryableStageHelper .adjustClockIfClockSkew (response );
170
123
maybeRetryExecute (future , response .exception ());
171
124
return ;
172
125
}
173
126
174
- retryableStageHelper .updateClientSendingRateForSuccessResponse ();
175
-
176
- retryableStageHelper .attemptSucceeded ();
127
+ retryableStageHelper .recordAttemptSucceeded ();
177
128
future .complete (response );
178
129
});
179
130
}
180
131
132
+ public void maybeAttemptExecute (CompletableFuture <Response <OutputT >> future ) {
133
+ Optional <Duration > delay = retryableStageHelper .tryRefreshToken (Duration .ZERO );
134
+ if (!delay .isPresent ()) {
135
+ future .completeExceptionally (retryableStageHelper .retryPolicyDisallowedRetryException ());
136
+ return ;
137
+ }
138
+ // We failed the last attempt, but will retry. The response handler wants to know when that happens.
139
+ responseHandler .onError (retryableStageHelper .getLastException ());
140
+
141
+ // Reset the request provider to the original one before retries, in case it was modified downstream.
142
+ context .requestProvider (originalRequestBody );
143
+
144
+ Duration backoffDelay = delay .get ();
145
+ retryableStageHelper .logBackingOff (backoffDelay );
146
+ long totalDelayMillis = backoffDelay .toMillis ();
147
+ scheduledExecutor .schedule (() -> attemptExecute (future ), totalDelayMillis , MILLISECONDS );
148
+ }
149
+
181
150
private void maybeRetryExecute (CompletableFuture <Response <OutputT >> future , Exception exception ) {
182
151
retryableStageHelper .setLastException (exception );
183
- retryableStageHelper .updateClientSendingRateForErrorResponse ();
184
152
maybeAttemptExecute (future );
185
153
}
186
154
}
0 commit comments