Skip to content

Stream retry support part 2: Introduce a new split method in AsyncRequestBody that returns an SdkP… #6346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: feature/master/mpu-stream-retry
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.FileRequestBodyConfiguration;
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.ClosableAsyncRequestBodyAdaptor;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
Expand Down Expand Up @@ -507,17 +508,33 @@ static AsyncRequestBody empty() {
* is 2MB and the default buffer size is 8MB.
*
* <p>
* By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is
* delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after
* the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger
* than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this
* interface overrides this method.
* Each divided {@link AsyncRequestBody} is sent after the entire content for that chunk is buffered.
*
* @see AsyncRequestBodySplitConfiguration
* @deprecated Use {@link #splitV2(AsyncRequestBodySplitConfiguration)} instead.
*/
@Deprecated
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
Validate.notNull(splitConfiguration, "splitConfiguration");
return splitV2(splitConfiguration).map(body -> new ClosableAsyncRequestBodyAdaptor(body));
}

/**
* Converts this {@link AsyncRequestBody} to a publisher of {@link ClosableAsyncRequestBody}s, each of which publishes
* specific portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk
* size is 2MB and the default buffer size is 8MB.
*
* <p>
* Each divided {@link ClosableAsyncRequestBody} is sent after the entire content for that chunk is buffered. This behavior
* may be different if a specific implementation of this interface overrides this method.
*
* <p>
* Each {@link ClosableAsyncRequestBody} MUST be closed by the user when it is ready to be disposed.
*
* @see AsyncRequestBodySplitConfiguration
*/
default SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
Validate.notNull(splitConfiguration, "splitConfiguration");
return new SplittingPublisher(this, splitConfiguration);
}

Expand All @@ -526,12 +543,26 @@ default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
*
* @see #split(AsyncRequestBodySplitConfiguration)
* @deprecated Use {@link #splitV2(Consumer)} instead.
*/
@Deprecated
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
Validate.notNull(splitConfiguration, "splitConfiguration");
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
}

/**
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
*
* @see #splitV2(Consumer)
*/
default SdkPublisher<ClosableAsyncRequestBody> splitV2(
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
Validate.notNull(splitConfiguration, "splitConfiguration");
return splitV2(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
}

@SdkProtectedApi
enum BodyType {
FILE("File", "f"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.async;

import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/**
* An extension of {@link AsyncRequestBody} that is closable.
*/
@SdkPublicApi
public interface ClosableAsyncRequestBody extends AsyncRequestBody, SdkAutoCloseable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
Expand Down Expand Up @@ -76,6 +77,17 @@ public SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfig
return delegate.split(splitConfiguration);
}

@Override
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
return delegate.splitV2(splitConfiguration);
}

@Override
public SdkPublisher<ClosableAsyncRequestBody> splitV2(
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
return delegate.splitV2(splitConfiguration);
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
invoke(() -> listener.publisherSubscribe(s), "publisherSubscribe");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody, SdkA
private final Object lock = new Object();
private boolean closed;

private ByteBuffersAsyncRequestBody(String mimetype, Long length, List<ByteBuffer> buffers) {
private ByteBuffersAsyncRequestBody(String mimetype,
Long length,
List<ByteBuffer> buffers) {
this.mimetype = mimetype;
this.buffers = buffers;
this.length = length;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.internal.util.NoopSubscription;

/**
* Adaptor to convert a {@link ClosableAsyncRequestBody} to an {@link AsyncRequestBody}
*
* <p>
* This is needed to maintain backwards compatibility for the deprecated
* {@link AsyncRequestBody#split(AsyncRequestBodySplitConfiguration)}
*/
@SdkInternalApi
public final class ClosableAsyncRequestBodyAdaptor implements AsyncRequestBody {

private final AtomicBoolean subscribeCalled;
private final ClosableAsyncRequestBody delegate;

public ClosableAsyncRequestBodyAdaptor(ClosableAsyncRequestBody delegate) {
this.delegate = delegate;
subscribeCalled = new AtomicBoolean(false);
}

@Override
public Optional<Long> contentLength() {
return delegate.contentLength();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
if (subscribeCalled.compareAndSet(false, true)) {
delegate.doAfterOnComplete(() -> delegate.close())
.doAfterOnCancel(() -> delegate.close())
.doAfterOnError(t -> delegate.close())
.subscribe(subscriber);
} else {
subscriber.onSubscribe(new NoopSubscription(subscriber));
subscriber.onError(NonRetryableException.create(
"A retry was attempted, but AsyncRequestBody.split does not "
+ "support retries."));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
Expand Down Expand Up @@ -86,6 +87,11 @@ public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration s
return new FileAsyncRequestBodySplitHelper(this, splitConfiguration).split();
}

@Override
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
return split(splitConfiguration).map(body -> new ClosableAsyncRequestBodyWrapper(body));
}

public Path path() {
return path;
}
Expand Down Expand Up @@ -436,4 +442,26 @@ private void signalOnError(Throwable t) {
private static AsynchronousFileChannel openInputChannel(Path path) throws IOException {
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
}

private static class ClosableAsyncRequestBodyWrapper implements ClosableAsyncRequestBody {
private final AsyncRequestBody body;

ClosableAsyncRequestBodyWrapper(AsyncRequestBody body) {
this.body = body;
}

@Override
public Optional<Long> contentLength() {
return body.contentLength();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
body.subscribe(s);
}

@Override
public void close() {
}
}
}
Loading
Loading