Skip to content

Commit 3814adb

Browse files
committed
Fix build and add more tests
1 parent d94cec4 commit 3814adb

File tree

5 files changed

+135
-8
lines changed

5 files changed

+135
-8
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.annotations.SdkProtectedApi;
2424
import software.amazon.awssdk.core.async.AsyncRequestBody;
2525
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
26+
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
2627
import software.amazon.awssdk.core.async.SdkPublisher;
2728
import software.amazon.awssdk.utils.Logger;
2829
import software.amazon.awssdk.utils.Validate;
@@ -76,6 +77,16 @@ public SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfig
7677
return delegate.split(splitConfiguration);
7778
}
7879

80+
@Override
81+
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
82+
return delegate.splitV2(splitConfiguration);
83+
}
84+
85+
@Override
86+
public SdkPublisher<ClosableAsyncRequestBody> splitV2(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
87+
return delegate.splitV2(splitConfiguration);
88+
}
89+
7990
@Override
8091
public void subscribe(Subscriber<? super ByteBuffer> s) {
8192
invoke(() -> listener.publisherSubscribe(s), "publisherSubscribe");

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public SplittingPublisher(AsyncRequestBody asyncRequestBody,
6565

6666
Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes,
6767
"bufferSizeInBytes must be larger than or equal to " +
68-
"chunkSizeInBytes if the content length is unknown");
68+
"chunkSizeInBytes");
6969
}
7070

7171
@Override
@@ -77,7 +77,7 @@ public void subscribe(Subscriber<? super ClosableAsyncRequestBody> downstreamSub
7777
private class SplittingSubscriber implements Subscriber<ByteBuffer> {
7878
private Subscription upstreamSubscription;
7979
private final Long upstreamSize;
80-
private final AtomicInteger chunkNumber = new AtomicInteger(0);
80+
private final AtomicInteger chunkNumber = new AtomicInteger(1);
8181
private volatile DownstreamBody currentBody;
8282
private final AtomicBoolean hasOpenUpstreamDemand = new AtomicBoolean(false);
8383
private final AtomicLong dataBuffered = new AtomicLong(0);
@@ -155,7 +155,7 @@ public void onNext(ByteBuffer byteBuffer) {
155155

156156
private void completeCurrentBodyAndCreateNewIfNeeded(ByteBuffer byteBuffer) {
157157
completeCurrentBodyAndDeliver();
158-
int currentChunk = chunkNumber.incrementAndGet();
158+
int nextChunk = chunkNumber.incrementAndGet();
159159
boolean shouldCreateNewDownstreamRequestBody;
160160
Long dataRemaining = totalDataRemaining();
161161

@@ -167,7 +167,7 @@ private void completeCurrentBodyAndCreateNewIfNeeded(ByteBuffer byteBuffer) {
167167

168168
if (shouldCreateNewDownstreamRequestBody) {
169169
long chunkSize = calculateChunkSize(dataRemaining);
170-
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
170+
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, nextChunk);
171171
}
172172
}
173173

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ClosableAsyncRequestBodyAdaptorTest.java

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,43 @@
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.Mockito.doNothing;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
import static org.mockito.Mockito.when;
2326

2427
import io.reactivex.Flowable;
2528
import io.reactivex.FlowableSubscriber;
29+
import io.reactivex.internal.observers.BiConsumerSingleObserver;
30+
import java.nio.ByteBuffer;
31+
import java.nio.charset.StandardCharsets;
32+
import java.util.Arrays;
33+
import java.util.Observable;
34+
import java.util.Observer;
35+
import java.util.Optional;
36+
import java.util.concurrent.CompletableFuture;
2637
import java.util.function.Consumer;
38+
import org.junit.jupiter.api.BeforeEach;
2739
import org.junit.jupiter.api.Test;
2840
import org.mockito.ArgumentCaptor;
2941
import org.mockito.Mockito;
3042
import org.reactivestreams.Subscriber;
43+
import org.reactivestreams.Subscription;
3144
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
3245
import software.amazon.awssdk.core.exception.NonRetryableException;
3346

3447
public class ClosableAsyncRequestBodyAdaptorTest {
48+
private ClosableAsyncRequestBody closableAsyncRequestBody;
3549

36-
@Test
37-
void resubscribe_shouldThrowException() {
38-
ClosableAsyncRequestBody closableAsyncRequestBody = Mockito.mock(ClosableAsyncRequestBody.class);
50+
@BeforeEach
51+
public void setup() {
52+
closableAsyncRequestBody =Mockito.mock(ClosableAsyncRequestBody.class);
3953
Mockito.when(closableAsyncRequestBody.doAfterOnComplete(any(Runnable.class))).thenReturn(closableAsyncRequestBody);
4054
Mockito.when(closableAsyncRequestBody.doAfterOnCancel(any(Runnable.class))).thenReturn(closableAsyncRequestBody);
4155
Mockito.when(closableAsyncRequestBody.doAfterOnError(any(Consumer.class))).thenReturn(closableAsyncRequestBody);
56+
}
4257

58+
@Test
59+
void resubscribe_shouldThrowException() {
4360
ClosableAsyncRequestBodyAdaptor adaptor = new ClosableAsyncRequestBodyAdaptor(closableAsyncRequestBody);
4461
Subscriber subscriber = Mockito.mock(Subscriber.class);
4562
adaptor.subscribe(subscriber);
@@ -55,4 +72,99 @@ void resubscribe_shouldThrowException() {
5572
.hasMessageContaining("A retry was attempted");
5673
}
5774

75+
@Test
76+
void onComplete_shouldCloseAsyncRequestBody() {
77+
TestClosableAsyncRequestBody asyncRequestBody = new TestClosableAsyncRequestBody();
78+
ClosableAsyncRequestBodyAdaptor adaptor = new ClosableAsyncRequestBodyAdaptor(asyncRequestBody);
79+
CompletableFuture<byte[]> future = new CompletableFuture<>();
80+
Subscriber<ByteBuffer> subscriber = new ByteArrayAsyncResponseTransformer.BaosSubscriber(future);
81+
adaptor.subscribe(subscriber);
82+
assertThat(asyncRequestBody.closeInvoked).isTrue();
83+
}
84+
85+
@Test
86+
void cancel_shouldCloseAsyncRequestBody() {
87+
TestClosableAsyncRequestBody asyncRequestBody = new TestClosableAsyncRequestBody();
88+
ClosableAsyncRequestBodyAdaptor adaptor = new ClosableAsyncRequestBodyAdaptor(asyncRequestBody);
89+
Subscriber<ByteBuffer> subscriber = new Subscriber<ByteBuffer>() {
90+
@Override
91+
public void onSubscribe(Subscription s) {
92+
s.cancel();
93+
}
94+
95+
@Override
96+
public void onNext(ByteBuffer byteBuffer) {
97+
}
98+
99+
@Override
100+
public void onError(Throwable t) {
101+
}
102+
103+
@Override
104+
public void onComplete() {
105+
}
106+
};
107+
adaptor.subscribe(subscriber);
108+
assertThat(asyncRequestBody.closeInvoked).isTrue();
109+
}
110+
111+
@Test
112+
void onError_shouldCloseAsyncRequestBody() {
113+
OnErrorClosableAsyncRequestBody asyncRequestBody = new OnErrorClosableAsyncRequestBody();
114+
ClosableAsyncRequestBodyAdaptor adaptor = new ClosableAsyncRequestBodyAdaptor(asyncRequestBody);
115+
CompletableFuture<byte[]> future = new CompletableFuture<>();
116+
Subscriber<ByteBuffer> subscriber = new ByteArrayAsyncResponseTransformer.BaosSubscriber(future);
117+
adaptor.subscribe(subscriber);
118+
assertThat(asyncRequestBody.closeInvoked).isTrue();
119+
}
120+
121+
122+
private static class TestClosableAsyncRequestBody implements ClosableAsyncRequestBody {
123+
private boolean closeInvoked;
124+
125+
@Override
126+
public Optional<Long> contentLength() {
127+
return Optional.empty();
128+
}
129+
130+
@Override
131+
public void subscribe(Subscriber<? super ByteBuffer> s) {
132+
Flowable.just(ByteBuffer.wrap("foo bar".getBytes(StandardCharsets.UTF_8)))
133+
.subscribe(s);
134+
}
135+
136+
@Override
137+
public void close() {
138+
closeInvoked = true;
139+
}
140+
}
141+
142+
private static class OnErrorClosableAsyncRequestBody implements ClosableAsyncRequestBody {
143+
private boolean closeInvoked;
144+
145+
@Override
146+
public Optional<Long> contentLength() {
147+
return Optional.empty();
148+
}
149+
150+
@Override
151+
public void subscribe(Subscriber<? super ByteBuffer> s) {
152+
s.onSubscribe(new Subscription() {
153+
@Override
154+
public void request(long n) {
155+
s.onError(new IllegalStateException("foobar"));
156+
}
157+
158+
@Override
159+
public void cancel() {
160+
161+
}
162+
});
163+
}
164+
165+
@Override
166+
public void close() {
167+
closeInvoked = true;
168+
}
169+
}
58170
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ public void onNext(ClosableAsyncRequestBody asyncRequestBody) {
151151
}
152152

153153
int currentPartNum = partNumber.getAndIncrement();
154+
155+
log.debug(() -> String.format("Received asyncRequestBody for part number %d with length %s", currentPartNum,
156+
asyncRequestBody.contentLength()));
157+
154158
if (existingParts.containsKey(currentPartNum)) {
155159
asyncRequestBody.subscribe(new CancelledSubscriber<>());
156160
asyncRequestBody.contentLength().ifPresent(progressListener::subscriberOnNext);

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public void onNext(ClosableAsyncRequestBody asyncRequestBody) {
159159
return;
160160
}
161161
int currentPartNum = partNumber.incrementAndGet();
162-
log.debug(() -> String.format("Received asyncRequestBody for part number %d with length %d", currentPartNum,
162+
log.debug(() -> String.format("Received asyncRequestBody for part number %d with length %s", currentPartNum,
163163
asyncRequestBody.contentLength()));
164164
asyncRequestBodyInFlight.incrementAndGet();
165165

0 commit comments

Comments
 (0)