Skip to content

Commit 4a927cb

Browse files
committed
Address comments and refactor tests
1 parent c381f8a commit 4a927cb

File tree

6 files changed

+88
-173
lines changed

6 files changed

+88
-173
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
package software.amazon.awssdk.core.internal.async;
1717

1818
import java.nio.ByteBuffer;
19-
import java.util.ArrayList;
20-
import java.util.List;
2119
import java.util.Map;
2220
import java.util.concurrent.CompletableFuture;
2321
import java.util.concurrent.ConcurrentHashMap;
@@ -42,7 +40,7 @@ public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<As
4240
private final AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer;
4341
private final CompletableFuture<ResponseBytes<ResponseT>> resultFuture;
4442
private Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> downstreamSubscriber;
45-
private final AtomicInteger onNextSignalsSent = new AtomicInteger(0);
43+
private final AtomicInteger nextPartNumber = new AtomicInteger(1);
4644
private final AtomicReference<ResponseT> responseT = new AtomicReference<>();
4745

4846
private final SimplePublisher<ByteBuffer> publisherToUpstream = new SimplePublisher<>();
@@ -138,7 +136,7 @@ private boolean doEmit() {
138136
}
139137
if (outstandingDemand.get() > 0) {
140138
demand = outstandingDemand.decrementAndGet();
141-
downstreamSubscriber.onNext(new IndividualTransformer(onNextSignalsSent.incrementAndGet()));
139+
downstreamSubscriber.onNext(new IndividualTransformer(nextPartNumber.getAndIncrement()));
142140
}
143141
}
144142
return false;
@@ -193,25 +191,19 @@ private void handleSubscriptionCancel() {
193191
}
194192

195193
private final class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> {
196-
private final int onNextCount;
194+
private final int partNumber;
197195
private final ByteArrayAsyncResponseTransformer<ResponseT> delegate = new ByteArrayAsyncResponseTransformer<>();
198196

199-
private CompletableFuture<ResponseT> future;
200-
private final List<CompletableFuture<ResponseBytes<ResponseT>>> delegatePrepareFutures = new ArrayList<>();
201-
202-
private IndividualTransformer(int onNextCount) {
203-
this.onNextCount = onNextCount;
197+
private IndividualTransformer(int partNumber) {
198+
this.partNumber = partNumber;
204199
}
205200

206201
@Override
207202
public CompletableFuture<ResponseT> prepare() {
208-
future = new CompletableFuture<>();
209203
CompletableFuture<ResponseBytes<ResponseT>> prepare = delegate.prepare();
210-
CompletableFutureUtils.forwardExceptionTo(prepare, future);
211-
delegatePrepareFutures.add(prepare);
212-
return prepare.thenApply(responseTResponseBytes -> {
213-
buffers.put(onNextCount, responseTResponseBytes.asByteBuffer());
214-
return responseTResponseBytes.response();
204+
return prepare.thenApply(responseBytes -> {
205+
buffers.put(partNumber, responseBytes.asByteBuffer());
206+
return responseBytes.response();
215207
});
216208
}
217209

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.tck.PublisherVerification;
21+
import org.reactivestreams.tck.TestEnvironment;
22+
import software.amazon.awssdk.core.ResponseBytes;
23+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
24+
import software.amazon.awssdk.core.async.SdkPublisher;
25+
26+
public class ByteArraySplittingTransformerTckTest extends PublisherVerification<AsyncResponseTransformer<Object, Object>> {
27+
28+
public ByteArraySplittingTransformerTckTest() {
29+
super(new TestEnvironment());
30+
}
31+
32+
@Override
33+
public Publisher<AsyncResponseTransformer<Object, Object>> createPublisher(long l) {
34+
CompletableFuture<ResponseBytes<Object>> future = new CompletableFuture<>();
35+
AsyncResponseTransformer<Object, ResponseBytes<Object>> upstreamTransformer = AsyncResponseTransformer.toBytes();
36+
ByteArraySplittingTransformer<Object> transformer = new ByteArraySplittingTransformer<>(upstreamTransformer, future);
37+
return SdkPublisher.adapt(transformer).limit(Math.toIntExact(l));
38+
}
39+
40+
@Override
41+
public Publisher<AsyncResponseTransformer<Object, Object>> createFailedPublisher() {
42+
return null;
43+
}
44+
45+
@Override
46+
public long maxElementsFromPublisher() {
47+
return Long.MAX_VALUE;
48+
}
49+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/MultipartConfiguration.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@
4242
* This means the S3 client will only download multiple parts if the object itself was uploaded as a
4343
* {@link S3AsyncClient#createMultipartUpload(CreateMultipartUploadRequest) multipart object}
4444
* <p>
45-
* When performing multipart download, retry is only supported for downloading to byte array, i.e., when providing a
46-
* {@link ByteArrayAsyncResponseTransformer}
45+
* When performing multipart download, retry is only supported when using an {@link AsyncResponseTransformer} implementation
46+
* that downloads the object into memory such, as {@link AsyncResponseTransformer#toBytes()}
4747
*/
4848
@SdkPublicApi
4949
public final class MultipartConfiguration implements ToCopyableBuilder<MultipartConfiguration.Builder, MultipartConfiguration> {
@@ -91,8 +91,8 @@ public Long minimumPartSizeInBytes() {
9191
/**
9292
* The maximum memory, in bytes, that the SDK will use to buffer requests content into memory.
9393
* <p>
94-
* This setting is not supported and will be ignored when downloading to a byte array, i.e., when providing a
95-
* {@link ByteArrayAsyncResponseTransformer}.
94+
* This setting does not apply if you are using an {@link AsyncResponseTransformer} implementation that downloads the
95+
* object into memory such as {@link AsyncResponseTransformer#toBytes}
9696
*
9797
* @return the value of the configured maximum memory usage.
9898
*/

services/s3/src/main/resources/codegen-resources/customization.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@
289289
"useS3ExpressSessionAuth": true,
290290
"multipartCustomization": {
291291
"multipartConfigurationClass": "software.amazon.awssdk.services.s3.multipart.MultipartConfiguration",
292-
"multipartConfigMethodDoc": "Configuration for multipart operation of this client.<p>When performing multipart download, retry is only supported for downloading to byte array, i.e., when providing a {@code ByteArrayAsyncResponseTransformer}",
292+
"multipartConfigMethodDoc": "Configuration for multipart operation of this client.<p>When performing multipart download, retry is only supported when using an {@code AsyncResponseTransformer} implementation that downloads the object into memory, such as {@code AsyncResponseTransformer#toBytes()}",
293293
"multipartEnableMethodDoc": "Enables automatic conversion of GET, PUT and COPY methods to their equivalent multipart operation. CRC32 checksum will be enabled for PUT, unless the checksum is specified or checksum validation is disabled.<p>When performing multipart download, retry is only supported for downloading to byte array, i.e., when providing a {@code ByteArrayAsyncResponseTransformer}",
294294
"contextParamEnabledKey": "S3AsyncClientDecorator.MULTIPART_ENABLED_KEY",
295295
"contextParamConfigKey": "S3AsyncClientDecorator.MULTIPART_CONFIGURATION_KEY"

0 commit comments

Comments
 (0)