Skip to content

Commit 7c279ef

Browse files
committed
Override split method in byte array based response transformer
1 parent b532117 commit 7c279ef

File tree

7 files changed

+985
-24
lines changed

7 files changed

+985
-24
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import java.io.ByteArrayOutputStream;
2121
import java.nio.ByteBuffer;
2222
import java.util.concurrent.CompletableFuture;
23+
import java.util.function.Consumer;
2324
import org.reactivestreams.Subscriber;
2425
import org.reactivestreams.Subscription;
2526
import software.amazon.awssdk.annotations.SdkInternalApi;
2627
import software.amazon.awssdk.core.ResponseBytes;
28+
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
2729
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2830
import software.amazon.awssdk.core.async.SdkPublisher;
2931
import software.amazon.awssdk.utils.BinaryUtils;
@@ -65,6 +67,17 @@ public void exceptionOccurred(Throwable throwable) {
6567
cf.completeExceptionally(throwable);
6668
}
6769

70+
@Override
71+
public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(SplittingTransformerConfiguration splitConfig) {
72+
CompletableFuture<ResponseBytes<ResponseT>> future = new CompletableFuture<>();
73+
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer =
74+
new ByteArraySplittingTransformer(this, future);
75+
return AsyncResponseTransformer.SplitResult.<ResponseT, ResponseBytes<ResponseT>>builder()
76+
.publisher(transformer)
77+
.resultFuture(future)
78+
.build();
79+
}
80+
6881
@Override
6982
public String name() {
7083
return TransformerType.BYTES.getName();
@@ -108,4 +121,5 @@ public void onComplete() {
108121
resultFuture.complete(baos.toByteArray());
109122
}
110123
}
124+
111125
}
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.concurrent.atomic.AtomicLong;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
import org.reactivestreams.Subscriber;
29+
import org.reactivestreams.Subscription;
30+
import software.amazon.awssdk.core.ResponseBytes;
31+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
32+
import software.amazon.awssdk.core.async.SdkPublisher;
33+
import software.amazon.awssdk.core.exception.SdkClientException;
34+
import software.amazon.awssdk.utils.CompletableFutureUtils;
35+
import software.amazon.awssdk.utils.Logger;
36+
import software.amazon.awssdk.utils.async.DelegatingBufferingSubscriber;
37+
import software.amazon.awssdk.utils.async.SimplePublisher;
38+
39+
public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> {
40+
private static final Logger log = Logger.loggerFor(ByteArraySplittingTransformer.class);
41+
private final AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer;
42+
private final SimplePublisher<AsyncResponseTransformer<ResponseT, ResponseT>> simplePublisher = new SimplePublisher<>();
43+
private final CompletableFuture<ResponseBytes<ResponseT>> resultFuture;
44+
private Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> downstreamSubscriber;
45+
private final AtomicInteger partNumber = new AtomicInteger(0);
46+
private final AtomicReference<ResponseT> responseT = new AtomicReference<>();
47+
48+
/**
49+
* This publisher is used to send the bytes received from the downstream subscriber's transformers to a
50+
* {@link DelegatingBufferingSubscriber} that will buffer a number of bytes up to {@code maximumBufferSize}.
51+
*/
52+
private final SimplePublisher<ByteBuffer> publisherToUpstream = new SimplePublisher<>();
53+
/**
54+
* The amount requested by the downstream subscriber that is still left to fulfill. Updated. when the
55+
* {@link Subscription#request(long) request} method is called on the downstream subscriber's subscription. Corresponds to the
56+
* number of {@code AsyncResponseTransformer} that will be published to the downstream subscriber.
57+
*/
58+
private final AtomicLong outstandingDemand = new AtomicLong(0);
59+
60+
/**
61+
* This flag stops the current thread from publishing transformers while another thread is already publishing.
62+
*/
63+
private final AtomicBoolean emitting = new AtomicBoolean(false);
64+
65+
private final Object lock = new Object();
66+
67+
/**
68+
* Set to true once {@code .onStream()} is called on the upstreamResponseTransformer
69+
*/
70+
private boolean onStreamCalled;
71+
72+
/**
73+
* Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the
74+
* {@code resultFuture} is cancelled.
75+
*/
76+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
77+
78+
private final Map<Integer, ByteBuffer> buffers;
79+
80+
public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer,
81+
CompletableFuture<ResponseBytes<ResponseT>> resultFuture) {
82+
this.upstreamResponseTransformer = upstreamResponseTransformer;
83+
this.resultFuture = resultFuture;
84+
this.buffers = new ConcurrentHashMap<>();
85+
}
86+
87+
@Override
88+
public void subscribe(Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> subscriber) {
89+
this.downstreamSubscriber = subscriber;
90+
subscriber.onSubscribe(new DownstreamSubscription());
91+
}
92+
93+
private final class DownstreamSubscription implements Subscription {
94+
95+
@Override
96+
public void request(long n) {
97+
if (n <= 0) {
98+
downstreamSubscriber.onError(new IllegalArgumentException("Amount requested must be positive"));
99+
return;
100+
}
101+
long newDemand = outstandingDemand.updateAndGet(current -> {
102+
if (Long.MAX_VALUE - current < n) {
103+
return Long.MAX_VALUE;
104+
}
105+
return current + n;
106+
});
107+
log.trace(() -> String.format("new outstanding demand: %s", newDemand));
108+
emit();
109+
}
110+
111+
@Override
112+
public void cancel() {
113+
log.trace(() -> String.format("received cancel signal. Current cancel state is 'isCancelled=%s'", isCancelled.get()));
114+
if (isCancelled.compareAndSet(false, true)) {
115+
handleSubscriptionCancel();
116+
}
117+
}
118+
}
119+
120+
private void emit() {
121+
do {
122+
if (!emitting.compareAndSet(false, true)) {
123+
return;
124+
}
125+
try {
126+
if (doEmit()) {
127+
return;
128+
}
129+
} finally {
130+
emitting.compareAndSet(true, false);
131+
}
132+
} while (outstandingDemand.get() > 0);
133+
}
134+
135+
private boolean doEmit() {
136+
long demand = outstandingDemand.get();
137+
138+
while (demand > 0) {
139+
if (isCancelled.get()) {
140+
return true;
141+
}
142+
if (outstandingDemand.get() > 0) {
143+
demand = outstandingDemand.decrementAndGet();
144+
downstreamSubscriber.onNext(new IndividualTransformer(partNumber.incrementAndGet()));
145+
}
146+
}
147+
return false;
148+
}
149+
150+
/**
151+
* Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream
152+
* transformer need to finish processing before we complete. One typical use case for this is completing the multipart
153+
* download, the subscriber having reached the final part will signal that it doesn't need more
154+
* {@link AsyncResponseTransformer}s by calling {@code .cancel()} on the subscription.
155+
*/
156+
private void handleSubscriptionCancel() {
157+
synchronized (lock) {
158+
if (downstreamSubscriber == null) {
159+
log.trace(() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()");
160+
return;
161+
}
162+
if (!onStreamCalled) {
163+
// we never subscribe publisherToUpstream to the upstream, it would not complete
164+
downstreamSubscriber = null;
165+
return;
166+
}
167+
168+
// if result future is already complete (likely by exception propagation), skip.
169+
if (resultFuture.isDone()) {
170+
return;
171+
}
172+
173+
CompletableFuture<ResponseBytes<ResponseT>> upstreamPrepareFuture = upstreamResponseTransformer.prepare();
174+
CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture);
175+
176+
upstreamResponseTransformer.onResponse(responseT.get());
177+
178+
try {
179+
buffers.keySet().stream().sorted().forEach(index -> {
180+
publisherToUpstream.send(buffers.get(index)).exceptionally(ex -> {
181+
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", ex));
182+
return null;
183+
});
184+
});
185+
186+
publisherToUpstream.complete().exceptionally(ex -> {
187+
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", ex));
188+
return null;
189+
});
190+
upstreamResponseTransformer.onStream(SdkPublisher.adapt(publisherToUpstream));
191+
192+
} catch (Throwable throwable) {
193+
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", throwable));
194+
}
195+
}
196+
}
197+
198+
private class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> {
199+
private final int partNumber;
200+
private ByteArrayAsyncResponseTransformer<ResponseT> delegate = new ByteArrayAsyncResponseTransformer<>();
201+
202+
private CompletableFuture<ResponseT> future;
203+
private List<CompletableFuture<ResponseBytes<ResponseT>>> delegatePrepareFutures = new ArrayList<>();
204+
205+
private IndividualTransformer(int partNumber) {
206+
this.partNumber = partNumber;
207+
}
208+
209+
@Override
210+
public CompletableFuture<ResponseT> prepare() {
211+
future = new CompletableFuture<>();
212+
CompletableFuture<ResponseBytes<ResponseT>> prepare = delegate.prepare();
213+
CompletableFutureUtils.forwardExceptionTo(prepare, future);
214+
delegatePrepareFutures.add(prepare);
215+
return prepare.thenApply(responseTResponseBytes -> {
216+
buffers.put(partNumber, responseTResponseBytes.asByteBuffer());
217+
return responseTResponseBytes.response();
218+
});
219+
}
220+
221+
@Override
222+
public void onResponse(ResponseT response) {
223+
responseT.compareAndSet(null, response);
224+
delegate.onResponse(response);
225+
}
226+
227+
@Override
228+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
229+
delegate.onStream(publisher);
230+
synchronized (lock) {
231+
if (!onStreamCalled) {
232+
onStreamCalled = true;
233+
}
234+
}
235+
}
236+
237+
@Override
238+
public void exceptionOccurred(Throwable error) {
239+
delegate.exceptionOccurred(error);
240+
}
241+
}
242+
}

core/sdk-core/src/test/resources/log4j2.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ rootLogger.appenderRef.stdout.ref = ConsoleAppender
2525

2626
# Uncomment below to enable more specific logging
2727
#
28-
#logger.sdk.name = software.amazon.awssdk
29-
#logger.sdk.level = debug
28+
logger.sdk.name = software.amazon.awssdk.services.s3.internal.multipart
29+
logger.sdk.level = debug
3030
#
3131
#logger.request.name = software.amazon.awssdk.request
3232
#logger.request.level = debug

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.services.s3.S3AsyncClient;
2424
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
2525
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
26+
import software.amazon.awssdk.utils.CompletableFutureUtils;
2627
import software.amazon.awssdk.utils.Logger;
2728

2829
@SdkInternalApi
@@ -49,7 +50,9 @@ public <T> CompletableFuture<T> downloadObject(
4950
.build());
5051
MultipartDownloaderSubscriber subscriber = subscriber(getObjectRequest);
5152
split.publisher().subscribe(subscriber);
52-
return split.resultFuture();
53+
CompletableFuture<T> splitFuture = split.resultFuture();
54+
CompletableFutureUtils.forwardExceptionTo(subscriber.future(), splitFuture);
55+
return splitFuture;
5356
}
5457

5558
private MultipartDownloaderSubscriber subscriber(GetObjectRequest getObjectRequest) {

0 commit comments

Comments
 (0)