-
Notifications
You must be signed in to change notification settings - Fork 930
Retry support for Java-based S3 multipart client for multipart GET to bytes #6328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 13 commits
7c279ef
00e97c1
24aedc6
49c7ef2
78058b1
76d3277
f386bfe
7bae1ed
292df0c
1d2dc6c
523b2bd
76281d6
1d136e5
c381f8a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"type": "feature", | ||
"category": "Amazon S3", | ||
"contributor": "", | ||
"description": "Add retry support for Java-based S3 multipart client download to byte array" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.core.internal.async; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.core.ResponseBytes; | ||
import software.amazon.awssdk.core.async.AsyncResponseTransformer; | ||
import software.amazon.awssdk.core.async.SdkPublisher; | ||
import software.amazon.awssdk.core.exception.SdkClientException; | ||
import software.amazon.awssdk.utils.CompletableFutureUtils; | ||
import software.amazon.awssdk.utils.Logger; | ||
import software.amazon.awssdk.utils.Validate; | ||
import software.amazon.awssdk.utils.async.SimplePublisher; | ||
|
||
@SdkInternalApi | ||
public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we add a TCK test? |
||
private static final Logger log = Logger.loggerFor(ByteArraySplittingTransformer.class); | ||
private final AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer; | ||
private final CompletableFuture<ResponseBytes<ResponseT>> resultFuture; | ||
private Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> downstreamSubscriber; | ||
private final AtomicInteger onNextSignalsSent = new AtomicInteger(0); | ||
private final AtomicReference<ResponseT> responseT = new AtomicReference<>(); | ||
|
||
private final SimplePublisher<ByteBuffer> publisherToUpstream = new SimplePublisher<>(); | ||
/** | ||
* The amount requested by the downstream subscriber that is still left to fulfill. Updated when the | ||
* {@link Subscription#request(long) request} method is called on the downstream subscriber's subscription. Corresponds to the | ||
* number of {@code AsyncResponseTransformer} that will be published to the downstream subscriber. | ||
*/ | ||
private final AtomicLong outstandingDemand = new AtomicLong(0); | ||
|
||
/** | ||
* This flag stops the current thread from publishing transformers while another thread is already publishing. | ||
*/ | ||
private final AtomicBoolean emitting = new AtomicBoolean(false); | ||
|
||
private final Object lock = new Object(); | ||
|
||
/** | ||
* Set to true once {@code .onStream()} is called on the upstreamResponseTransformer | ||
*/ | ||
private boolean onStreamCalled; | ||
|
||
/** | ||
* Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the | ||
* {@code resultFuture} is cancelled. | ||
*/ | ||
private final AtomicBoolean isCancelled = new AtomicBoolean(false); | ||
|
||
private final Map<Integer, ByteBuffer> buffers; | ||
|
||
/** | ||
* The buffer size used to buffer the content received from the downstream subscriber | ||
*/ | ||
private final long maximumBufferInBytes; | ||
|
||
/**
 * Creates a splitting transformer that buffers the bytes of each individual part and, once the downstream
 * subscription is cancelled, replays them in order to the upstream transformer.
 *
 * @param upstreamResponseTransformer transformer that receives the reassembled content
 * @param resultFuture future that the upstream transformer's prepared future is forwarded to
 * @param maximumBufferSizeInBytes buffer size passed to each per-part delegate; must be non-null and positive
 */
public ByteArraySplittingTransformer(
    AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer,
    CompletableFuture<ResponseBytes<ResponseT>> resultFuture,
    Long maximumBufferSizeInBytes) {
    // Null check runs first so a missing value is reported as such rather than as an unboxing failure.
    Long validatedBufferSize = Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
    this.maximumBufferInBytes = Validate.isPositive(validatedBufferSize, "maximumBufferSizeInBytes");
    this.upstreamResponseTransformer = upstreamResponseTransformer;
    this.resultFuture = resultFuture;
    this.buffers = new ConcurrentHashMap<>();
}
|
||
@Override | ||
public void subscribe(Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> subscriber) { | ||
this.downstreamSubscriber = subscriber; | ||
subscriber.onSubscribe(new DownstreamSubscription()); | ||
} | ||
|
||
private final class DownstreamSubscription implements Subscription { | ||
|
||
@Override | ||
public void request(long n) { | ||
if (n <= 0) { | ||
downstreamSubscriber.onError(new IllegalArgumentException("Amount requested must be positive")); | ||
return; | ||
} | ||
long newDemand = outstandingDemand.updateAndGet(current -> { | ||
if (Long.MAX_VALUE - current < n) { | ||
return Long.MAX_VALUE; | ||
} | ||
return current + n; | ||
}); | ||
log.trace(() -> String.format("new outstanding demand: %s", newDemand)); | ||
emit(); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
log.trace(() -> String.format("received cancel signal. Current cancel state is 'isCancelled=%s'", isCancelled.get())); | ||
if (isCancelled.compareAndSet(false, true)) { | ||
handleSubscriptionCancel(); | ||
} | ||
} | ||
} | ||
|
||
private void emit() { | ||
do { | ||
if (!emitting.compareAndSet(false, true)) { | ||
return; | ||
} | ||
try { | ||
if (doEmit()) { | ||
return; | ||
} | ||
} finally { | ||
emitting.compareAndSet(true, false); | ||
} | ||
} while (outstandingDemand.get() > 0); | ||
} | ||
|
||
private boolean doEmit() { | ||
long demand = outstandingDemand.get(); | ||
|
||
while (demand > 0) { | ||
if (isCancelled.get()) { | ||
return true; | ||
} | ||
if (outstandingDemand.get() > 0) { | ||
demand = outstandingDemand.decrementAndGet(); | ||
downstreamSubscriber.onNext(new IndividualTransformer(onNextSignalsSent.incrementAndGet())); | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
/** | ||
* Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream | ||
* transformer need to finish processing before we complete. One typical use case for this is completing the multipart | ||
* download, the subscriber having reached the final part will signal that it doesn't need more | ||
* {@link AsyncResponseTransformer}s by calling {@code .cancel()} on the subscription. | ||
*/ | ||
private void handleSubscriptionCancel() { | ||
synchronized (lock) { | ||
if (downstreamSubscriber == null) { | ||
log.trace(() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()"); | ||
return; | ||
} | ||
if (!onStreamCalled) { | ||
// we never subscribe publisherToUpstream to the upstream, it would not complete | ||
downstreamSubscriber = null; | ||
return; | ||
} | ||
|
||
// if result future is already complete (likely by exception propagation), skip. | ||
if (resultFuture.isDone()) { | ||
return; | ||
} | ||
|
||
CompletableFuture<ResponseBytes<ResponseT>> upstreamPrepareFuture = upstreamResponseTransformer.prepare(); | ||
CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture); | ||
|
||
upstreamResponseTransformer.onResponse(responseT.get()); | ||
|
||
try { | ||
buffers.keySet().stream().sorted().forEach(index -> { | ||
publisherToUpstream.send(buffers.get(index)).exceptionally(ex -> { | ||
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", ex)); | ||
return null; | ||
}); | ||
}); | ||
|
||
publisherToUpstream.complete().exceptionally(ex -> { | ||
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", ex)); | ||
return null; | ||
}); | ||
upstreamResponseTransformer.onStream(SdkPublisher.adapt(publisherToUpstream)); | ||
|
||
} catch (Throwable throwable) { | ||
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", throwable)); | ||
} | ||
} | ||
} | ||
|
||
private final class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> { | ||
private final int onNextCount; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I kind of like partNumber; I'd argue that "part" is not an S3-specific term :) |
||
private final ByteArrayAsyncResponseTransformer<ResponseT> delegate = new ByteArrayAsyncResponseTransformer<>(); | ||
|
||
private CompletableFuture<ResponseT> future; | ||
private final List<CompletableFuture<ResponseBytes<ResponseT>>> delegatePrepareFutures = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Seems like delegatePrepareFutures is not needed? |
||
|
||
// Creates a transformer identified by onNextCount; prepare() uses this number as the key under which the | ||
// received bytes are stored in the enclosing class's buffers map. | ||
private IndividualTransformer(int onNextCount) { | ||
this.onNextCount = onNextCount; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<ResponseT> prepare() { | ||
future = new CompletableFuture<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Seems like this future is not needed? |
||
CompletableFuture<ResponseBytes<ResponseT>> prepare = delegate.prepare(); | ||
CompletableFutureUtils.forwardExceptionTo(prepare, future); | ||
delegatePrepareFutures.add(prepare); | ||
return prepare.thenApply(responseTResponseBytes -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is my bad, maybe just |
||
buffers.put(onNextCount, responseTResponseBytes.asByteBuffer()); | ||
return responseTResponseBytes.response(); | ||
}); | ||
} | ||
|
||
// Remembers the response for the later hand-off to the upstream transformer and forwards it to this | ||
// transformer's delegate. | ||
@Override | ||
public void onResponse(ResponseT response) { | ||
// compareAndSet(null, ...) only succeeds while nothing is stored, so the first recorded response is kept. | ||
responseT.compareAndSet(null, response); | ||
delegate.onResponse(response); | ||
} | ||
|
||
@Override | ||
public void onStream(SdkPublisher<ByteBuffer> publisher) { | ||
delegate.onStream(publisher, maximumBufferInBytes); | ||
synchronized (lock) { | ||
if (!onStreamCalled) { | ||
onStreamCalled = true; | ||
} | ||
} | ||
} | ||
|
||
// Forwards the error to this transformer's delegate; no other state of the enclosing class is touched here. | ||
@Override | ||
public void exceptionOccurred(Throwable error) { | ||
delegate.exceptionOccurred(error); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
import software.amazon.awssdk.annotations.SdkPublicApi; | ||
import software.amazon.awssdk.core.async.AsyncRequestBody; | ||
import software.amazon.awssdk.core.async.AsyncResponseTransformer; | ||
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; | ||
import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; | ||
import software.amazon.awssdk.services.s3.model.CopyObjectRequest; | ||
|
@@ -28,15 +29,21 @@ | |
import software.amazon.awssdk.utils.builder.ToCopyableBuilder; | ||
|
||
/** | ||
* Class that hold configuration properties related to multipart operation for a {@link S3AsyncClient}. Passing this class to the | ||
* {@link S3AsyncClientBuilder#multipartConfiguration(MultipartConfiguration)} will enable automatic conversion of | ||
* {@link S3AsyncClient#getObject(GetObjectRequest, AsyncResponseTransformer)}, | ||
* {@link S3AsyncClient#putObject(PutObjectRequest, AsyncRequestBody)} and | ||
* {@link S3AsyncClient#copyObject(CopyObjectRequest)} to their respective multipart operation. | ||
* Class that holds configuration properties related to multipart operations for a {@link S3AsyncClient}. Passing this class to | ||
* the {@link S3AsyncClientBuilder#multipartConfiguration(MultipartConfiguration)} will enable automatic conversion of the | ||
* following operations to their respective multipart variants: | ||
* <ul> | ||
* <li>{@link S3AsyncClient#getObject(GetObjectRequest, AsyncResponseTransformer)} | ||
* <li>{@link S3AsyncClient#putObject(PutObjectRequest, AsyncRequestBody)} | ||
* <li>{@link S3AsyncClient#copyObject(CopyObjectRequest)} | ||
* </ul> | ||
* <p> | ||
* Note that multipart download fetch individual part of the object using {@link GetObjectRequest#partNumber() part number}, this | ||
* means it will only download multiple parts if the | ||
* object itself was uploaded as a {@link S3AsyncClient#createMultipartUpload(CreateMultipartUploadRequest) multipart object} | ||
* Note that multipart download fetches individual parts of the object using {@link GetObjectRequest#partNumber() PartNumber}. | ||
* This means the S3 client will only download multiple parts if the object itself was uploaded as a | ||
* {@link S3AsyncClient#createMultipartUpload(CreateMultipartUploadRequest) multipart object}. | ||
* <p> | ||
* When performing multipart download, retry is only supported for downloading to byte array, i.e., when providing a | ||
* {@link ByteArrayAsyncResponseTransformer} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ByteArrayAsyncResponseTransformer is an internal API. |
||
*/ | ||
@SdkPublicApi | ||
public final class MultipartConfiguration implements ToCopyableBuilder<MultipartConfiguration.Builder, MultipartConfiguration> { | ||
|
@@ -170,20 +177,24 @@ private static class DefaultMultipartConfigBuilder implements Builder { | |
private Long minimumPartSizeInBytes; | ||
private Long apiCallBufferSizeInBytes; | ||
|
||
@Override | ||
public Builder thresholdInBytes(Long thresholdInBytes) { | ||
this.thresholdInBytes = thresholdInBytes; | ||
return this; | ||
} | ||
|
||
@Override | ||
public Long thresholdInBytes() { | ||
return this.thresholdInBytes; | ||
} | ||
|
||
@Override | ||
public Builder minimumPartSizeInBytes(Long minimumPartSizeInBytes) { | ||
this.minimumPartSizeInBytes = minimumPartSizeInBytes; | ||
return this; | ||
} | ||
|
||
@Override | ||
public Long minimumPartSizeInBytes() { | ||
return this.minimumPartSizeInBytes; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -289,8 +289,8 @@ | |
"useS3ExpressSessionAuth": true, | ||
"multipartCustomization": { | ||
"multipartConfigurationClass": "software.amazon.awssdk.services.s3.multipart.MultipartConfiguration", | ||
"multipartConfigMethodDoc": "Configuration for multipart operation of this client.", | ||
"multipartEnableMethodDoc": "Enables automatic conversion of GET, PUT and COPY methods to their equivalent multipart operation. CRC32 checksum will be enabled for PUT, unless the checksum is specified or checksum validation is disabled.", | ||
"multipartConfigMethodDoc": "Configuration for multipart operation of this client.<p>When performing multipart download, retry is only supported for downloading to byte array, i.e., when using {@code AsyncResponseTransformer.toBytes()}", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same here. |
||
"multipartEnableMethodDoc": "Enables automatic conversion of GET, PUT and COPY methods to their equivalent multipart operation. CRC32 checksum will be enabled for PUT, unless the checksum is specified or checksum validation is disabled.<p>When performing multipart download, retry is only supported for downloading to byte array, i.e., when using {@code AsyncResponseTransformer.toBytes()}", | ||
"contextParamEnabledKey": "S3AsyncClientDecorator.MULTIPART_ENABLED_KEY", | ||
"contextParamConfigKey": "S3AsyncClientDecorator.MULTIPART_CONFIGURATION_KEY" | ||
}, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, not sure if we want to add another onStream here, why do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of using maximumBufferSize for large part sizes, but ByteArrayAsyncResponseTransformer loads everything into memory anyway, so it's redundant. Will revert this addition.