Skip to content

Truncate to content in chunked publisher #6304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.Pair;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.AddingTrailingDataSubscriber;
import software.amazon.awssdk.utils.async.ContentLengthAwareSubscriber;
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
import software.amazon.awssdk.utils.internal.MappingSubscriber;

/**
* An implementation of chunk-transfer encoding, but by wrapping a {@link Publisher} of {@link ByteBuffer}. This implementation
* supports chunk-headers, chunk-extensions.
* supports chunk-headers, chunk-extensions, and trailer-part.
* <p>
* Per <a href="https://datatracker.ietf.org/doc/html/rfc7230#section-4.1">RFC-7230</a>, a chunk-transfer encoded message is
* defined as:
Expand Down Expand Up @@ -66,6 +68,7 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
private static final byte COMMA = ',';

private final Publisher<ByteBuffer> wrapped;
private final long contentLength;
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
private final List<TrailerProvider> trailers = new ArrayList<>();
private final int chunkSize;
Expand All @@ -74,6 +77,7 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {

public ChunkedEncodedPublisher(Builder b) {
this.wrapped = b.publisher;
this.contentLength = Validate.notNull(b.contentLength, "contentLength must not be null");
this.chunkSize = b.chunkSize;
this.extensions.addAll(b.extensions);
this.trailers.addAll(b.trailers);
Expand All @@ -82,7 +86,8 @@ public ChunkedEncodedPublisher(Builder b) {

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
Publisher<Iterable<ByteBuffer>> chunked = chunk(wrapped);
Publisher<ByteBuffer> lengthEnforced = limitLength(wrapped, contentLength);
Publisher<Iterable<ByteBuffer>> chunked = chunk(lengthEnforced);
Publisher<Iterable<ByteBuffer>> trailingAdded = addTrailingChunks(chunked);
Publisher<ByteBuffer> flattened = flatten(trailingAdded);
Publisher<ByteBuffer> encoded = map(flattened, this::encodeChunk);
Expand Down Expand Up @@ -111,6 +116,10 @@ private Iterable<Iterable<ByteBuffer>> getTrailingChunks() {
return Collections.singletonList(trailing);
}

private Publisher<ByteBuffer> limitLength(Publisher<ByteBuffer> publisher, long length) {
return subscriber -> publisher.subscribe(new ContentLengthAwareSubscriber(subscriber, length));
}

private Publisher<Iterable<ByteBuffer>> chunk(Publisher<ByteBuffer> upstream) {
return subscriber -> {
upstream.subscribe(new ChunkingSubscriber(subscriber));
Expand Down Expand Up @@ -153,8 +162,7 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
}

int trailerLen = trailerData.stream()
// + 2 for each CRLF that ends the header-field
.mapToInt(t -> t.remaining() + 2)
.mapToInt(t -> t.remaining() + CRLF.length)
.sum();

int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + trailerLen + CRLF.length;
Expand Down Expand Up @@ -188,11 +196,11 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
encoded.put(t);
encoded.put(CRLF);
});
// empty line ends the request body
encoded.put(CRLF);
}

encoded.flip();

return encoded;
}

Expand Down Expand Up @@ -294,6 +302,7 @@ public void onNext(ByteBuffer byteBuffer) {

public static class Builder {
private Publisher<ByteBuffer> publisher;
private Long contentLength;
private int chunkSize;
private boolean addEmptyTrailingChunk;
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
Expand All @@ -304,6 +313,15 @@ public Builder publisher(Publisher<ByteBuffer> publisher) {
return this;
}

public Publisher<ByteBuffer> publisher() {
return publisher;
}

public Builder contentLength(long contentLength) {
this.contentLength = contentLength;
return this;
}

public Builder chunkSize(int chunkSize) {
this.chunkSize = chunkSize;
return this;
Expand All @@ -324,6 +342,10 @@ public Builder addTrailer(TrailerProvider trailerProvider) {
return this;
}

public List<TrailerProvider> trailers() {
return trailers;
}

public ChunkedEncodedPublisher build() {
return new ChunkedEncodedPublisher(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -26,6 +27,7 @@
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -56,6 +58,7 @@ public void subscribe_publisherEmpty_onlyProducesTrailer() {
.addTrailer(() -> Pair.of("foo", Collections.singletonList("1")))
.addTrailer(() -> Pair.of("bar", Collections.singletonList("2")))
.addEmptyTrailingChunk(true)
.contentLength(0)
.build();

List<ByteBuffer> chunks = getAllElements(build);
Expand All @@ -73,12 +76,14 @@ public void subscribe_publisherEmpty_onlyProducesTrailer() {

@Test
void subscribe_trailerProviderPresent_trailerPartAdded() {
int contentLength = 8;
TestPublisher upstream = randomPublisherOfLength(8);

TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");

ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
.publisher(upstream)
.contentLength(contentLength)
.chunkSize(CHUNK_SIZE)
.addEmptyTrailingChunk(true)
.addTrailer(trailerProvider)
Expand All @@ -93,12 +98,14 @@ void subscribe_trailerProviderPresent_trailerPartAdded() {

@Test
void subscribe_trailerProviderPresent_multipleValues_trailerPartAdded() {
TestPublisher upstream = randomPublisherOfLength(8);
int contentLength = 8;
TestPublisher upstream = randomPublisherOfLength(contentLength);

TrailerProvider trailerProvider = new StaticTrailerProvider("foo", Arrays.asList("bar1", "bar2", "bar3"));

ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
.publisher(upstream)
.contentLength(contentLength)
.chunkSize(CHUNK_SIZE)
.addEmptyTrailingChunk(true)
.addTrailer(trailerProvider)
Expand All @@ -113,14 +120,16 @@ void subscribe_trailerProviderPresent_multipleValues_trailerPartAdded() {

@Test
void subscribe_trailerProviderPresent_onlyInvokedOnce() {
TestPublisher upstream = randomPublisherOfLength(8);
int contentLength = 8;
TestPublisher upstream = randomPublisherOfLength(contentLength);

TrailerProvider trailerProvider = Mockito.spy(new StaticTrailerProvider("foo", "bar"));

ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
.publisher(upstream)
.addEmptyTrailingChunk(true)
.chunkSize(CHUNK_SIZE)
.contentLength(contentLength)
.addTrailer(trailerProvider).build();

getAllElements(chunkedPublisher);
Expand All @@ -130,13 +139,15 @@ void subscribe_trailerProviderPresent_onlyInvokedOnce() {

@Test
void subscribe_trailerPresent_trailerFormattedCorrectly() {
TestPublisher testPublisher = randomPublisherOfLength(32);
int contentLength = 32;
TestPublisher testPublisher = randomPublisherOfLength(contentLength);

TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");

ChunkedEncodedPublisher chunkedPublisher = newChunkedBuilder(testPublisher)
.addTrailer(trailerProvider)
.addEmptyTrailingChunk(true)
.contentLength(contentLength)
.build();

List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
Expand All @@ -152,11 +163,13 @@ void subscribe_trailerPresent_trailerFormattedCorrectly() {

@Test
void subscribe_wrappedDoesNotFillBuffer_allDataInSingleChunk() {
ByteBuffer element = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
byte[] content = "hello world".getBytes(StandardCharsets.UTF_8);
ByteBuffer element = ByteBuffer.wrap(content);
Flowable<ByteBuffer> upstream = Flowable.just(element.duplicate());

ChunkedEncodedPublisher publisher = ChunkedEncodedPublisher.builder()
.chunkSize(CHUNK_SIZE)
.contentLength(content.length)
.publisher(upstream)
.build();

Expand All @@ -169,7 +182,8 @@ void subscribe_wrappedDoesNotFillBuffer_allDataInSingleChunk() {

@Test
void subscribe_extensionHasNoValue_formattedCorrectly() {
TestPublisher testPublisher = randomPublisherOfLength(8);
int contentLength = 8;
TestPublisher testPublisher = randomPublisherOfLength(contentLength);

ChunkExtensionProvider extensionProvider = new StaticExtensionProvider("foo", "");

Expand All @@ -178,6 +192,7 @@ void subscribe_extensionHasNoValue_formattedCorrectly() {
.publisher(testPublisher)
.addExtension(extensionProvider)
.chunkSize(CHUNK_SIZE)
.contentLength(contentLength)
.build();

List<ByteBuffer> chunks = getAllElements(chunkPublisher);
Expand All @@ -187,11 +202,13 @@ void subscribe_extensionHasNoValue_formattedCorrectly() {

@Test
void subscribe_multipleExtensions_formattedCorrectly() {
TestPublisher testPublisher = randomPublisherOfLength(8);
int contentLength = 8;
TestPublisher testPublisher = randomPublisherOfLength(contentLength);

ChunkedEncodedPublisher.Builder chunkPublisher =
ChunkedEncodedPublisher.builder()
.publisher(testPublisher)
.contentLength(contentLength)
.chunkSize(CHUNK_SIZE);

Stream.of("1", "2", "3")
Expand All @@ -207,10 +224,12 @@ void subscribe_multipleExtensions_formattedCorrectly() {
void subscribe_randomElementSizes_dataChunkedCorrectly() {
for (int i = 0; i < 512; ++i) {
int nChunks = 24;
TestPublisher byteBufferPublisher = randomPublisherOfLength(CHUNK_SIZE * 24);
int contentLength = nChunks * CHUNK_SIZE;
TestPublisher byteBufferPublisher = randomPublisherOfLength(contentLength);

ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
.publisher(byteBufferPublisher)
.contentLength(contentLength)
.chunkSize(CHUNK_SIZE)
.build();

Expand All @@ -232,14 +251,16 @@ void subscribe_randomElementSizes_dataChunkedCorrectly() {
void subscribe_randomElementSizes_chunksHaveExtensions_dataChunkedCorrectly() {
for (int i = 0; i < 512; ++i) {
int nChunks = 24;
TestPublisher byteBufferPublisher = randomPublisherOfLength(CHUNK_SIZE * 24);
int contentLength = CHUNK_SIZE * nChunks;
TestPublisher byteBufferPublisher = randomPublisherOfLength(contentLength);

StaticExtensionProvider extensionProvider = Mockito.spy(new StaticExtensionProvider("foo", "bar"));

ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
.publisher(byteBufferPublisher)
.addExtension(extensionProvider)
.chunkSize(CHUNK_SIZE)
.contentLength(contentLength)
.build();

List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
Expand All @@ -264,12 +285,14 @@ void subscribe_randomElementSizes_chunksHaveExtensions_dataChunkedCorrectly() {

@Test
void subscribe_addTrailingChunkTrue_trailingChunkAdded() {
TestPublisher testPublisher = randomPublisherOfLength(CHUNK_SIZE * 2);
int contentLength = CHUNK_SIZE * 2;
TestPublisher testPublisher = randomPublisherOfLength(contentLength);

ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
.publisher(testPublisher)
.chunkSize(CHUNK_SIZE)
.addEmptyTrailingChunk(true)
.contentLength(contentLength)
.build();

List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
Expand All @@ -285,7 +308,12 @@ void subscribe_addTrailingChunkTrue_upstreamEmpty_trailingChunkAdded() {
Publisher<ByteBuffer> empty = Flowable.empty();

ChunkedEncodedPublisher chunkedPublisher =
ChunkedEncodedPublisher.builder().publisher(empty).chunkSize(CHUNK_SIZE).addEmptyTrailingChunk(true).build();
ChunkedEncodedPublisher.builder()
.publisher(empty)
.chunkSize(CHUNK_SIZE)
.addEmptyTrailingChunk(true)
.contentLength(0)
.build();

List<ByteBuffer> chunks = getAllElements(chunkedPublisher);

Expand All @@ -297,10 +325,15 @@ void subscribe_extensionsPresent_extensionsInvokedForEachChunk() {
ChunkExtensionProvider mockProvider = Mockito.spy(new StaticExtensionProvider("foo", "bar"));

int nChunks = 16;
TestPublisher elements = randomPublisherOfLength(nChunks * CHUNK_SIZE);
int contentLength = CHUNK_SIZE * nChunks;
TestPublisher elements = randomPublisherOfLength(contentLength);

ChunkedEncodedPublisher chunkPublisher =
ChunkedEncodedPublisher.builder().publisher(elements).chunkSize(CHUNK_SIZE).addExtension(mockProvider).build();
ChunkedEncodedPublisher chunkPublisher = ChunkedEncodedPublisher.builder()
.publisher(elements)
.contentLength(contentLength)
.chunkSize(CHUNK_SIZE)
.addExtension(mockProvider)
.build();

List<ByteBuffer> chunks = getAllElements(chunkPublisher);

Expand All @@ -316,6 +349,28 @@ void subscribe_extensionsPresent_extensionsInvokedForEachChunk() {
}
}

@Test
void subscribe_wrappedExceedsContentLength_dataTruncatedToLength() {
int contentLength = CHUNK_SIZE * 4 - 1;
TestPublisher elements = randomPublisherOfLength(contentLength * 2);

TestSubscriber<ByteBuffer> subscriber = new TestSubscriber<>();
ChunkedEncodedPublisher chunkPublisher = newChunkedBuilder(elements).contentLength(contentLength)
.build();

chunkPublisher.subscribe(subscriber);

subscriber.awaitTerminalEvent(30, TimeUnit.SECONDS);

int totalRemaining = subscriber.values()
.stream()
.map(this::stripEncoding)
.mapToInt(ByteBuffer::remaining)
.sum();

assertThat(totalRemaining).isEqualTo(contentLength);
}

private static ChunkedEncodedPublisher.Builder newChunkedBuilder(Publisher<ByteBuffer> publisher) {
return ChunkedEncodedPublisher.builder().publisher(publisher).chunkSize(CHUNK_SIZE);
}
Expand Down Expand Up @@ -401,6 +456,10 @@ private ByteBuffer stripEncoding(ByteBuffer chunk) {
return stripped;
}

private long totalRemaining(List<ByteBuffer> buffers) {
return buffers.stream().mapToLong(ByteBuffer::remaining).sum();
}

private static class TestPublisher implements Publisher<ByteBuffer> {
private final Publisher<ByteBuffer> wrapped;
private final byte[] wrappedChecksum;
Expand Down
Loading
Loading