Skip to content

Commit 3aeb67a

Browse files
committed
Review comments
1 parent 24a1b68 commit 3aeb67a

File tree

4 files changed

+42
-16
lines changed

4 files changed

+42
-16
lines changed

core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import software.amazon.awssdk.annotations.SdkInternalApi;
2828
import software.amazon.awssdk.http.auth.aws.internal.signer.io.ContentLengthAwareSubscriber;
2929
import software.amazon.awssdk.utils.Pair;
30+
import software.amazon.awssdk.utils.Validate;
3031
import software.amazon.awssdk.utils.async.AddingTrailingDataSubscriber;
3132
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
3233
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
@@ -76,7 +77,7 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
7677

7778
public ChunkedEncodedPublisher(Builder b) {
7879
this.wrapped = b.publisher;
79-
this.contentLength = b.contentLength;
80+
this.contentLength = Validate.notNull(b.contentLength, "contentLength must not be null");
8081
this.chunkSize = b.chunkSize;
8182
this.extensions.addAll(b.extensions);
8283
this.trailers.addAll(b.trailers);
@@ -301,7 +302,7 @@ public void onNext(ByteBuffer byteBuffer) {
301302

302303
public static class Builder {
303304
private Publisher<ByteBuffer> publisher;
304-
private long contentLength;
305+
private Long contentLength;
305306
private int chunkSize;
306307
private boolean addEmptyTrailingChunk;
307308
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();

core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/ContentLengthAwareSubscriber.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ public class ContentLengthAwareSubscriber implements Subscriber<ByteBuffer> {
2929
private final Subscriber<? super ByteBuffer> subscriber;
3030
private Subscription subscription;
3131
private boolean subscriptionCancelled;
32-
private long contentLength;
32+
private long remaining;
3333

3434
public ContentLengthAwareSubscriber(Subscriber<? super ByteBuffer> subscriber, long contentLength) {
3535
this.subscriber = subscriber;
36-
this.contentLength = contentLength;
36+
this.remaining = contentLength;
3737
}
3838

3939
@Override
@@ -47,13 +47,13 @@ public void onSubscribe(Subscription subscription) {
4747

4848
@Override
4949
public void onNext(ByteBuffer byteBuffer) {
50-
if (contentLength > 0) {
51-
long bytesToRead = Math.min(contentLength, byteBuffer.remaining());
50+
if (remaining > 0) {
51+
long bytesToRead = Math.min(remaining, byteBuffer.remaining());
5252
// cast is safe, min of long and int is <= max_int
5353
byteBuffer.limit(byteBuffer.position() + (int) bytesToRead);
54-
contentLength -= bytesToRead;
54+
remaining -= bytesToRead;
5555
subscriber.onNext(byteBuffer);
56-
} else if (contentLength == 0 && !subscriptionCancelled) {
56+
} else if (remaining == 0 && !subscriptionCancelled) {
5757
subscriptionCancelled = true;
5858
subscription.cancel();
5959
onComplete();

core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisherTest.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public void subscribe_publisherEmpty_onlyProducesTrailer() {
5858
.addTrailer(() -> Pair.of("foo", Collections.singletonList("1")))
5959
.addTrailer(() -> Pair.of("bar", Collections.singletonList("2")))
6060
.addEmptyTrailingChunk(true)
61+
.contentLength(0)
6162
.build();
6263

6364
List<ByteBuffer> chunks = getAllElements(build);
@@ -119,14 +120,16 @@ void subscribe_trailerProviderPresent_multipleValues_trailerPartAdded() {
119120

120121
@Test
121122
void subscribe_trailerProviderPresent_onlyInvokedOnce() {
122-
TestPublisher upstream = randomPublisherOfLength(8);
123+
int contentLength = 8;
124+
TestPublisher upstream = randomPublisherOfLength(contentLength);
123125

124126
TrailerProvider trailerProvider = Mockito.spy(new StaticTrailerProvider("foo", "bar"));
125127

126128
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
127129
.publisher(upstream)
128130
.addEmptyTrailingChunk(true)
129131
.chunkSize(CHUNK_SIZE)
132+
.contentLength(contentLength)
130133
.addTrailer(trailerProvider).build();
131134

132135
getAllElements(chunkedPublisher);
@@ -136,13 +139,15 @@ void subscribe_trailerProviderPresent_onlyInvokedOnce() {
136139

137140
@Test
138141
void subscribe_trailerPresent_trailerFormattedCorrectly() {
139-
TestPublisher testPublisher = randomPublisherOfLength(32);
142+
int contentLength = 32;
143+
TestPublisher testPublisher = randomPublisherOfLength(contentLength);
140144

141145
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");
142146

143147
ChunkedEncodedPublisher chunkedPublisher = newChunkedBuilder(testPublisher)
144148
.addTrailer(trailerProvider)
145149
.addEmptyTrailingChunk(true)
150+
.contentLength(contentLength)
146151
.build();
147152

148153
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
@@ -197,11 +202,13 @@ void subscribe_extensionHasNoValue_formattedCorrectly() {
197202

198203
@Test
199204
void subscribe_multipleExtensions_formattedCorrectly() {
200-
TestPublisher testPublisher = randomPublisherOfLength(8);
205+
int contentLength = 8;
206+
TestPublisher testPublisher = randomPublisherOfLength(contentLength);
201207

202208
ChunkedEncodedPublisher.Builder chunkPublisher =
203209
ChunkedEncodedPublisher.builder()
204210
.publisher(testPublisher)
211+
.contentLength(contentLength)
205212
.chunkSize(CHUNK_SIZE);
206213

207214
Stream.of("1", "2", "3")
@@ -301,7 +308,12 @@ void subscribe_addTrailingChunkTrue_upstreamEmpty_trailingChunkAdded() {
301308
Publisher<ByteBuffer> empty = Flowable.empty();
302309

303310
ChunkedEncodedPublisher chunkedPublisher =
304-
ChunkedEncodedPublisher.builder().publisher(empty).chunkSize(CHUNK_SIZE).addEmptyTrailingChunk(true).build();
311+
ChunkedEncodedPublisher.builder()
312+
.publisher(empty)
313+
.chunkSize(CHUNK_SIZE)
314+
.addEmptyTrailingChunk(true)
315+
.contentLength(0)
316+
.build();
305317

306318
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
307319

core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/ContentLengthAwareSubscriberTest.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
public class ContentLengthAwareSubscriberTest {
3535
@Test
36-
public void subscribe_upstreamExceedsContentLength_correctlyTruncates() {
36+
void subscribe_upstreamExceedsContentLength_correctlyTruncates() {
3737
long contentLength = 64;
3838
Publisher<ByteBuffer> upstream = randomPublisherOfLength(8192, 8, 16);
3939

@@ -46,7 +46,7 @@ public void subscribe_upstreamExceedsContentLength_correctlyTruncates() {
4646
}
4747

4848
@Test
49-
public void subscribe_upstreamHasExactlyContentLength_signalsComplete() {
49+
void subscribe_upstreamHasExactlyContentLength_signalsComplete() {
5050
long contentLength = 8192;
5151
Publisher<ByteBuffer> upstream = randomPublisherOfLength((int) contentLength, 8, 16);
5252

@@ -59,7 +59,7 @@ public void subscribe_upstreamHasExactlyContentLength_signalsComplete() {
5959
}
6060

6161
@Test
62-
public void subscribe_upstreamExceedsContentLength_request1BufferAtATime_correctlyTruncates() throws Exception {
62+
void subscribe_upstreamExceedsContentLength_request1BufferAtATime_correctlyTruncates() throws Exception {
6363
long contentLength = 8192;
6464

6565
Publisher<ByteBuffer> upstream = randomPublisherOfLength((int) contentLength * 2, 8, 16);
@@ -104,7 +104,7 @@ public void onComplete() {
104104
}
105105

106106
@Test
107-
public void subscribe_upstreamExceedsContentLength_upstreamSubscriptionCancelledAfterContentLengthReached() {
107+
void subscribe_upstreamExceedsContentLength_upstreamSubscriptionCancelledAfterContentLengthReached() {
108108
long contentLength = 64;
109109
Publisher<ByteBuffer> upstream = randomPublisherOfLength((int) contentLength * 4, 8, 16);
110110

@@ -118,6 +118,19 @@ public void subscribe_upstreamExceedsContentLength_upstreamSubscriptionCancelled
118118
assertThat(totalRemaining(testSubscriber.values())).isEqualTo(contentLength);
119119
}
120120

121+
@Test
122+
void subscribe_upstreamHasContentAndContentLength0_signalsComplete() {
123+
Publisher<ByteBuffer> upstream = randomPublisherOfLength(128, 8, 16);
124+
125+
TestSubscriber<ByteBuffer> testSubscriber = new TestSubscriber<>();
126+
ContentLengthAwareSubscriber lengthAwareSubscriber = new ContentLengthAwareSubscriber(testSubscriber, 0L);
127+
upstream.subscribe(lengthAwareSubscriber);
128+
129+
testSubscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
130+
testSubscriber.assertComplete();
131+
assertThat(testSubscriber.values()).isEmpty();
132+
}
133+
121134
private static class TestSubscription implements Subscription {
122135
private final Subscription wrapped;
123136
private final AtomicLong cancelInvocations = new AtomicLong();

0 commit comments

Comments
 (0)