25
25
import software .amazon .awssdk .core .SplittingTransformerConfiguration ;
26
26
import software .amazon .awssdk .core .async .AsyncResponseTransformer ;
27
27
import software .amazon .awssdk .core .async .SdkPublisher ;
28
+ import software .amazon .awssdk .core .exception .SdkException ;
29
+ import software .amazon .awssdk .core .exception .SdkServiceException ;
30
+ import software .amazon .awssdk .core .internal .retry .SdkDefaultRetrySetting ;
31
+ import software .amazon .awssdk .core .retry .RetryUtils ;
28
32
import software .amazon .awssdk .utils .CompletableFutureUtils ;
29
33
import software .amazon .awssdk .utils .Logger ;
30
34
import software .amazon .awssdk .utils .Validate ;
@@ -54,16 +58,6 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
54
58
*/
55
59
private final AsyncResponseTransformer <ResponseT , ResultT > upstreamResponseTransformer ;
56
60
57
- /**
58
- * Set to true once {@code .prepare()} is called on the upstreamResponseTransformer
59
- */
60
- private final AtomicBoolean preparedCalled = new AtomicBoolean (false );
61
-
62
- /**
63
- * Set to true once {@code .onResponse()} is called on the upstreamResponseTransformer
64
- */
65
- private final AtomicBoolean onResponseCalled = new AtomicBoolean (false );
66
-
67
61
/**
68
62
* Set to true once {@code .onStream()} is called on the upstreamResponseTransformer
69
63
*/
@@ -111,6 +105,24 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
111
105
112
106
private final Object cancelLock = new Object ();
113
107
108
+ /**
109
+ * Keeps track of the upstream future returned by {@code upstreamResponseTransformer.prepare()}. If an error occurs, we
110
+ * forward this to the {@code resultFuture}.
111
+ */
112
+ private volatile CompletableFuture <ResultT > upstreamFuture ;
113
+
114
+ /**
115
+ * Tracks whether an {@code IndividualTransformer} has been created for the first part yet. Errors will only be retried for
116
+ * the first part.
117
+ */
118
+ private final AtomicBoolean isFirstIndividualTransformer = new AtomicBoolean (true );
119
+
120
+ /**
121
+ * Tracks whether an {@code IndividualPartSubscriber} has been created for the first part yet. Errors will only be retried for
122
+ * the first part.
123
+ */
124
+ private final AtomicBoolean isFirstIndividualSubscriber = new AtomicBoolean (true );
125
+
114
126
private SplittingTransformer (AsyncResponseTransformer <ResponseT , ResultT > upstreamResponseTransformer ,
115
127
Long maximumBufferSizeInBytes ,
116
128
CompletableFuture <ResultT > resultFuture ) {
@@ -198,7 +210,7 @@ private boolean doEmit() {
198
210
}
199
211
if (outstandingDemand .get () > 0 ) {
200
212
demand = outstandingDemand .decrementAndGet ();
201
- downstreamSubscriber .onNext (new IndividualTransformer ());
213
+ downstreamSubscriber .onNext (new IndividualTransformer (isFirstIndividualTransformer . compareAndSet ( true , false ) ));
202
214
}
203
215
}
204
216
return false ;
@@ -230,6 +242,7 @@ private void handleSubscriptionCancel() {
230
242
} else {
231
243
log .trace (() -> "calling downstreamSubscriber.onComplete()" );
232
244
downstreamSubscriber .onComplete ();
245
+ CompletableFutureUtils .forwardResultTo (upstreamFuture , resultFuture );
233
246
}
234
247
downstreamSubscriber = null ;
235
248
});
@@ -259,28 +272,27 @@ private void handleFutureCancel(Throwable e) {
259
272
* body publisher.
260
273
*/
261
274
private class IndividualTransformer implements AsyncResponseTransformer <ResponseT , ResponseT > {
275
+ private final boolean isFirstPart ;
262
276
private ResponseT response ;
263
277
private CompletableFuture <ResponseT > individualFuture ;
264
278
279
+ IndividualTransformer (boolean isFirstPart ) {
280
+ this .isFirstPart = isFirstPart ;
281
+ }
282
+
265
283
@ Override
266
284
public CompletableFuture <ResponseT > prepare () {
267
285
this .individualFuture = new CompletableFuture <>();
268
- if (preparedCalled .compareAndSet (false , true )) {
286
+
287
+ if (isFirstPart ) {
269
288
if (isCancelled .get ()) {
270
289
return individualFuture ;
271
290
}
272
291
log .trace (() -> "calling prepare on the upstream transformer" );
273
- CompletableFuture <ResultT > upstreamFuture = upstreamResponseTransformer .prepare ();
274
- if (!resultFuture .isDone ()) {
275
- CompletableFutureUtils .forwardResultTo (upstreamFuture , resultFuture );
276
- }
292
+ upstreamFuture = upstreamResponseTransformer .prepare ();
293
+
277
294
}
278
- resultFuture .whenComplete ((r , e ) -> {
279
- if (e == null ) {
280
- return ;
281
- }
282
- individualFuture .completeExceptionally (e );
283
- });
295
+
284
296
individualFuture .whenComplete ((r , e ) -> {
285
297
if (isCancelled .get ()) {
286
298
handleSubscriptionCancel ();
@@ -291,7 +303,7 @@ public CompletableFuture<ResponseT> prepare() {
291
303
292
304
@ Override
293
305
public void onResponse (ResponseT response ) {
294
- if (onResponseCalled . compareAndSet ( false , true ) ) {
306
+ if (isFirstPart ) {
295
307
log .trace (() -> "calling onResponse on the upstream transformer" );
296
308
upstreamResponseTransformer .onResponse (response );
297
309
}
@@ -304,7 +316,8 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
304
316
return ;
305
317
}
306
318
synchronized (cancelLock ) {
307
- if (onStreamCalled .compareAndSet (false , true )) {
319
+ if (isFirstPart ) {
320
+ onStreamCalled .set (true );
308
321
log .trace (() -> "calling onStream on the upstream transformer" );
309
322
upstreamResponseTransformer .onStream (upstreamSubscriber -> publisherToUpstream .subscribe (
310
323
DelegatingBufferingSubscriber .builder ()
@@ -314,29 +327,65 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
314
327
);
315
328
}
316
329
}
317
- publisher .subscribe (new IndividualPartSubscriber <>(this .individualFuture , response ));
330
+
331
+ if (!resultFuture .isDone ()) {
332
+ CompletableFutureUtils .forwardResultTo (upstreamFuture , resultFuture );
333
+ }
334
+
335
+ publisher .subscribe (new IndividualPartSubscriber <>(this .individualFuture , response ,
336
+ isFirstIndividualSubscriber .compareAndSet (true , false )));
318
337
}
319
338
320
339
@ Override
321
340
public void exceptionOccurred (Throwable error ) {
322
- publisherToUpstream .error (error );
323
341
log .trace (() -> "calling exceptionOccurred on the upstream transformer" );
324
342
upstreamResponseTransformer .exceptionOccurred (error );
343
+
344
+ if (!isFirstPart || onStreamCalled .get ()) {
345
+ publisherToUpstream .error (error );
346
+ }
347
+
348
+ if (!isRetryableError (error )) {
349
+ log .trace (() -> "error is non-retryable, forwarding upstream future to result future" );
350
+ CompletableFutureUtils .forwardResultTo (upstreamFuture , resultFuture );
351
+ }
325
352
}
326
353
}
327
354
355
+ private boolean isRetryableError (Throwable error ) {
356
+ if (error instanceof SdkException ) {
357
+ SdkException ex = (SdkException ) error ;
358
+ return retryOnStatusCodes (ex )
359
+ || RetryUtils .isRetryableException (ex )
360
+ || RetryUtils .isClockSkewException (ex )
361
+ || RetryUtils .isClockSkewException (ex );
362
+
363
+ }
364
+ return false ;
365
+ }
366
+
367
+ private static boolean retryOnStatusCodes (Throwable ex ) {
368
+ if (ex instanceof SdkServiceException ) {
369
+ SdkServiceException failure = (SdkServiceException ) ex ;
370
+ return SdkDefaultRetrySetting .RETRYABLE_STATUS_CODES .contains (failure .statusCode ());
371
+ }
372
+ return false ;
373
+ }
374
+
328
375
/**
329
376
* the Subscriber for each of the individual request's ByteBuffer publisher
330
377
*/
331
378
class IndividualPartSubscriber <T > implements Subscriber <ByteBuffer > {
332
379
333
380
private final CompletableFuture <T > future ;
334
381
private final T response ;
382
+ private final boolean isFirstPart ;
335
383
private Subscription subscription ;
336
384
337
- IndividualPartSubscriber (CompletableFuture <T > future , T response ) {
385
+ IndividualPartSubscriber (CompletableFuture <T > future , T response , boolean isFirstPart ) {
338
386
this .future = future ;
339
387
this .response = response ;
388
+ this .isFirstPart = isFirstPart ;
340
389
}
341
390
342
391
@ Override
@@ -376,7 +425,9 @@ public void onComplete() {
376
425
}
377
426
378
427
private void handleError (Throwable t ) {
379
- publisherToUpstream .error (t );
428
+ if (!isFirstPart ) {
429
+ publisherToUpstream .error (t );
430
+ }
380
431
future .completeExceptionally (t );
381
432
}
382
433
}
0 commit comments