Skip to content

Fix bug in MultipartS3AsyncClient GetObject Retryable Errors #6309

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-263fed5.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon S3",
"contributor": "",
"description": "Fix a bug in the Java based multipart client where retryable errors from getObject may not be retried correctly."
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -54,20 +55,10 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
*/
private final AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer;

/**
* Set to true once {@code .prepare()} is called on the upstreamResponseTransformer
*/
private final AtomicBoolean preparedCalled = new AtomicBoolean(false);

/**
* Set to true once {@code .onResponse()} is called on the upstreamResponseTransformer
*/
private final AtomicBoolean onResponseCalled = new AtomicBoolean(false);

/**
* Set to true once {@code .onStream()} is called on the upstreamResponseTransformer
*/
private final AtomicBoolean onStreamCalled = new AtomicBoolean(false);
private boolean onStreamCalled;

/**
* Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the
Expand Down Expand Up @@ -111,6 +102,17 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As

private final Object cancelLock = new Object();

/**
* Keeps track of the upstream future returned by {@code upstreamResponseTransformer.prepare()}. If an error occurs, we
* forward this to the {@code resultFuture}.
*/
private volatile CompletableFuture<ResultT> upstreamFuture;

/**
* Tracks the part number. Errors will only be retried for the first part.
*/
private final AtomicInteger partNumber = new AtomicInteger(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT (non-blocking): I think we should avoid referring to 'parts' in the SplittingTranformer, that's a s3 concept, when this class is more abstract, in a core package. Maybe rename to something like 'onNextSignalsSent' or onNextNumber which would track the total amount of 'onNext' signals sent to the downstreamSubscriber


private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,
Long maximumBufferSizeInBytes,
CompletableFuture<ResultT> resultFuture) {
Expand Down Expand Up @@ -198,7 +200,7 @@ private boolean doEmit() {
}
if (outstandingDemand.get() > 0) {
demand = outstandingDemand.decrementAndGet();
downstreamSubscriber.onNext(new IndividualTransformer());
downstreamSubscriber.onNext(new IndividualTransformer(partNumber.incrementAndGet()));
}
}
return false;
Expand All @@ -216,7 +218,7 @@ private void handleSubscriptionCancel() {
log.trace(() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()");
return;
}
if (!onStreamCalled.get()) {
if (!onStreamCalled) {
// we never subscribe publisherToUpstream to the upstream, it would not complete
downstreamSubscriber = null;
return;
Expand All @@ -230,6 +232,7 @@ private void handleSubscriptionCancel() {
} else {
log.trace(() -> "calling downstreamSubscriber.onComplete()");
downstreamSubscriber.onComplete();
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
}
downstreamSubscriber = null;
});
Expand Down Expand Up @@ -259,28 +262,27 @@ private void handleFutureCancel(Throwable e) {
* body publisher.
*/
private class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> {
private final int partNumber;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: same here, avoid referring to s3 related terms

private ResponseT response;
private CompletableFuture<ResponseT> individualFuture;

IndividualTransformer(int partNumber) {
this.partNumber = partNumber;
}

@Override
public CompletableFuture<ResponseT> prepare() {
this.individualFuture = new CompletableFuture<>();
if (preparedCalled.compareAndSet(false, true)) {

if (partNumber == 1) {
if (isCancelled.get()) {
return individualFuture;
}
log.trace(() -> "calling prepare on the upstream transformer");
CompletableFuture<ResultT> upstreamFuture = upstreamResponseTransformer.prepare();
if (!resultFuture.isDone()) {
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
}
upstreamFuture = upstreamResponseTransformer.prepare();

}
resultFuture.whenComplete((r, e) -> {
if (e == null) {
return;
}
individualFuture.completeExceptionally(e);
});

individualFuture.whenComplete((r, e) -> {
if (isCancelled.get()) {
handleSubscriptionCancel();
Expand All @@ -291,7 +293,7 @@ public CompletableFuture<ResponseT> prepare() {

@Override
public void onResponse(ResponseT response) {
if (onResponseCalled.compareAndSet(false, true)) {
if (partNumber == 1) {
log.trace(() -> "calling onResponse on the upstream transformer");
upstreamResponseTransformer.onResponse(response);
}
Expand All @@ -304,7 +306,9 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
return;
}
synchronized (cancelLock) {
if (onStreamCalled.compareAndSet(false, true)) {
if (partNumber == 1) {
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
onStreamCalled = true;
log.trace(() -> "calling onStream on the upstream transformer");
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
DelegatingBufferingSubscriber.builder()
Expand All @@ -319,9 +323,19 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {

@Override
public void exceptionOccurred(Throwable error) {
publisherToUpstream.error(error);
log.trace(() -> "calling exceptionOccurred on the upstream transformer");
upstreamResponseTransformer.exceptionOccurred(error);
if (partNumber == 1) {
log.trace(() -> "calling exceptionOccurred on the upstream transformer");
upstreamResponseTransformer.exceptionOccurred(error);
}

// Invoking publisherToUpstream.error() essentially fails the request immediately. We should only call this if
// 1) The part number is greater than 1, since we want to retry errors on the first part OR 2) onStream() has
// already been invoked and data has started to be written
synchronized (cancelLock) {
if (partNumber > 1 || onStreamCalled) {
publisherToUpstream.error(error);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
Expand Down
6 changes: 6 additions & 0 deletions services/s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@
<version>${awsjavasdk.version}</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>retries</artifactId>
<scope>test</scope>
<version>${awsjavasdk.version}</version>
</dependency>
<dependency>
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;

@SdkInternalApi
Expand All @@ -49,6 +50,7 @@ public <T> CompletableFuture<T> downloadObject(
.build());
MultipartDownloaderSubscriber subscriber = subscriber(getObjectRequest);
split.publisher().subscribe(subscriber);
CompletableFutureUtils.forwardExceptionTo(subscriber.future(), split.resultFuture());
return split.resultFuture();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

package software.amazon.awssdk.services.s3.internal.multipart;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -79,6 +81,8 @@ public class MultipartDownloaderSubscriber implements Subscriber<AsyncResponseTr
*/
private final Object lock = new Object();

private final Queue<CompletableFuture<GetObjectResponse>> getObjectFutures = new ConcurrentLinkedQueue<>();

public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest) {
this(s3, getObjectRequest, 0);
}
Expand Down Expand Up @@ -119,6 +123,7 @@ public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse
GetObjectRequest actualRequest = nextRequest(nextPartToGet);
log.debug(() -> "Sending GetObjectRequest for next part with partNumber=" + nextPartToGet);
CompletableFuture<GetObjectResponse> getObjectFuture = s3.getObject(actualRequest, asyncResponseTransformer);
getObjectFutures.add(getObjectFuture);
getObjectFuture.whenComplete((response, error) -> {
if (error != null) {
log.debug(() -> "Error encountered during GetObjectRequest with partNumber=" + nextPartToGet);
Expand Down Expand Up @@ -166,6 +171,10 @@ private void requestMoreIfNeeded(GetObjectResponse response) {

@Override
public void onError(Throwable t) {
CompletableFuture<GetObjectResponse> partFuture;
while ((partFuture = getObjectFutures.poll()) != null) {
partFuture.cancel(true);
}
future.completeExceptionally(t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
import software.amazon.awssdk.utils.Validate;

/**
* An {@link S3AsyncClient} that automatically converts PUT, COPY requests to their respective multipart call. CRC32 will be
* enabled for the PUT and COPY requests, unless the the checksum is specified or checksum validation is disabled.
* Note: GET is not yet supported.
* An {@link S3AsyncClient} that automatically converts PUT, COPY, and GET requests to their respective multipart call. CRC32
* will be enabled for the requests, unless the checksum is specified or checksum validation is disabled.
*
* @see MultipartConfiguration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,32 +98,6 @@ <T> void happyPath_shouldReceiveAllBodyPartInCorrectOrder(AsyncResponseTransform
util.verifyCorrectAmountOfRequestsMade(amountOfPartToTest);
}

@ParameterizedTest
@MethodSource("argumentsProvider")
<T> void errorOnFirstRequest_shouldCompleteExceptionally(AsyncResponseTransformerTestSupplier<T> supplier,
int amountOfPartToTest,
int partSize) {
stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", testBucket, testKey))).willReturn(
aResponse()
.withStatus(400)
.withBody("<Error><Code>400</Code><Message>test error message</Message></Error>")));
AsyncResponseTransformer<GetObjectResponse, T> transformer = supplier.transformer();
AsyncResponseTransformer.SplitResult<GetObjectResponse, T> split = transformer.split(
SplittingTransformerConfiguration.builder()
.bufferSizeInBytes(1024 * 32L)
.build());
Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> subscriber = new MultipartDownloaderSubscriber(
s3AsyncClient,
GetObjectRequest.builder()
.bucket(testBucket)
.key(testKey)
.build());

split.publisher().subscribe(subscriber);
assertThatThrownBy(() -> split.resultFuture().join())
.hasMessageContaining("test error message");
}

@ParameterizedTest
@MethodSource("argumentsProvider")
<T> void errorOnThirdRequest_shouldCompleteExceptionallyOnlyPartsGreaterThanTwo(
Expand Down
Loading
Loading