Skip to content

Commit 43f262c

Browse files
authored
Truncate to content in chunked publisher (#6304)
* Truncate to content in chunked publisher Add support for ensuring that the `ChunkedEncodedPublisher` only encodes a set number of bytes. * Review comments * Review comments * Review comments
1 parent 5eee54a commit 43f262c

File tree

6 files changed

+452
-18
lines changed

6 files changed

+452
-18
lines changed

core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@
2626
import org.reactivestreams.Subscriber;
2727
import software.amazon.awssdk.annotations.SdkInternalApi;
2828
import software.amazon.awssdk.utils.Pair;
29+
import software.amazon.awssdk.utils.Validate;
2930
import software.amazon.awssdk.utils.async.AddingTrailingDataSubscriber;
31+
import software.amazon.awssdk.utils.async.ContentLengthAwareSubscriber;
3032
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
3133
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
3234
import software.amazon.awssdk.utils.internal.MappingSubscriber;
3335

3436
/**
3537
* An implementation of chunk-transfer encoding, but by wrapping a {@link Publisher} of {@link ByteBuffer}. This implementation
36-
* supports chunk-headers, chunk-extensions.
38+
* supports chunk-headers, chunk-extensions, and trailer-part.
3739
* <p>
3840
* Per <a href="https://datatracker.ietf.org/doc/html/rfc7230#section-4.1">RFC-7230</a>, a chunk-transfer encoded message is
3941
* defined as:
@@ -66,6 +68,7 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
6668
private static final byte COMMA = ',';
6769

6870
private final Publisher<ByteBuffer> wrapped;
71+
private final long contentLength;
6972
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
7073
private final List<TrailerProvider> trailers = new ArrayList<>();
7174
private final int chunkSize;
@@ -74,6 +77,7 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
7477

7578
public ChunkedEncodedPublisher(Builder b) {
7679
this.wrapped = b.publisher;
80+
this.contentLength = Validate.notNull(b.contentLength, "contentLength must not be null");
7781
this.chunkSize = b.chunkSize;
7882
this.extensions.addAll(b.extensions);
7983
this.trailers.addAll(b.trailers);
@@ -82,7 +86,8 @@ public ChunkedEncodedPublisher(Builder b) {
8286

8387
@Override
8488
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
85-
Publisher<Iterable<ByteBuffer>> chunked = chunk(wrapped);
89+
Publisher<ByteBuffer> lengthEnforced = limitLength(wrapped, contentLength);
90+
Publisher<Iterable<ByteBuffer>> chunked = chunk(lengthEnforced);
8691
Publisher<Iterable<ByteBuffer>> trailingAdded = addTrailingChunks(chunked);
8792
Publisher<ByteBuffer> flattened = flatten(trailingAdded);
8893
Publisher<ByteBuffer> encoded = map(flattened, this::encodeChunk);
@@ -111,6 +116,10 @@ private Iterable<Iterable<ByteBuffer>> getTrailingChunks() {
111116
return Collections.singletonList(trailing);
112117
}
113118

119+
private Publisher<ByteBuffer> limitLength(Publisher<ByteBuffer> publisher, long length) {
120+
return subscriber -> publisher.subscribe(new ContentLengthAwareSubscriber(subscriber, length));
121+
}
122+
114123
private Publisher<Iterable<ByteBuffer>> chunk(Publisher<ByteBuffer> upstream) {
115124
return subscriber -> {
116125
upstream.subscribe(new ChunkingSubscriber(subscriber));
@@ -153,8 +162,7 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
153162
}
154163

155164
int trailerLen = trailerData.stream()
156-
// + 2 for each CRLF that ends the header-field
157-
.mapToInt(t -> t.remaining() + 2)
165+
.mapToInt(t -> t.remaining() + CRLF.length)
158166
.sum();
159167

160168
int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + trailerLen + CRLF.length;
@@ -188,11 +196,11 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
188196
encoded.put(t);
189197
encoded.put(CRLF);
190198
});
199+
// empty line ends the request body
191200
encoded.put(CRLF);
192201
}
193202

194203
encoded.flip();
195-
196204
return encoded;
197205
}
198206

@@ -294,6 +302,7 @@ public void onNext(ByteBuffer byteBuffer) {
294302

295303
public static class Builder {
296304
private Publisher<ByteBuffer> publisher;
305+
private Long contentLength;
297306
private int chunkSize;
298307
private boolean addEmptyTrailingChunk;
299308
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
@@ -304,6 +313,15 @@ public Builder publisher(Publisher<ByteBuffer> publisher) {
304313
return this;
305314
}
306315

316+
public Publisher<ByteBuffer> publisher() {
317+
return publisher;
318+
}
319+
320+
public Builder contentLength(long contentLength) {
321+
this.contentLength = contentLength;
322+
return this;
323+
}
324+
307325
public Builder chunkSize(int chunkSize) {
308326
this.chunkSize = chunkSize;
309327
return this;
@@ -324,6 +342,10 @@ public Builder addTrailer(TrailerProvider trailerProvider) {
324342
return this;
325343
}
326344

345+
public List<TrailerProvider> trailers() {
346+
return trailers;
347+
}
348+
327349
public ChunkedEncodedPublisher build() {
328350
return new ChunkedEncodedPublisher(this);
329351
}

core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisherTest.java

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
1919

2020
import io.reactivex.Flowable;
21+
import io.reactivex.subscribers.TestSubscriber;
2122
import java.nio.ByteBuffer;
2223
import java.nio.charset.StandardCharsets;
2324
import java.util.ArrayList;
@@ -26,6 +27,7 @@
2627
import java.util.List;
2728
import java.util.PrimitiveIterator;
2829
import java.util.Random;
30+
import java.util.concurrent.TimeUnit;
2931
import java.util.stream.Collectors;
3032
import java.util.stream.Stream;
3133
import org.junit.jupiter.api.BeforeEach;
@@ -56,6 +58,7 @@ public void subscribe_publisherEmpty_onlyProducesTrailer() {
5658
.addTrailer(() -> Pair.of("foo", Collections.singletonList("1")))
5759
.addTrailer(() -> Pair.of("bar", Collections.singletonList("2")))
5860
.addEmptyTrailingChunk(true)
61+
.contentLength(0)
5962
.build();
6063

6164
List<ByteBuffer> chunks = getAllElements(build);
@@ -73,12 +76,14 @@ public void subscribe_publisherEmpty_onlyProducesTrailer() {
7376

7477
@Test
7578
void subscribe_trailerProviderPresent_trailerPartAdded() {
79+
int contentLength = 8;
7680
TestPublisher upstream = randomPublisherOfLength(8);
7781

7882
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");
7983

8084
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
8185
.publisher(upstream)
86+
.contentLength(contentLength)
8287
.chunkSize(CHUNK_SIZE)
8388
.addEmptyTrailingChunk(true)
8489
.addTrailer(trailerProvider)
@@ -93,12 +98,14 @@ void subscribe_trailerProviderPresent_trailerPartAdded() {
9398

9499
@Test
95100
void subscribe_trailerProviderPresent_multipleValues_trailerPartAdded() {
96-
TestPublisher upstream = randomPublisherOfLength(8);
101+
int contentLength = 8;
102+
TestPublisher upstream = randomPublisherOfLength(contentLength);
97103

98104
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", Arrays.asList("bar1", "bar2", "bar3"));
99105

100106
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
101107
.publisher(upstream)
108+
.contentLength(contentLength)
102109
.chunkSize(CHUNK_SIZE)
103110
.addEmptyTrailingChunk(true)
104111
.addTrailer(trailerProvider)
@@ -113,14 +120,16 @@ void subscribe_trailerProviderPresent_multipleValues_trailerPartAdded() {
113120

114121
@Test
115122
void subscribe_trailerProviderPresent_onlyInvokedOnce() {
116-
TestPublisher upstream = randomPublisherOfLength(8);
123+
int contentLength = 8;
124+
TestPublisher upstream = randomPublisherOfLength(contentLength);
117125

118126
TrailerProvider trailerProvider = Mockito.spy(new StaticTrailerProvider("foo", "bar"));
119127

120128
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
121129
.publisher(upstream)
122130
.addEmptyTrailingChunk(true)
123131
.chunkSize(CHUNK_SIZE)
132+
.contentLength(contentLength)
124133
.addTrailer(trailerProvider).build();
125134

126135
getAllElements(chunkedPublisher);
@@ -130,13 +139,15 @@ void subscribe_trailerProviderPresent_onlyInvokedOnce() {
130139

131140
@Test
132141
void subscribe_trailerPresent_trailerFormattedCorrectly() {
133-
TestPublisher testPublisher = randomPublisherOfLength(32);
142+
int contentLength = 32;
143+
TestPublisher testPublisher = randomPublisherOfLength(contentLength);
134144

135145
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");
136146

137147
ChunkedEncodedPublisher chunkedPublisher = newChunkedBuilder(testPublisher)
138148
.addTrailer(trailerProvider)
139149
.addEmptyTrailingChunk(true)
150+
.contentLength(contentLength)
140151
.build();
141152

142153
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
@@ -152,11 +163,13 @@ void subscribe_trailerPresent_trailerFormattedCorrectly() {
152163

153164
@Test
154165
void subscribe_wrappedDoesNotFillBuffer_allDataInSingleChunk() {
155-
ByteBuffer element = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
166+
byte[] content = "hello world".getBytes(StandardCharsets.UTF_8);
167+
ByteBuffer element = ByteBuffer.wrap(content);
156168
Flowable<ByteBuffer> upstream = Flowable.just(element.duplicate());
157169

158170
ChunkedEncodedPublisher publisher = ChunkedEncodedPublisher.builder()
159171
.chunkSize(CHUNK_SIZE)
172+
.contentLength(content.length)
160173
.publisher(upstream)
161174
.build();
162175

@@ -169,7 +182,8 @@ void subscribe_wrappedDoesNotFillBuffer_allDataInSingleChunk() {
169182

170183
@Test
171184
void subscribe_extensionHasNoValue_formattedCorrectly() {
172-
TestPublisher testPublisher = randomPublisherOfLength(8);
185+
int contentLength = 8;
186+
TestPublisher testPublisher = randomPublisherOfLength(contentLength);
173187

174188
ChunkExtensionProvider extensionProvider = new StaticExtensionProvider("foo", "");
175189

@@ -178,6 +192,7 @@ void subscribe_extensionHasNoValue_formattedCorrectly() {
178192
.publisher(testPublisher)
179193
.addExtension(extensionProvider)
180194
.chunkSize(CHUNK_SIZE)
195+
.contentLength(contentLength)
181196
.build();
182197

183198
List<ByteBuffer> chunks = getAllElements(chunkPublisher);
@@ -187,11 +202,13 @@ void subscribe_extensionHasNoValue_formattedCorrectly() {
187202

188203
@Test
189204
void subscribe_multipleExtensions_formattedCorrectly() {
190-
TestPublisher testPublisher = randomPublisherOfLength(8);
205+
int contentLength = 8;
206+
TestPublisher testPublisher = randomPublisherOfLength(contentLength);
191207

192208
ChunkedEncodedPublisher.Builder chunkPublisher =
193209
ChunkedEncodedPublisher.builder()
194210
.publisher(testPublisher)
211+
.contentLength(contentLength)
195212
.chunkSize(CHUNK_SIZE);
196213

197214
Stream.of("1", "2", "3")
@@ -207,10 +224,12 @@ void subscribe_multipleExtensions_formattedCorrectly() {
207224
void subscribe_randomElementSizes_dataChunkedCorrectly() {
208225
for (int i = 0; i < 512; ++i) {
209226
int nChunks = 24;
210-
TestPublisher byteBufferPublisher = randomPublisherOfLength(CHUNK_SIZE * 24);
227+
int contentLength = nChunks * CHUNK_SIZE;
228+
TestPublisher byteBufferPublisher = randomPublisherOfLength(contentLength);
211229

212230
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
213231
.publisher(byteBufferPublisher)
232+
.contentLength(contentLength)
214233
.chunkSize(CHUNK_SIZE)
215234
.build();
216235

@@ -232,14 +251,16 @@ void subscribe_randomElementSizes_dataChunkedCorrectly() {
232251
void subscribe_randomElementSizes_chunksHaveExtensions_dataChunkedCorrectly() {
233252
for (int i = 0; i < 512; ++i) {
234253
int nChunks = 24;
235-
TestPublisher byteBufferPublisher = randomPublisherOfLength(CHUNK_SIZE * 24);
254+
int contentLength = CHUNK_SIZE * nChunks;
255+
TestPublisher byteBufferPublisher = randomPublisherOfLength(contentLength);
236256

237257
StaticExtensionProvider extensionProvider = Mockito.spy(new StaticExtensionProvider("foo", "bar"));
238258

239259
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
240260
.publisher(byteBufferPublisher)
241261
.addExtension(extensionProvider)
242262
.chunkSize(CHUNK_SIZE)
263+
.contentLength(contentLength)
243264
.build();
244265

245266
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
@@ -264,12 +285,14 @@ void subscribe_randomElementSizes_chunksHaveExtensions_dataChunkedCorrectly() {
264285

265286
@Test
266287
void subscribe_addTrailingChunkTrue_trailingChunkAdded() {
267-
TestPublisher testPublisher = randomPublisherOfLength(CHUNK_SIZE * 2);
288+
int contentLength = CHUNK_SIZE * 2;
289+
TestPublisher testPublisher = randomPublisherOfLength(contentLength);
268290

269291
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
270292
.publisher(testPublisher)
271293
.chunkSize(CHUNK_SIZE)
272294
.addEmptyTrailingChunk(true)
295+
.contentLength(contentLength)
273296
.build();
274297

275298
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
@@ -285,7 +308,12 @@ void subscribe_addTrailingChunkTrue_upstreamEmpty_trailingChunkAdded() {
285308
Publisher<ByteBuffer> empty = Flowable.empty();
286309

287310
ChunkedEncodedPublisher chunkedPublisher =
288-
ChunkedEncodedPublisher.builder().publisher(empty).chunkSize(CHUNK_SIZE).addEmptyTrailingChunk(true).build();
311+
ChunkedEncodedPublisher.builder()
312+
.publisher(empty)
313+
.chunkSize(CHUNK_SIZE)
314+
.addEmptyTrailingChunk(true)
315+
.contentLength(0)
316+
.build();
289317

290318
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
291319

@@ -297,10 +325,15 @@ void subscribe_extensionsPresent_extensionsInvokedForEachChunk() {
297325
ChunkExtensionProvider mockProvider = Mockito.spy(new StaticExtensionProvider("foo", "bar"));
298326

299327
int nChunks = 16;
300-
TestPublisher elements = randomPublisherOfLength(nChunks * CHUNK_SIZE);
328+
int contentLength = CHUNK_SIZE * nChunks;
329+
TestPublisher elements = randomPublisherOfLength(contentLength);
301330

302-
ChunkedEncodedPublisher chunkPublisher =
303-
ChunkedEncodedPublisher.builder().publisher(elements).chunkSize(CHUNK_SIZE).addExtension(mockProvider).build();
331+
ChunkedEncodedPublisher chunkPublisher = ChunkedEncodedPublisher.builder()
332+
.publisher(elements)
333+
.contentLength(contentLength)
334+
.chunkSize(CHUNK_SIZE)
335+
.addExtension(mockProvider)
336+
.build();
304337

305338
List<ByteBuffer> chunks = getAllElements(chunkPublisher);
306339

@@ -316,6 +349,28 @@ void subscribe_extensionsPresent_extensionsInvokedForEachChunk() {
316349
}
317350
}
318351

352+
@Test
353+
void subscribe_wrappedExceedsContentLength_dataTruncatedToLength() {
354+
int contentLength = CHUNK_SIZE * 4 - 1;
355+
TestPublisher elements = randomPublisherOfLength(contentLength * 2);
356+
357+
TestSubscriber<ByteBuffer> subscriber = new TestSubscriber<>();
358+
ChunkedEncodedPublisher chunkPublisher = newChunkedBuilder(elements).contentLength(contentLength)
359+
.build();
360+
361+
chunkPublisher.subscribe(subscriber);
362+
363+
subscriber.awaitTerminalEvent(30, TimeUnit.SECONDS);
364+
365+
int totalRemaining = subscriber.values()
366+
.stream()
367+
.map(this::stripEncoding)
368+
.mapToInt(ByteBuffer::remaining)
369+
.sum();
370+
371+
assertThat(totalRemaining).isEqualTo(contentLength);
372+
}
373+
319374
private static ChunkedEncodedPublisher.Builder newChunkedBuilder(Publisher<ByteBuffer> publisher) {
320375
return ChunkedEncodedPublisher.builder().publisher(publisher).chunkSize(CHUNK_SIZE);
321376
}
@@ -401,6 +456,10 @@ private ByteBuffer stripEncoding(ByteBuffer chunk) {
401456
return stripped;
402457
}
403458

459+
private long totalRemaining(List<ByteBuffer> buffers) {
460+
return buffers.stream().mapToLong(ByteBuffer::remaining).sum();
461+
}
462+
404463
private static class TestPublisher implements Publisher<ByteBuffer> {
405464
private final Publisher<ByteBuffer> wrapped;
406465
private final byte[] wrappedChecksum;

0 commit comments

Comments
 (0)