Skip to content

Commit 1d136e5

Browse files
committed
Use maximumBufferSize in ByteArraySplittingTransformer
1 parent 76281d6 commit 1d136e5

File tree

3 files changed

+23
-10
lines changed

3 files changed

+23
-10
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2929
import software.amazon.awssdk.core.async.SdkPublisher;
3030
import software.amazon.awssdk.utils.BinaryUtils;
31+
import software.amazon.awssdk.utils.Validate;
32+
import software.amazon.awssdk.utils.async.DelegatingBufferingSubscriber;
3133

3234
/**
3335
* Implementation of {@link AsyncResponseTransformer} that dumps content into a byte array and supports further
@@ -61,16 +63,24 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
6163
publisher.subscribe(new BaosSubscriber(cf));
6264
}
6365

66+
public void onStream(SdkPublisher<ByteBuffer> publisher, long maximumBufferSize) {
67+
publisher.subscribe(DelegatingBufferingSubscriber.builder()
68+
.maximumBufferInBytes(maximumBufferSize)
69+
.delegate(new BaosSubscriber(cf))
70+
.build());
71+
}
72+
6473
@Override
6574
public void exceptionOccurred(Throwable throwable) {
6675
cf.completeExceptionally(throwable);
6776
}
6877

6978
@Override
7079
public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(SplittingTransformerConfiguration splitConfig) {
80+
Validate.notNull(splitConfig, "splitConfig must not be null");
7181
CompletableFuture<ResponseBytes<ResponseT>> future = new CompletableFuture<>();
7282
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer =
73-
new ByteArraySplittingTransformer<>(this, future);
83+
new ByteArraySplittingTransformer<>(this, future, splitConfig.bufferSizeInBytes());
7484
return AsyncResponseTransformer.SplitResult.<ResponseT, ResponseBytes<ResponseT>>builder()
7585
.publisher(transformer)
7686
.resultFuture(future)

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import software.amazon.awssdk.core.exception.SdkClientException;
3535
import software.amazon.awssdk.utils.CompletableFutureUtils;
3636
import software.amazon.awssdk.utils.Logger;
37+
import software.amazon.awssdk.utils.Validate;
3738
import software.amazon.awssdk.utils.async.SimplePublisher;
3839

3940
@SdkInternalApi
@@ -73,9 +74,18 @@ public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<As
7374

7475
private final Map<Integer, ByteBuffer> buffers;
7576

77+
/**
78+
* The buffer size used to buffer the content received from the downstream subscriber
79+
*/
80+
private final long maximumBufferInBytes;
81+
7682
public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
7783
upstreamResponseTransformer,
78-
CompletableFuture<ResponseBytes<ResponseT>> resultFuture) {
84+
CompletableFuture<ResponseBytes<ResponseT>> resultFuture,
85+
Long maximumBufferSizeInBytes) {
86+
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
87+
this.maximumBufferInBytes = Validate.isPositive(
88+
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
7989
this.upstreamResponseTransformer = upstreamResponseTransformer;
8090
this.resultFuture = resultFuture;
8191
this.buffers = new ConcurrentHashMap<>();
@@ -223,7 +233,7 @@ public void onResponse(ResponseT response) {
223233

224234
@Override
225235
public void onStream(SdkPublisher<ByteBuffer> publisher) {
226-
delegate.onStream(publisher);
236+
delegate.onStream(publisher, maximumBufferInBytes);
227237
synchronized (lock) {
228238
if (!onStreamCalled) {
229239
onStreamCalled = true;

services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/MultipartConfiguration.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,6 @@ public Long minimumPartSizeInBytes() {
9090

9191
/**
9292
* The maximum memory, in bytes, that the SDK will use to buffer requests content into memory.
93-
* <p>
94-
* This setting is not supported and will be ignored when downloading to a byte array, i.e., when providing a
95-
* {@link ByteArrayAsyncResponseTransformer}.
96-
*
9793
* @return the value of the configured maximum memory usage.
9894
*/
9995
public Long apiCallBufferSizeInBytes() {
@@ -163,9 +159,6 @@ public interface Builder extends CopyableBuilder<Builder, MultipartConfiguration
163159
* Increasing this value may lead to better performance at the cost of using more memory.
164160
* <p>
165161
* Default value: If not specified, the SDK will use the equivalent of four parts worth of memory, so 32 Mib by default.
166-
* <p>
167-
* This setting is not supported and will be ignored when downloading to a byte array, i.e., when providing a
168-
* {@link ByteArrayAsyncResponseTransformer}.
169162
*
170163
* @param apiCallBufferSizeInBytes the value of the maximum memory usage.
171164
* @return an instance of this builder.

0 commit comments

Comments
 (0)