Skip to content

Commit 03e5a62

Browse files
committed
Support trailers in chunked publisher
This commit adds support adding the `trailer-part` of the chunked encoded format.
1 parent 8724d3a commit 03e5a62

File tree

2 files changed

+183
-7
lines changed

2 files changed

+183
-7
lines changed

core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
* chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
5252
* chunk-ext-name = token
5353
* chunk-ext-val = token / quoted-string
54+
*
55+
* trailer-part = *( header-field CRLF )
5456
* </pre>
5557
*
5658
* @see ChunkedEncodedInputStream
@@ -60,9 +62,12 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
6062
private static final byte[] CRLF = {'\r', '\n'};
6163
private static final byte SEMICOLON = ';';
6264
private static final byte EQUALS = '=';
65+
private static final byte COLON = ':';
66+
private static final byte COMMA = ',';
6367

6468
private final Publisher<ByteBuffer> wrapped;
6569
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
70+
private final List<TrailerProvider> trailers = new ArrayList<>();
6671
private final int chunkSize;
6772
private ByteBuffer chunkBuffer;
6873
private final boolean addEmptyTrailingChunk;
@@ -71,6 +76,7 @@ public ChunkedEncodedPublisher(Builder b) {
7176
this.wrapped = b.publisher;
7277
this.chunkSize = b.chunkSize;
7378
this.extensions.addAll(b.extensions);
79+
this.trailers.addAll(b.trailers);
7480
this.addEmptyTrailingChunk = b.addEmptyTrailingChunk;
7581
}
7682

@@ -125,7 +131,6 @@ public Publisher<ByteBuffer> map(Publisher<ByteBuffer> upstream, Function<? supe
125131
return subscriber -> upstream.subscribe(MappingSubscriber.create(subscriber, mapper));
126132
}
127133

128-
// TODO: Trailing checksum
129134
private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
130135
int contentLen = byteBuffer.remaining();
131136
byte[] chunkSizeHex = Integer.toHexString(contentLen).getBytes(StandardCharsets.UTF_8);
@@ -138,24 +143,46 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
138143

139144
int extensionsLength = calculateExtensionsLength(chunkExtensions);
140145

141-
int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + CRLF.length;
146+
boolean isTrailerChunk = contentLen == 0;
147+
148+
List<ByteBuffer> trailerData;
149+
if (isTrailerChunk) {
150+
trailerData = getTrailerData();
151+
} else {
152+
trailerData = Collections.emptyList();
153+
}
154+
155+
int trailerLen = trailerData.stream().mapToInt(t -> t.remaining() + 2).sum();
156+
157+
int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + trailerLen + CRLF.length;
142158

143159
ByteBuffer encoded = ByteBuffer.allocate(encodedLen);
144-
encoded.put(chunkSizeHex);
145160

146-
chunkExtensions.forEach(p -> {
161+
encoded.put(chunkSizeHex); // chunk-size
162+
chunkExtensions.forEach(p -> { // chunk-ext
147163
encoded.put(SEMICOLON);
148164
encoded.put(p.left());
149165
if (p.right() != null && p.right().length > 0) {
150166
encoded.put(EQUALS);
151167
encoded.put(p.right());
152168
}
153169
});
154-
155-
encoded.put(CRLF);
156-
encoded.put(byteBuffer);
157170
encoded.put(CRLF);
158171

172+
// chunk-data
173+
if (byteBuffer.hasRemaining()) {
174+
encoded.put(byteBuffer);
175+
encoded.put(CRLF);
176+
}
177+
178+
if (isTrailerChunk) {
179+
// trailer-part
180+
trailerData.forEach(t -> {
181+
encoded.put(t);
182+
encoded.put(CRLF);
183+
});
184+
}
185+
159186
encoded.flip();
160187

161188
return encoded;
@@ -174,6 +201,41 @@ private int calculateExtensionsLength(List<Pair<byte[], byte[]>> chunkExtensions
174201
}).sum();
175202
}
176203

204+
private List<ByteBuffer> getTrailerData() {
205+
List<ByteBuffer> data = new ArrayList<>();
206+
207+
for (TrailerProvider provider : trailers) {
208+
Pair<String, List<String>> trailer = provider.get();
209+
210+
byte[] key = trailer.left().getBytes(StandardCharsets.UTF_8);
211+
List<byte[]> values = trailer.right()
212+
.stream().map(v -> v.getBytes(StandardCharsets.UTF_8))
213+
.collect(Collectors.toList());
214+
int valuesLen = values.stream().mapToInt(v -> v.length).sum();
215+
// name:value1,value2,..
216+
int size = key.length
217+
+ 1 // colon
218+
+ valuesLen
219+
+ values.size() - 1; // commas
220+
221+
ByteBuffer trailerData = ByteBuffer.allocate(size);
222+
223+
trailerData.put(key);
224+
trailerData.put(COLON);
225+
226+
for (int i = 0; i < values.size(); ++i) {
227+
trailerData.put(values.get(i));
228+
if (i + 1 != values.size()) {
229+
trailerData.put(COMMA);
230+
}
231+
}
232+
233+
trailerData.flip();
234+
data.add(trailerData);
235+
}
236+
return data;
237+
}
238+
177239
private class ChunkingSubscriber extends DelegatingSubscriber<ByteBuffer, Iterable<ByteBuffer>> {
178240
protected ChunkingSubscriber(Subscriber<? super Iterable<ByteBuffer>> subscriber) {
179241
super(subscriber);
@@ -222,6 +284,7 @@ public static class Builder {
222284
private int chunkSize;
223285
private boolean addEmptyTrailingChunk;
224286
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
287+
private final List<TrailerProvider> trailers = new ArrayList<>();
225288

226289
public Builder publisher(Publisher<ByteBuffer> publisher) {
227290
this.publisher = publisher;
@@ -243,6 +306,11 @@ public Builder addExtension(ChunkExtensionProvider extension) {
243306
return this;
244307
}
245308

309+
public Builder addTrailer(TrailerProvider trailerProvider) {
310+
this.trailers.add(trailerProvider);
311+
return this;
312+
}
313+
246314
public ChunkedEncodedPublisher build() {
247315
return new ChunkedEncodedPublisher(this);
248316
}

core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisherTest.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.nio.ByteBuffer;
2222
import java.nio.charset.StandardCharsets;
2323
import java.util.ArrayList;
24+
import java.util.Arrays;
25+
import java.util.Collections;
2426
import java.util.List;
2527
import java.util.PrimitiveIterator;
2628
import java.util.Random;
@@ -46,6 +48,84 @@ public void setup() {
4648
CRC32.reset();
4749
}
4850

51+
@Test
52+
void subscribe_trailerProviderPresent_trailerPartAdded() {
53+
TestPublisher upstream = randomPublisherOfLength(8);
54+
55+
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");
56+
57+
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
58+
.publisher(upstream)
59+
.chunkSize(CHUNK_SIZE)
60+
.addEmptyTrailingChunk(true)
61+
.addTrailer(trailerProvider)
62+
.build();
63+
64+
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
65+
66+
String expectedTrailer = "foo:bar";
67+
String trailerAsString = StandardCharsets.UTF_8.decode(chunks.get(1)).toString().trim();
68+
assertThat(trailerAsString).endsWith(expectedTrailer);
69+
}
70+
71+
@Test
72+
void subscribe_trailerProviderPresent_multipleValues_trailerPartAdded() {
73+
TestPublisher upstream = randomPublisherOfLength(8);
74+
75+
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", Arrays.asList("bar1", "bar2", "bar3"));
76+
77+
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
78+
.publisher(upstream)
79+
.chunkSize(CHUNK_SIZE)
80+
.addEmptyTrailingChunk(true)
81+
.addTrailer(trailerProvider)
82+
.build();
83+
84+
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
85+
86+
String expectedTrailer = "foo:bar1,bar2,bar3";
87+
String trailerAsString = StandardCharsets.UTF_8.decode(chunks.get(1)).toString().trim();
88+
assertThat(trailerAsString).endsWith(expectedTrailer);
89+
}
90+
91+
@Test
92+
void subscribe_trailerProviderPresent_onlyInvokedOnce() {
93+
TestPublisher upstream = randomPublisherOfLength(8);
94+
95+
TrailerProvider trailerProvider = Mockito.spy(new StaticTrailerProvider("foo", "bar"));
96+
97+
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
98+
.publisher(upstream)
99+
.addEmptyTrailingChunk(true)
100+
.chunkSize(CHUNK_SIZE)
101+
.addTrailer(trailerProvider).build();
102+
103+
getAllElements(chunkedPublisher);
104+
105+
Mockito.verify(trailerProvider, Mockito.times(1)).get();
106+
}
107+
108+
@Test
109+
void subscribe_trailerPresent_trailerFormattedCorrectly() {
110+
TestPublisher testPublisher = randomPublisherOfLength(32);
111+
112+
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");
113+
114+
ChunkedEncodedPublisher chunkedPublisher = newChunkedBuilder(testPublisher)
115+
.addTrailer(trailerProvider)
116+
.addEmptyTrailingChunk(true)
117+
.build();
118+
119+
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
120+
121+
ByteBuffer last = chunks.get(chunks.size() - 1);
122+
123+
String expected = "0\r\n" +
124+
"foo:bar\r\n";
125+
126+
assertThat(chunkAsString(last)).isEqualTo(expected);
127+
}
128+
49129
@Test
50130
void subscribe_wrappedDoesNotFillBuffer_allDataInSingleChunk() {
51131
ByteBuffer element = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
@@ -212,6 +292,10 @@ void subscribe_extensionsPresent_extensionsInvokedForEachChunk() {
212292
}
213293
}
214294

295+
private static ChunkedEncodedPublisher.Builder newChunkedBuilder(Publisher<ByteBuffer> publisher) {
296+
return ChunkedEncodedPublisher.builder().publisher(publisher).chunkSize(CHUNK_SIZE);
297+
}
298+
215299
private TestPublisher randomPublisherOfLength(int bytes) {
216300
List<ByteBuffer> elements = new ArrayList<>();
217301

@@ -239,6 +323,10 @@ private List<ByteBuffer> getAllElements(Publisher<ByteBuffer> publisher) {
239323
return Flowable.fromPublisher(publisher).toList().blockingGet();
240324
}
241325

326+
private String chunkAsString(ByteBuffer chunk) {
327+
return StandardCharsets.UTF_8.decode(chunk).toString();
328+
}
329+
242330
private String getHeaderAsString(ByteBuffer chunk) {
243331
return StandardCharsets.UTF_8.decode(getHeader(chunk)).toString();
244332
}
@@ -323,4 +411,24 @@ public Pair<byte[], byte[]> get(ByteBuffer chunk) {
323411
return Pair.of(key, value);
324412
}
325413
}
414+
415+
private static class StaticTrailerProvider implements TrailerProvider {
416+
private final String key;
417+
private final List<String> values;
418+
419+
public StaticTrailerProvider(String key, String value) {
420+
this.key = key;
421+
this.values = Collections.singletonList(value);
422+
}
423+
424+
public StaticTrailerProvider(String key, List<String> values) {
425+
this.key = key;
426+
this.values = values;
427+
}
428+
429+
@Override
430+
public Pair<String, List<String>> get() {
431+
return Pair.of(key, values);
432+
}
433+
}
326434
}

0 commit comments

Comments
 (0)