Skip to content

Commit 14cccda

Browse files
committed
Address comments
1 parent 05ea027 commit 14cccda

File tree

4 files changed

+210
-163
lines changed

4 files changed

+210
-163
lines changed

.changes/next-release/bugfix-AmazonS3-263fed5.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"type": "bugfix",
33
"category": "Amazon S3",
44
"contributor": "",
5-
"description": "Fix bug in MultipartS3AsyncClient GET where retryable errors may not be retried, and if retried, successful responses are incorrectly processed with the initial error."
5+
"description": "Fix a bug in the Java based multipart client where retryable errors from getObject may not be retried correctly."
66
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
307307
}
308308
synchronized (cancelLock) {
309309
if (partNumber == 1) {
310+
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
310311
onStreamCalled = true;
311312
log.trace(() -> "calling onStream on the upstream transformer");
312313
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
@@ -317,9 +318,7 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
317318
);
318319
}
319320
}
320-
321-
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
322-
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response, partNumber));
321+
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response));
323322
}
324323

325324
@Override
@@ -347,13 +346,11 @@ class IndividualPartSubscriber<T> implements Subscriber<ByteBuffer> {
347346

348347
private final CompletableFuture<T> future;
349348
private final T response;
350-
private final int partNumber;
351349
private Subscription subscription;
352350

353-
IndividualPartSubscriber(CompletableFuture<T> future, T response, int partNumber) {
351+
IndividualPartSubscriber(CompletableFuture<T> future, T response) {
354352
this.future = future;
355353
this.response = response;
356-
this.partNumber = partNumber;
357354
}
358355

359356
@Override
@@ -393,9 +390,7 @@ public void onComplete() {
393390
}
394391

395392
private void handleError(Throwable t) {
396-
if (partNumber > 1) {
397-
publisherToUpstream.error(t);
398-
}
393+
publisherToUpstream.error(t);
399394
future.completeExceptionally(t);
400395
}
401396
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public Subscriber<ByteBuffer> createSubscriber(WhiteboxSubscriberProbe<ByteBuffe
4444
.maximumBufferSizeInBytes(32L)
4545
.resultFuture(new CompletableFuture<>())
4646
.build();
47-
return transformer.new IndividualPartSubscriber<ByteBuffer>(future, ByteBuffer.wrap(new byte[0]), 1) {
47+
return transformer.new IndividualPartSubscriber<ByteBuffer>(future, ByteBuffer.wrap(new byte[0])) {
4848
@Override
4949
public void onSubscribe(Subscription s) {
5050
super.onSubscribe(s);

0 commit comments

Comments
 (0)