Skip to content

Commit 3c91cc9

Browse files
committed
Fix build and add more tests
1 parent d94cec4 commit 3c91cc9

File tree

6 files changed

+159
-27
lines changed

6 files changed

+159
-27
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.annotations.SdkProtectedApi;
2424
import software.amazon.awssdk.core.async.AsyncRequestBody;
2525
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
26+
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
2627
import software.amazon.awssdk.core.async.SdkPublisher;
2728
import software.amazon.awssdk.utils.Logger;
2829
import software.amazon.awssdk.utils.Validate;
@@ -76,6 +77,17 @@ public SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfig
7677
return delegate.split(splitConfiguration);
7778
}
7879

80+
@Override
81+
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
82+
return delegate.splitV2(splitConfiguration);
83+
}
84+
85+
@Override
86+
public SdkPublisher<ClosableAsyncRequestBody> splitV2(
87+
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
88+
return delegate.splitV2(splitConfiguration);
89+
}
90+
7991
@Override
8092
public void subscribe(Subscriber<? super ByteBuffer> s) {
8193
invoke(() -> listener.publisherSubscribe(s), "publisherSubscribe");

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public SplittingPublisher(AsyncRequestBody asyncRequestBody,
6565

6666
Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes,
6767
"bufferSizeInBytes must be larger than or equal to " +
68-
"chunkSizeInBytes if the content length is unknown");
68+
"chunkSizeInBytes");
6969
}
7070

7171
@Override
@@ -77,7 +77,10 @@ public void subscribe(Subscriber<? super ClosableAsyncRequestBody> downstreamSub
7777
private class SplittingSubscriber implements Subscriber<ByteBuffer> {
7878
private Subscription upstreamSubscription;
7979
private final Long upstreamSize;
80-
private final AtomicInteger chunkNumber = new AtomicInteger(0);
80+
/**
81+
* 1 based index number for each part/chunk
82+
*/
83+
private final AtomicInteger partNumber = new AtomicInteger(1);
8184
private volatile DownstreamBody currentBody;
8285
private final AtomicBoolean hasOpenUpstreamDemand = new AtomicBoolean(false);
8386
private final AtomicLong dataBuffered = new AtomicLong(0);
@@ -97,15 +100,15 @@ public void onSubscribe(Subscription s) {
97100
this.upstreamSubscription = s;
98101
this.currentBody =
99102
initializeNextDownstreamBody(upstreamSize != null, calculateChunkSize(upstreamSize),
100-
chunkNumber.get());
103+
partNumber.get());
101104
// We need to request subscription *after* we set currentBody because onNext could be invoked right away.
102105
upstreamSubscription.request(1);
103106
}
104107

105-
private DownstreamBody initializeNextDownstreamBody(boolean contentLengthKnown, long chunkSize, int chunkNumber) {
108+
private DownstreamBody initializeNextDownstreamBody(boolean contentLengthKnown, long chunkSize, int partNumber) {
106109
currentBodySent.set(false);
107-
log.debug(() -> "initializing next downstream body " + chunkNumber);
108-
return new DownstreamBody(contentLengthKnown, chunkSize, chunkNumber);
110+
log.debug(() -> "initializing next downstream body " + partNumber);
111+
return new DownstreamBody(contentLengthKnown, chunkSize, partNumber);
109112
}
110113

111114
@Override
@@ -155,7 +158,7 @@ public void onNext(ByteBuffer byteBuffer) {
155158

156159
private void completeCurrentBodyAndCreateNewIfNeeded(ByteBuffer byteBuffer) {
157160
completeCurrentBodyAndDeliver();
158-
int currentChunk = chunkNumber.incrementAndGet();
161+
int nextChunk = partNumber.incrementAndGet();
159162
boolean shouldCreateNewDownstreamRequestBody;
160163
Long dataRemaining = totalDataRemaining();
161164

@@ -167,7 +170,7 @@ private void completeCurrentBodyAndCreateNewIfNeeded(ByteBuffer byteBuffer) {
167170

168171
if (shouldCreateNewDownstreamRequestBody) {
169172
long chunkSize = calculateChunkSize(dataRemaining);
170-
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
173+
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, nextChunk);
171174
}
172175
}
173176

@@ -181,7 +184,7 @@ private int amountRemainingInChunk() {
181184
*/
182185
private void completeCurrentBodyAndDeliver() {
183186
if (currentBodySent.compareAndSet(false, true)) {
184-
log.debug(() -> "completeCurrentBody for chunk " + currentBody.chunkNumber);
187+
log.debug(() -> "completeCurrentBody for chunk " + currentBody.partNumber);
185188
// For unknown content length, we always create a new DownstreamBody because we don't know if there is data
186189
// left or not, so we need to only send the body if there is actually data
187190
long bufferedLength = currentBody.bufferedLength;
@@ -218,7 +221,7 @@ public void onError(Throwable t) {
218221
}
219222

220223
private void sendCurrentBody(DownstreamBody body) {
221-
log.debug(() -> "sendCurrentBody for chunk " + body.chunkNumber);
224+
log.debug(() -> "sendCurrentBody for chunk " + body.partNumber);
222225
downstreamPublisher.send(body).exceptionally(t -> {
223226
downstreamPublisher.error(t);
224227
upstreamSubscription.cancel();
@@ -252,7 +255,7 @@ private Long totalDataRemaining() {
252255
if (upstreamSize == null) {
253256
return null;
254257
}
255-
return upstreamSize - (chunkNumber.get() * chunkSizeInBytes);
258+
return upstreamSize - ((partNumber.get() - 1) * chunkSizeInBytes);
256259
}
257260

258261
/**
@@ -267,15 +270,15 @@ private final class DownstreamBody implements ClosableAsyncRequestBody {
267270
*/
268271
private final long maxLength;
269272
private final Long totalLength;
270-
private final int chunkNumber;
273+
private final int partNumber;
271274
private volatile long bufferedLength = 0;
272275
private volatile ByteBuffersAsyncRequestBody delegate;
273276
private final List<ByteBuffer> buffers = new ArrayList<>();
274277

275-
private DownstreamBody(boolean contentLengthKnown, long maxLength, int chunkNumber) {
278+
private DownstreamBody(boolean contentLengthKnown, long maxLength, int partNumber) {
276279
this.totalLength = contentLengthKnown ? maxLength : null;
277280
this.maxLength = maxLength;
278-
this.chunkNumber = chunkNumber;
281+
this.partNumber = partNumber;
279282
}
280283

281284
@Override
@@ -284,21 +287,21 @@ public Optional<Long> contentLength() {
284287
}
285288

286289
public void send(ByteBuffer data) {
287-
log.debug(() -> String.format("Sending bytebuffer %s to chunk %d", data, chunkNumber));
290+
log.debug(() -> String.format("Sending bytebuffer %s to chunk %d", data, partNumber));
288291
int length = data.remaining();
289292
bufferedLength += length;
290293
addDataBuffered(length);
291294
buffers.add(data);
292295
}
293296

294297
public void complete() {
295-
log.debug(() -> "Received complete() for chunk number: " + chunkNumber + " length " + bufferedLength);
298+
log.debug(() -> "Received complete() for chunk number: " + partNumber + " length " + bufferedLength);
296299
this.delegate = ByteBuffersAsyncRequestBody.of(buffers);
297300
}
298301

299302
@Override
300303
public void subscribe(Subscriber<? super ByteBuffer> s) {
301-
log.debug(() -> "Subscribe for chunk number: " + chunkNumber + " length " + bufferedLength);
304+
log.debug(() -> "Subscribe for chunk number: " + partNumber + " length " + bufferedLength);
302305
delegate.subscribe(s);
303306
}
304307

@@ -311,7 +314,7 @@ private void addDataBuffered(long length) {
311314

312315
@Override
313316
public void close() {
314-
log.debug(() -> "Closing current body " + chunkNumber);
317+
log.debug(() -> "Closing current body " + partNumber);
315318
delegate.close();
316319
addDataBuffered(-bufferedLength);
317320
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ClosableAsyncRequestBodyAdaptorTest.java

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,43 @@
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.Mockito.doNothing;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
import static org.mockito.Mockito.when;
2326

2427
import io.reactivex.Flowable;
2528
import io.reactivex.FlowableSubscriber;
29+
import io.reactivex.internal.observers.BiConsumerSingleObserver;
30+
import java.nio.ByteBuffer;
31+
import java.nio.charset.StandardCharsets;
32+
import java.util.Arrays;
33+
import java.util.Observable;
34+
import java.util.Observer;
35+
import java.util.Optional;
36+
import java.util.concurrent.CompletableFuture;
2637
import java.util.function.Consumer;
38+
import org.junit.jupiter.api.BeforeEach;
2739
import org.junit.jupiter.api.Test;
2840
import org.mockito.ArgumentCaptor;
2941
import org.mockito.Mockito;
3042
import org.reactivestreams.Subscriber;
43+
import org.reactivestreams.Subscription;
3144
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
3245
import software.amazon.awssdk.core.exception.NonRetryableException;
3346

3447
public class ClosableAsyncRequestBodyAdaptorTest {
48+
private ClosableAsyncRequestBody closableAsyncRequestBody;
3549

36-
@Test
37-
void resubscribe_shouldThrowException() {
38-
ClosableAsyncRequestBody closableAsyncRequestBody = Mockito.mock(ClosableAsyncRequestBody.class);
50+
@BeforeEach
51+
public void setup() {
52+
closableAsyncRequestBody =Mockito.mock(ClosableAsyncRequestBody.class);
3953
Mockito.when(closableAsyncRequestBody.doAfterOnComplete(any(Runnable.class))).thenReturn(closableAsyncRequestBody);
4054
Mockito.when(closableAsyncRequestBody.doAfterOnCancel(any(Runnable.class))).thenReturn(closableAsyncRequestBody);
4155
Mockito.when(closableAsyncRequestBody.doAfterOnError(any(Consumer.class))).thenReturn(closableAsyncRequestBody);
56+
}
4257

58+
@Test
59+
void resubscribe_shouldThrowException() {
4360
ClosableAsyncRequestBodyAdaptor adaptor = new ClosableAsyncRequestBodyAdaptor(closableAsyncRequestBody);
4461
Subscriber subscriber = Mockito.mock(Subscriber.class);
4562
adaptor.subscribe(subscriber);
@@ -55,4 +72,99 @@ void resubscribe_shouldThrowException() {
5572
.hasMessageContaining("A retry was attempted");
5673
}
5774

75+
@Test
76+
void onComplete_shouldCloseAsyncRequestBody() {
77+
TestClosableAsyncRequestBody asyncRequestBody = new TestClosableAsyncRequestBody();
78+
ClosableAsyncRequestBodyAdaptor adaptor = new ClosableAsyncRequestBodyAdaptor(asyncRequestBody);
79+
CompletableFuture<byte[]> future = new CompletableFuture<>();
80+
Subscriber<ByteBuffer> subscriber = new ByteArrayAsyncResponseTransformer.BaosSubscriber(future);
81+
adaptor.subscribe(subscriber);
82+
assertThat(asyncRequestBody.closeInvoked).isTrue();
83+
}
84+
85+
@Test
86+
void cancel_shouldCloseAsyncRequestBody() {
87+
TestClosableAsyncRequestBody asyncRequestBody = new TestClosableAsyncRequestBody();
88+
ClosableAsyncRequestBodyAdaptor adaptor = new ClosableAsyncRequestBodyAdaptor(asyncRequestBody);
89+
Subscriber<ByteBuffer> subscriber = new Subscriber<ByteBuffer>() {
90+
@Override
91+
public void onSubscribe(Subscription s) {
92+
s.cancel();
93+
}
94+
95+
@Override
96+
public void onNext(ByteBuffer byteBuffer) {
97+
}
98+
99+
@Override
100+
public void onError(Throwable t) {
101+
}
102+
103+
@Override
104+
public void onComplete() {
105+
}
106+
};
107+
adaptor.subscribe(subscriber);
108+
assertThat(asyncRequestBody.closeInvoked).isTrue();
109+
}
110+
111+
@Test
112+
void onError_shouldCloseAsyncRequestBody() {
113+
OnErrorClosableAsyncRequestBody asyncRequestBody = new OnErrorClosableAsyncRequestBody();
114+
ClosableAsyncRequestBodyAdaptor adaptor = new ClosableAsyncRequestBodyAdaptor(asyncRequestBody);
115+
CompletableFuture<byte[]> future = new CompletableFuture<>();
116+
Subscriber<ByteBuffer> subscriber = new ByteArrayAsyncResponseTransformer.BaosSubscriber(future);
117+
adaptor.subscribe(subscriber);
118+
assertThat(asyncRequestBody.closeInvoked).isTrue();
119+
}
120+
121+
122+
private static class TestClosableAsyncRequestBody implements ClosableAsyncRequestBody {
123+
private boolean closeInvoked;
124+
125+
@Override
126+
public Optional<Long> contentLength() {
127+
return Optional.empty();
128+
}
129+
130+
@Override
131+
public void subscribe(Subscriber<? super ByteBuffer> s) {
132+
Flowable.just(ByteBuffer.wrap("foo bar".getBytes(StandardCharsets.UTF_8)))
133+
.subscribe(s);
134+
}
135+
136+
@Override
137+
public void close() {
138+
closeInvoked = true;
139+
}
140+
}
141+
142+
private static class OnErrorClosableAsyncRequestBody implements ClosableAsyncRequestBody {
143+
private boolean closeInvoked;
144+
145+
@Override
146+
public Optional<Long> contentLength() {
147+
return Optional.empty();
148+
}
149+
150+
@Override
151+
public void subscribe(Subscriber<? super ByteBuffer> s) {
152+
s.onSubscribe(new Subscription() {
153+
@Override
154+
public void request(long n) {
155+
s.onError(new IllegalStateException("foobar"));
156+
}
157+
158+
@Override
159+
public void cancel() {
160+
161+
}
162+
});
163+
}
164+
165+
@Override
166+
public void close() {
167+
closeInvoked = true;
168+
}
169+
}
58170
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ public void onNext(ClosableAsyncRequestBody asyncRequestBody) {
151151
}
152152

153153
int currentPartNum = partNumber.getAndIncrement();
154+
155+
log.debug(() -> String.format("Received asyncRequestBody for part number %d with length %s", currentPartNum,
156+
asyncRequestBody.contentLength()));
157+
154158
if (existingParts.containsKey(currentPartNum)) {
155159
asyncRequestBody.subscribe(new CancelledSubscriber<>());
156160
asyncRequestBody.contentLength().ifPresent(progressListener::subscriberOnNext);
@@ -210,7 +214,7 @@ private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestB
210214
}
211215

212216
if (currentPartSize != partSize) {
213-
return Optional.of(contentLengthMismatchForPart(partSize, currentPartSize));
217+
return Optional.of(contentLengthMismatchForPart(partSize, currentPartSize, currentPartNum));
214218
}
215219
return Optional.empty();
216220
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,12 @@ static SdkClientException contentLengthMissingForPart(int currentPartNum) {
162162
return SdkClientException.create("Content length is missing on the AsyncRequestBody for part number " + currentPartNum);
163163
}
164164

165-
static SdkClientException contentLengthMismatchForPart(long expected, long actual) {
165+
static SdkClientException contentLengthMismatchForPart(long expected, long actual, int partNum) {
166166
return SdkClientException.create(String.format("Content length must not be greater than "
167-
+ "part size. Expected: %d, Actual: %d",
167+
+ "part size. Expected: %d, Actual: %d, partNum: %d",
168168
expected,
169-
actual));
169+
actual,
170+
partNum));
170171
}
171172

172173
static SdkClientException partNumMismatch(int expectedNumParts, int actualNumParts) {

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public void onNext(ClosableAsyncRequestBody asyncRequestBody) {
159159
return;
160160
}
161161
int currentPartNum = partNumber.incrementAndGet();
162-
log.debug(() -> String.format("Received asyncRequestBody for part number %d with length %d", currentPartNum,
162+
log.debug(() -> String.format("Received asyncRequestBody for part number %d with length %s", currentPartNum,
163163
asyncRequestBody.contentLength()));
164164
asyncRequestBodyInFlight.incrementAndGet();
165165

@@ -209,7 +209,7 @@ private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestB
209209

210210
Long contentLengthCurrentPart = contentLength.get();
211211
if (contentLengthCurrentPart > partSizeInBytes) {
212-
return Optional.of(contentLengthMismatchForPart(partSizeInBytes, contentLengthCurrentPart));
212+
return Optional.of(contentLengthMismatchForPart(partSizeInBytes, contentLengthCurrentPart, currentPartNum));
213213

214214
}
215215
return Optional.empty();

0 commit comments

Comments
 (0)