Skip to content

Commit d49f14c

Browse files
authored
Support trailers in chunked publisher (#6264)
* Support trailers in chunked publisher This commit adds support adding the `trailer-part` of the chunked encoded format. * Review comments * Review comments
1 parent dd3571f commit d49f14c

File tree

2 files changed

+221
-8
lines changed

2 files changed

+221
-8
lines changed

core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
* chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
5252
* chunk-ext-name = token
5353
* chunk-ext-val = token / quoted-string
54+
*
55+
* trailer-part = *( header-field CRLF )
5456
* </pre>
5557
*
5658
* @see ChunkedEncodedInputStream
@@ -60,9 +62,12 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
6062
private static final byte[] CRLF = {'\r', '\n'};
6163
private static final byte SEMICOLON = ';';
6264
private static final byte EQUALS = '=';
65+
private static final byte COLON = ':';
66+
private static final byte COMMA = ',';
6367

6468
private final Publisher<ByteBuffer> wrapped;
6569
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
70+
private final List<TrailerProvider> trailers = new ArrayList<>();
6671
private final int chunkSize;
6772
private ByteBuffer chunkBuffer;
6873
private final boolean addEmptyTrailingChunk;
@@ -71,6 +76,7 @@ public ChunkedEncodedPublisher(Builder b) {
7176
this.wrapped = b.publisher;
7277
this.chunkSize = b.chunkSize;
7378
this.extensions.addAll(b.extensions);
79+
this.trailers.addAll(b.trailers);
7480
this.addEmptyTrailingChunk = b.addEmptyTrailingChunk;
7581
}
7682

@@ -125,10 +131,9 @@ public Publisher<ByteBuffer> map(Publisher<ByteBuffer> upstream, Function<? supe
125131
return subscriber -> upstream.subscribe(MappingSubscriber.create(subscriber, mapper));
126132
}
127133

128-
// TODO: Trailing checksum
129134
private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
130135
int contentLen = byteBuffer.remaining();
131-
byte[] chunkSizeHex = Integer.toHexString(contentLen).getBytes(StandardCharsets.UTF_8);
136+
byte[] chunkSizeHex = Integer.toHexString(byteBuffer.remaining()).getBytes(StandardCharsets.UTF_8);
132137

133138
List<Pair<byte[], byte[]>> chunkExtensions = this.extensions.stream()
134139
.map(e -> {
@@ -138,24 +143,54 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
138143

139144
int extensionsLength = calculateExtensionsLength(chunkExtensions);
140145

141-
int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + CRLF.length;
146+
boolean isTrailerChunk = contentLen == 0;
147+
148+
List<ByteBuffer> trailerData;
149+
if (isTrailerChunk) {
150+
trailerData = getTrailerData();
151+
} else {
152+
trailerData = Collections.emptyList();
153+
}
154+
155+
int trailerLen = trailerData.stream()
156+
// + 2 for each CRLF that ends the header-field
157+
.mapToInt(t -> t.remaining() + 2)
158+
.sum();
159+
160+
int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + trailerLen + CRLF.length;
161+
162+
if (isTrailerChunk) {
163+
encodedLen += CRLF.length;
164+
}
142165

143166
ByteBuffer encoded = ByteBuffer.allocate(encodedLen);
144-
encoded.put(chunkSizeHex);
145167

146-
chunkExtensions.forEach(p -> {
168+
encoded.put(chunkSizeHex); // chunk-size
169+
chunkExtensions.forEach(p -> { // chunk-ext
147170
encoded.put(SEMICOLON);
148171
encoded.put(p.left());
149172
if (p.right() != null && p.right().length > 0) {
150173
encoded.put(EQUALS);
151174
encoded.put(p.right());
152175
}
153176
});
154-
155-
encoded.put(CRLF);
156-
encoded.put(byteBuffer);
157177
encoded.put(CRLF);
158178

179+
// chunk-data
180+
if (byteBuffer.hasRemaining()) {
181+
encoded.put(byteBuffer);
182+
encoded.put(CRLF);
183+
}
184+
185+
if (isTrailerChunk) {
186+
// trailer-part
187+
trailerData.forEach(t -> {
188+
encoded.put(t);
189+
encoded.put(CRLF);
190+
});
191+
encoded.put(CRLF);
192+
}
193+
159194
encoded.flip();
160195

161196
return encoded;
@@ -174,6 +209,46 @@ private int calculateExtensionsLength(List<Pair<byte[], byte[]>> chunkExtensions
174209
}).sum();
175210
}
176211

212+
private List<ByteBuffer> getTrailerData() {
213+
List<ByteBuffer> data = new ArrayList<>();
214+
215+
for (TrailerProvider provider : trailers) {
216+
Pair<String, List<String>> trailer = provider.get();
217+
218+
byte[] key = trailer.left().getBytes(StandardCharsets.UTF_8);
219+
List<byte[]> values = trailer.right()
220+
.stream().map(v -> v.getBytes(StandardCharsets.UTF_8))
221+
.collect(Collectors.toList());
222+
223+
if (values.isEmpty()) {
224+
throw new RuntimeException(String.format("Trailing header '%s' has no values", trailer.left()));
225+
}
226+
227+
int valuesLen = values.stream().mapToInt(v -> v.length).sum();
228+
// name:value1,value2,..
229+
int size = key.length
230+
+ 1 // colon
231+
+ valuesLen
232+
+ values.size() - 1; // commas
233+
234+
ByteBuffer trailerData = ByteBuffer.allocate(size);
235+
236+
trailerData.put(key);
237+
trailerData.put(COLON);
238+
239+
for (int i = 0; i < values.size(); ++i) {
240+
trailerData.put(values.get(i));
241+
if (i + 1 != values.size()) {
242+
trailerData.put(COMMA);
243+
}
244+
}
245+
246+
trailerData.flip();
247+
data.add(trailerData);
248+
}
249+
return data;
250+
}
251+
177252
private class ChunkingSubscriber extends DelegatingSubscriber<ByteBuffer, Iterable<ByteBuffer>> {
178253
protected ChunkingSubscriber(Subscriber<? super Iterable<ByteBuffer>> subscriber) {
179254
super(subscriber);
@@ -222,6 +297,7 @@ public static class Builder {
222297
private int chunkSize;
223298
private boolean addEmptyTrailingChunk;
224299
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
300+
private final List<TrailerProvider> trailers = new ArrayList<>();
225301

226302
public Builder publisher(Publisher<ByteBuffer> publisher) {
227303
this.publisher = publisher;
@@ -243,6 +319,11 @@ public Builder addExtension(ChunkExtensionProvider extension) {
243319
return this;
244320
}
245321

322+
public Builder addTrailer(TrailerProvider trailerProvider) {
323+
this.trailers.add(trailerProvider);
324+
return this;
325+
}
326+
246327
public ChunkedEncodedPublisher build() {
247328
return new ChunkedEncodedPublisher(this);
248329
}

core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisherTest.java

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.nio.ByteBuffer;
2222
import java.nio.charset.StandardCharsets;
2323
import java.util.ArrayList;
24+
import java.util.Arrays;
25+
import java.util.Collections;
2426
import java.util.List;
2527
import java.util.PrimitiveIterator;
2628
import java.util.Random;
@@ -46,6 +48,108 @@ public void setup() {
4648
CRC32.reset();
4749
}
4850

51+
@Test
52+
public void subscribe_publisherEmpty_onlyProducesTrailer() {
53+
Publisher<ByteBuffer> emptyPublisher = Flowable.empty();
54+
55+
ChunkedEncodedPublisher build = newChunkedBuilder(emptyPublisher)
56+
.addTrailer(() -> Pair.of("foo", Collections.singletonList("1")))
57+
.addTrailer(() -> Pair.of("bar", Collections.singletonList("2")))
58+
.addEmptyTrailingChunk(true)
59+
.build();
60+
61+
List<ByteBuffer> chunks = getAllElements(build);
62+
63+
assertThat(chunks.size()).isEqualTo(1);
64+
65+
String trailerAsString = StandardCharsets.UTF_8.decode(chunks.get(0)).toString();
66+
67+
assertThat(trailerAsString).isEqualTo(
68+
"0\r\n" +
69+
"foo:1\r\n" +
70+
"bar:2\r\n" +
71+
"\r\n");
72+
}
73+
74+
@Test
75+
void subscribe_trailerProviderPresent_trailerPartAdded() {
76+
TestPublisher upstream = randomPublisherOfLength(8);
77+
78+
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");
79+
80+
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
81+
.publisher(upstream)
82+
.chunkSize(CHUNK_SIZE)
83+
.addEmptyTrailingChunk(true)
84+
.addTrailer(trailerProvider)
85+
.build();
86+
87+
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
88+
89+
String expectedTrailer = "foo:bar";
90+
String trailerAsString = StandardCharsets.UTF_8.decode(chunks.get(1)).toString().trim();
91+
assertThat(trailerAsString).endsWith(expectedTrailer);
92+
}
93+
94+
@Test
95+
void subscribe_trailerProviderPresent_multipleValues_trailerPartAdded() {
96+
TestPublisher upstream = randomPublisherOfLength(8);
97+
98+
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", Arrays.asList("bar1", "bar2", "bar3"));
99+
100+
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
101+
.publisher(upstream)
102+
.chunkSize(CHUNK_SIZE)
103+
.addEmptyTrailingChunk(true)
104+
.addTrailer(trailerProvider)
105+
.build();
106+
107+
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
108+
109+
String expectedTrailer = "foo:bar1,bar2,bar3";
110+
String trailerAsString = StandardCharsets.UTF_8.decode(chunks.get(1)).toString().trim();
111+
assertThat(trailerAsString).endsWith(expectedTrailer);
112+
}
113+
114+
@Test
115+
void subscribe_trailerProviderPresent_onlyInvokedOnce() {
116+
TestPublisher upstream = randomPublisherOfLength(8);
117+
118+
TrailerProvider trailerProvider = Mockito.spy(new StaticTrailerProvider("foo", "bar"));
119+
120+
ChunkedEncodedPublisher chunkedPublisher = ChunkedEncodedPublisher.builder()
121+
.publisher(upstream)
122+
.addEmptyTrailingChunk(true)
123+
.chunkSize(CHUNK_SIZE)
124+
.addTrailer(trailerProvider).build();
125+
126+
getAllElements(chunkedPublisher);
127+
128+
Mockito.verify(trailerProvider, Mockito.times(1)).get();
129+
}
130+
131+
@Test
132+
void subscribe_trailerPresent_trailerFormattedCorrectly() {
133+
TestPublisher testPublisher = randomPublisherOfLength(32);
134+
135+
TrailerProvider trailerProvider = new StaticTrailerProvider("foo", "bar");
136+
137+
ChunkedEncodedPublisher chunkedPublisher = newChunkedBuilder(testPublisher)
138+
.addTrailer(trailerProvider)
139+
.addEmptyTrailingChunk(true)
140+
.build();
141+
142+
List<ByteBuffer> chunks = getAllElements(chunkedPublisher);
143+
144+
ByteBuffer last = chunks.get(chunks.size() - 1);
145+
146+
String expected = "0\r\n" +
147+
"foo:bar\r\n" +
148+
"\r\n";
149+
150+
assertThat(chunkAsString(last)).isEqualTo(expected);
151+
}
152+
49153
@Test
50154
void subscribe_wrappedDoesNotFillBuffer_allDataInSingleChunk() {
51155
ByteBuffer element = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
@@ -212,6 +316,10 @@ void subscribe_extensionsPresent_extensionsInvokedForEachChunk() {
212316
}
213317
}
214318

319+
private static ChunkedEncodedPublisher.Builder newChunkedBuilder(Publisher<ByteBuffer> publisher) {
320+
return ChunkedEncodedPublisher.builder().publisher(publisher).chunkSize(CHUNK_SIZE);
321+
}
322+
215323
private TestPublisher randomPublisherOfLength(int bytes) {
216324
List<ByteBuffer> elements = new ArrayList<>();
217325

@@ -239,6 +347,10 @@ private List<ByteBuffer> getAllElements(Publisher<ByteBuffer> publisher) {
239347
return Flowable.fromPublisher(publisher).toList().blockingGet();
240348
}
241349

350+
private String chunkAsString(ByteBuffer chunk) {
351+
return StandardCharsets.UTF_8.decode(chunk).toString();
352+
}
353+
242354
private String getHeaderAsString(ByteBuffer chunk) {
243355
return StandardCharsets.UTF_8.decode(getHeader(chunk)).toString();
244356
}
@@ -323,4 +435,24 @@ public Pair<byte[], byte[]> get(ByteBuffer chunk) {
323435
return Pair.of(key, value);
324436
}
325437
}
438+
439+
private static class StaticTrailerProvider implements TrailerProvider {
440+
private final String key;
441+
private final List<String> values;
442+
443+
public StaticTrailerProvider(String key, String value) {
444+
this.key = key;
445+
this.values = Collections.singletonList(value);
446+
}
447+
448+
public StaticTrailerProvider(String key, List<String> values) {
449+
this.key = key;
450+
this.values = values;
451+
}
452+
453+
@Override
454+
public Pair<String, List<String>> get() {
455+
return Pair.of(key, values);
456+
}
457+
}
326458
}

0 commit comments

Comments
 (0)