Skip to content

Fix bug in MultipartS3AsyncClient GetObject Retryable Errors #6309

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-263fed5.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon S3",
"contributor": "",
"description": "Fix MultipartS3AsyncClient GetObject retryable errors"
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,6 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
*/
private final AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer;

/**
* Set to true once {@code .prepare()} is called on the upstreamResponseTransformer
*/
private final AtomicBoolean preparedCalled = new AtomicBoolean(false);

/**
* Set to true once {@code .onResponse()} is called on the upstreamResponseTransformer
*/
private final AtomicBoolean onResponseCalled = new AtomicBoolean(false);

/**
* Set to true once {@code .onStream()} is called on the upstreamResponseTransformer
*/
Expand Down Expand Up @@ -111,6 +101,24 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As

private final Object cancelLock = new Object();

/**
* Keeps track of the upstream future returned by {@code upstreamResponseTransformer.prepare()}. If an error occurs, we
* forward this to the {@code resultFuture}.
*/
private volatile CompletableFuture<ResultT> upstreamFuture;

/**
* Tracks whether an {@code IndividualTransformer} has been created for the first part yet. Errors will only be retried for
* the first part.
*/
private final AtomicBoolean isFirstIndividualTransformer = new AtomicBoolean(true);

/**
* Tracks whether an {@code IndividualPartSubscriber} has been created for the first part yet. Errors will only be retried for
* the first part.
*/
private final AtomicBoolean isFirstIndividualSubscriber = new AtomicBoolean(true);

private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,
Long maximumBufferSizeInBytes,
CompletableFuture<ResultT> resultFuture) {
Expand Down Expand Up @@ -198,7 +206,7 @@ private boolean doEmit() {
}
if (outstandingDemand.get() > 0) {
demand = outstandingDemand.decrementAndGet();
downstreamSubscriber.onNext(new IndividualTransformer());
downstreamSubscriber.onNext(new IndividualTransformer(isFirstIndividualTransformer.compareAndSet(true, false)));
}
}
return false;
Expand Down Expand Up @@ -230,6 +238,7 @@ private void handleSubscriptionCancel() {
} else {
log.trace(() -> "calling downstreamSubscriber.onComplete()");
downstreamSubscriber.onComplete();
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
}
downstreamSubscriber = null;
});
Expand Down Expand Up @@ -259,28 +268,27 @@ private void handleFutureCancel(Throwable e) {
* body publisher.
*/
private class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> {
private final boolean isFirstPart;
private ResponseT response;
private CompletableFuture<ResponseT> individualFuture;

IndividualTransformer(boolean isFirstPart) {
this.isFirstPart = isFirstPart;
}

@Override
public CompletableFuture<ResponseT> prepare() {
this.individualFuture = new CompletableFuture<>();
if (preparedCalled.compareAndSet(false, true)) {

if (isFirstPart) {
if (isCancelled.get()) {
return individualFuture;
}
log.trace(() -> "calling prepare on the upstream transformer");
CompletableFuture<ResultT> upstreamFuture = upstreamResponseTransformer.prepare();
if (!resultFuture.isDone()) {
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
}
upstreamFuture = upstreamResponseTransformer.prepare();

}
resultFuture.whenComplete((r, e) -> {
if (e == null) {
return;
}
individualFuture.completeExceptionally(e);
});

individualFuture.whenComplete((r, e) -> {
if (isCancelled.get()) {
handleSubscriptionCancel();
Expand All @@ -291,7 +299,7 @@ public CompletableFuture<ResponseT> prepare() {

@Override
public void onResponse(ResponseT response) {
if (onResponseCalled.compareAndSet(false, true)) {
if (isFirstPart) {
log.trace(() -> "calling onResponse on the upstream transformer");
upstreamResponseTransformer.onResponse(response);
}
Expand All @@ -304,7 +312,8 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
return;
}
synchronized (cancelLock) {
if (onStreamCalled.compareAndSet(false, true)) {
if (isFirstPart) {
onStreamCalled.set(true);
log.trace(() -> "calling onStream on the upstream transformer");
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
DelegatingBufferingSubscriber.builder()
Expand All @@ -314,14 +323,23 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
);
}
}
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response));

if (!resultFuture.isDone()) {
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
}

publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response,
isFirstIndividualSubscriber.compareAndSet(true, false)));
}

@Override
public void exceptionOccurred(Throwable error) {
publisherToUpstream.error(error);
log.trace(() -> "calling exceptionOccurred on the upstream transformer");
upstreamResponseTransformer.exceptionOccurred(error);

if (!isFirstPart || onStreamCalled.get()) {
publisherToUpstream.error(error);
}
}
}

Expand All @@ -332,11 +350,13 @@ class IndividualPartSubscriber<T> implements Subscriber<ByteBuffer> {

private final CompletableFuture<T> future;
private final T response;
private final boolean isFirstPart;
private Subscription subscription;

IndividualPartSubscriber(CompletableFuture<T> future, T response) {
IndividualPartSubscriber(CompletableFuture<T> future, T response, boolean isFirstPart) {
this.future = future;
this.response = response;
this.isFirstPart = isFirstPart;
}

@Override
Expand Down Expand Up @@ -376,7 +396,9 @@ public void onComplete() {
}

private void handleError(Throwable t) {
publisherToUpstream.error(t);
if (!isFirstPart) {
publisherToUpstream.error(t);
}
future.completeExceptionally(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
Expand All @@ -45,7 +44,7 @@ public Subscriber<ByteBuffer> createSubscriber(WhiteboxSubscriberProbe<ByteBuffe
.maximumBufferSizeInBytes(32L)
.resultFuture(new CompletableFuture<>())
.build();
return transformer.new IndividualPartSubscriber<ByteBuffer>(future, ByteBuffer.wrap(new byte[0])) {
return transformer.new IndividualPartSubscriber<ByteBuffer>(future, ByteBuffer.wrap(new byte[0]), true) {
@Override
public void onSubscribe(Subscription s) {
super.onSubscribe(s);
Expand Down
6 changes: 6 additions & 0 deletions services/s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@
<version>${awsjavasdk.version}</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>retries</artifactId>
<scope>test</scope>
<version>${awsjavasdk.version}</version>
</dependency>
<dependency>
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;

@SdkInternalApi
Expand All @@ -49,6 +50,7 @@ public <T> CompletableFuture<T> downloadObject(
.build());
MultipartDownloaderSubscriber subscriber = subscriber(getObjectRequest);
split.publisher().subscribe(subscriber);
CompletableFutureUtils.forwardExceptionTo(subscriber.future(), split.resultFuture());
return split.resultFuture();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.s3.internal.multipart;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -79,6 +81,8 @@ public class MultipartDownloaderSubscriber implements Subscriber<AsyncResponseTr
*/
private final Object lock = new Object();

private final List<CompletableFuture<GetObjectResponse>> getObjectFutures = new ArrayList<>();

public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest) {
this(s3, getObjectRequest, 0);
}
Expand Down Expand Up @@ -119,6 +123,7 @@ public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse
GetObjectRequest actualRequest = nextRequest(nextPartToGet);
log.debug(() -> "Sending GetObjectRequest for next part with partNumber=" + nextPartToGet);
CompletableFuture<GetObjectResponse> getObjectFuture = s3.getObject(actualRequest, asyncResponseTransformer);
getObjectFutures.add(getObjectFuture);
getObjectFuture.whenComplete((response, error) -> {
if (error != null) {
log.debug(() -> "Error encountered during GetObjectRequest with partNumber=" + nextPartToGet);
Expand Down Expand Up @@ -166,6 +171,7 @@ private void requestMoreIfNeeded(GetObjectResponse response) {

@Override
public void onError(Throwable t) {
getObjectFutures.forEach(future -> future.cancel(true));
future.completeExceptionally(t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
import software.amazon.awssdk.utils.Validate;

/**
* An {@link S3AsyncClient} that automatically converts PUT, COPY requests to their respective multipart call. CRC32 will be
* enabled for the PUT and COPY requests, unless the the checksum is specified or checksum validation is disabled.
* Note: GET is not yet supported.
* An {@link S3AsyncClient} that automatically converts PUT, COPY, and GET requests to their respective multipart call. CRC32
* will be enabled for the requests, unless the checksum is specified or checksum validation is disabled.
*
* @see MultipartConfiguration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,32 +98,6 @@ <T> void happyPath_shouldReceiveAllBodyPartInCorrectOrder(AsyncResponseTransform
util.verifyCorrectAmountOfRequestsMade(amountOfPartToTest);
}

@ParameterizedTest
@MethodSource("argumentsProvider")
<T> void errorOnFirstRequest_shouldCompleteExceptionally(AsyncResponseTransformerTestSupplier<T> supplier,
int amountOfPartToTest,
int partSize) {
stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", testBucket, testKey))).willReturn(
aResponse()
.withStatus(400)
.withBody("<Error><Code>400</Code><Message>test error message</Message></Error>")));
AsyncResponseTransformer<GetObjectResponse, T> transformer = supplier.transformer();
AsyncResponseTransformer.SplitResult<GetObjectResponse, T> split = transformer.split(
SplittingTransformerConfiguration.builder()
.bufferSizeInBytes(1024 * 32L)
.build());
Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> subscriber = new MultipartDownloaderSubscriber(
s3AsyncClient,
GetObjectRequest.builder()
.bucket(testBucket)
.key(testKey)
.build());

split.publisher().subscribe(subscriber);
assertThatThrownBy(() -> split.resultFuture().join())
.hasMessageContaining("test error message");
}

@ParameterizedTest
@MethodSource("argumentsProvider")
<T> void errorOnThirdRequest_shouldCompleteExceptionallyOnlyPartsGreaterThanTwo(
Expand Down
Loading
Loading