Skip to content

Commit 8f3a1ac

Browse files
zoewanggZoe Wang
authored andcommitted
Introduce a new split method in AsyncRequestBody that returns an SdkPublisher of ClosableAsyncRequestBody and use it in s3 multipart client
1 parent 40cedd3 commit 8f3a1ac

18 files changed

+442
-275
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import software.amazon.awssdk.annotations.SdkPublicApi;
3434
import software.amazon.awssdk.core.FileRequestBodyConfiguration;
3535
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
36+
import software.amazon.awssdk.core.internal.async.ClosableAsyncRequestBodyAdaptor;
3637
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3738
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
3839
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
@@ -507,17 +508,33 @@ static AsyncRequestBody empty() {
507508
* is 2MB and the default buffer size is 8MB.
508509
*
509510
* <p>
510-
* By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is
511-
* delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after
512-
* the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger
513-
* than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this
514-
* interface overrides this method.
511+
* Each divided {@link AsyncRequestBody} is sent after the entire content for that chunk is buffered.
515512
*
516513
* @see AsyncRequestBodySplitConfiguration
514+
* @deprecated Use {@link #splitV2(AsyncRequestBodySplitConfiguration)} instead.
517515
*/
516+
@Deprecated
518517
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
519518
Validate.notNull(splitConfiguration, "splitConfiguration");
519+
return splitV2(splitConfiguration).map(body -> new ClosableAsyncRequestBodyAdaptor(body));
520+
}
520521

522+
/**
523+
* Converts this {@link AsyncRequestBody} to a publisher of {@link ClosableAsyncRequestBody}s, each of which publishes
524+
* specific portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk
525+
* size is 2MB and the default buffer size is 8MB.
526+
*
527+
* <p>
528+
* Each divided {@link ClosableAsyncRequestBody} is sent after the entire content for that chunk is buffered. This behavior
529+
* may be different if a specific implementation of this interface overrides this method.
530+
*
531+
* <p>
532+
* Each {@link ClosableAsyncRequestBody} MUST be closed by the user when it is ready to be disposed.
533+
*
534+
* @see AsyncRequestBodySplitConfiguration
535+
*/
536+
default SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
537+
Validate.notNull(splitConfiguration, "splitConfiguration");
521538
return new SplittingPublisher(this, splitConfiguration);
522539
}
523540

@@ -526,12 +543,26 @@ default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration
526543
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
527544
*
528545
* @see #split(AsyncRequestBodySplitConfiguration)
546+
* @deprecated Use {@link #splitV2(Consumer)} instead.
529547
*/
548+
@Deprecated
530549
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
531550
Validate.notNull(splitConfiguration, "splitConfiguration");
532551
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
533552
}
534553

554+
/**
555+
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
556+
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
557+
*
558+
* @see #splitV2(Consumer)
559+
*/
560+
default SdkPublisher<ClosableAsyncRequestBody> splitV2(
561+
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
562+
Validate.notNull(splitConfiguration, "splitConfiguration");
563+
return splitV2(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
564+
}
565+
535566
@SdkProtectedApi
536567
enum BodyType {
537568
FILE("File", "f"),
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import software.amazon.awssdk.annotations.SdkPublicApi;
19+
import software.amazon.awssdk.utils.SdkAutoCloseable;
20+
21+
/**
22+
* An extension of {@link AsyncRequestBody} that is closable.
23+
*/
24+
@SdkPublicApi
25+
public interface ClosableAsyncRequestBody extends AsyncRequestBody, SdkAutoCloseable {
26+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody, SdkA
7676
private final Object lock = new Object();
7777
private boolean closed;
7878

79-
private ByteBuffersAsyncRequestBody(String mimetype, Long length, List<ByteBuffer> buffers) {
79+
private ByteBuffersAsyncRequestBody(String mimetype,
80+
Long length,
81+
List<ByteBuffer> buffers) {
8082
this.mimetype = mimetype;
8183
this.buffers = buffers;
8284
this.length = length;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import org.reactivestreams.Subscriber;
22+
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.core.async.AsyncRequestBody;
24+
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
25+
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
26+
import software.amazon.awssdk.core.exception.NonRetryableException;
27+
import software.amazon.awssdk.core.internal.util.NoopSubscription;
28+
29+
/**
30+
* Adaptor to convert a {@link ClosableAsyncRequestBody} to an {@link AsyncRequestBody}
31+
*
32+
* <p>
33+
* This is needed to maintain backwards compatibility for the deprecated
34+
* {@link AsyncRequestBody#split(AsyncRequestBodySplitConfiguration)}
35+
*/
36+
@SdkInternalApi
37+
public final class ClosableAsyncRequestBodyAdaptor implements AsyncRequestBody {
38+
39+
private final AtomicBoolean subscribeCalled;
40+
private final ClosableAsyncRequestBody delegate;
41+
42+
public ClosableAsyncRequestBodyAdaptor(ClosableAsyncRequestBody delegate) {
43+
this.delegate = delegate;
44+
subscribeCalled = new AtomicBoolean(false);
45+
}
46+
47+
@Override
48+
public Optional<Long> contentLength() {
49+
return delegate.contentLength();
50+
}
51+
52+
@Override
53+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
54+
if (subscribeCalled.compareAndSet(false, true)) {
55+
delegate.doAfterOnComplete(() -> delegate.close())
56+
.doAfterOnCancel(() -> delegate.close())
57+
.doAfterOnError(t -> delegate.close())
58+
.subscribe(subscriber);
59+
} else {
60+
subscriber.onSubscribe(new NoopSubscription(subscriber));
61+
subscriber.onError(NonRetryableException.create(
62+
"A retry was attempted, but AsyncRequestBody.split does not "
63+
+ "support retries."));
64+
}
65+
}
66+
67+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import software.amazon.awssdk.annotations.SdkInternalApi;
3535
import software.amazon.awssdk.core.async.AsyncRequestBody;
3636
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
37+
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
3738
import software.amazon.awssdk.core.async.SdkPublisher;
3839
import software.amazon.awssdk.core.internal.util.Mimetype;
3940
import software.amazon.awssdk.core.internal.util.NoopSubscription;
@@ -86,6 +87,11 @@ public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration s
8687
return new FileAsyncRequestBodySplitHelper(this, splitConfiguration).split();
8788
}
8889

90+
@Override
91+
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
92+
return split(splitConfiguration).map(body -> new ClosableAsyncRequestBodyWrapper(body));
93+
}
94+
8995
public Path path() {
9096
return path;
9197
}
@@ -436,4 +442,26 @@ private void signalOnError(Throwable t) {
436442
private static AsynchronousFileChannel openInputChannel(Path path) throws IOException {
437443
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
438444
}
445+
446+
private static class ClosableAsyncRequestBodyWrapper implements ClosableAsyncRequestBody {
447+
private final AsyncRequestBody body;
448+
449+
ClosableAsyncRequestBodyWrapper(AsyncRequestBody body) {
450+
this.body = body;
451+
}
452+
453+
@Override
454+
public Optional<Long> contentLength() {
455+
return body.contentLength();
456+
}
457+
458+
@Override
459+
public void subscribe(Subscriber<? super ByteBuffer> s) {
460+
body.subscribe(s);
461+
}
462+
463+
@Override
464+
public void close() {
465+
}
466+
}
439467
}

0 commit comments

Comments
 (0)