Skip to content

Commit 1d44569

Browse files
committed
Add ChunkedEncodedPublisher
This publisher supports encoding binary streams using chunked encoding. This publisher fills the same purpose that ChunkedEncodedInputStream does, but for async bodies.
1 parent 9ec6d43 commit 1d44569

File tree

3 files changed

+646
-0
lines changed

3 files changed

+646
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding;
17+
18+
import java.nio.ByteBuffer;
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.function.Function;
24+
import java.util.stream.Collectors;
25+
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
27+
import software.amazon.awssdk.annotations.SdkInternalApi;
28+
import software.amazon.awssdk.utils.Pair;
29+
import software.amazon.awssdk.utils.async.AddingTrailingDataSubscriber;
30+
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
31+
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
32+
import software.amazon.awssdk.utils.internal.MappingSubscriber;
33+
34+
/**
35+
* An implementation of chunk-transfer encoding, but by wrapping a {@link Publisher} of {@link ByteBuffer}. This implementation
36+
* supports chunk-headers, chunk-extensions.
37+
* <p>
38+
* Per <a href="https://datatracker.ietf.org/doc/html/rfc7230#section-4.1">RFC-7230</a>, a chunk-transfer encoded message is
39+
* defined as:
40+
* <pre>
41+
* chunked-body = *chunk
42+
* last-chunk
43+
* trailer-part
44+
* CRLF
45+
* chunk = chunk-size [ chunk-ext ] CRLF
46+
* chunk-data CRLF
47+
* chunk-size = 1*HEXDIG
48+
* last-chunk = 1*("0") [ chunk-ext ] CRLF
49+
* chunk-data = 1*OCTET ; a sequence of chunk-size octets
50+
*
51+
* chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
52+
* chunk-ext-name = token
53+
* chunk-ext-val = token / quoted-string
54+
* </pre>
55+
*
56+
* @see ChunkedEncodedInputStream
57+
*/
58+
@SdkInternalApi
59+
public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
60+
private static final byte[] CRLF = {'\r', '\n'};
61+
private static final byte SEMICOLON = ';';
62+
private static final byte EQUALS = '=';
63+
64+
private final Publisher<ByteBuffer> wrapped;
65+
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
66+
private final int chunkSize;
67+
private ByteBuffer chunkBuffer;
68+
private final boolean addEmptyTrailingChunk;
69+
70+
public ChunkedEncodedPublisher(Builder b) {
71+
this.wrapped = b.publisher;
72+
this.chunkSize = b.chunkSize;
73+
this.extensions.addAll(b.extensions);
74+
this.addEmptyTrailingChunk = b.addEmptyTrailingChunk;
75+
}
76+
77+
@Override
78+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
79+
Publisher<Iterable<ByteBuffer>> chunked = chunk(wrapped);
80+
Publisher<Iterable<ByteBuffer>> trailingAdded = addTrailingChunks(chunked);
81+
Publisher<ByteBuffer> flattened = flatten(trailingAdded);
82+
Publisher<ByteBuffer> encoded = map(flattened, this::encodeChunk);
83+
84+
encoded.subscribe(subscriber);
85+
}
86+
87+
public static Builder builder() {
88+
return new Builder();
89+
}
90+
91+
private Iterable<Iterable<ByteBuffer>> getTrailingChunks() {
92+
List<ByteBuffer> trailing = new ArrayList<>();
93+
94+
if (chunkBuffer != null) {
95+
chunkBuffer.flip();
96+
if (chunkBuffer.hasRemaining()) {
97+
trailing.add(chunkBuffer);
98+
}
99+
}
100+
101+
if (addEmptyTrailingChunk) {
102+
trailing.add(ByteBuffer.allocate(0));
103+
}
104+
105+
return Collections.singletonList(trailing);
106+
}
107+
108+
private Publisher<Iterable<ByteBuffer>> chunk(Publisher<ByteBuffer> upstream) {
109+
return subscriber -> {
110+
upstream.subscribe(new ChunkingSubscriber(subscriber));
111+
};
112+
}
113+
114+
private Publisher<ByteBuffer> flatten(Publisher<Iterable<ByteBuffer>> upstream) {
115+
return subscriber -> upstream.subscribe(new FlatteningSubscriber<>(subscriber));
116+
}
117+
118+
public Publisher<Iterable<ByteBuffer>> addTrailingChunks(Publisher<Iterable<ByteBuffer>> upstream) {
119+
return subscriber -> {
120+
upstream.subscribe(new AddingTrailingDataSubscriber<>(subscriber, this::getTrailingChunks));
121+
};
122+
}
123+
124+
public Publisher<ByteBuffer> map(Publisher<ByteBuffer> upstream, Function<? super ByteBuffer, ? extends ByteBuffer> mapper) {
125+
return subscriber -> upstream.subscribe(MappingSubscriber.create(subscriber, mapper));
126+
}
127+
128+
// TODO: Trailing checksum
129+
private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
130+
int contentLen = byteBuffer.remaining();
131+
byte[] chunkSizeHex = Integer.toHexString(contentLen).getBytes(StandardCharsets.UTF_8);
132+
133+
List<Pair<byte[], byte[]>> chunkExtensions = this.extensions.stream()
134+
.map(e -> {
135+
ByteBuffer duplicate = byteBuffer.duplicate();
136+
return e.get(duplicate);
137+
}).collect(Collectors.toList());
138+
139+
int extensionsLength = calculateExtensionsLength(chunkExtensions);
140+
141+
int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + CRLF.length;
142+
143+
ByteBuffer encoded = ByteBuffer.allocate(encodedLen);
144+
encoded.put(chunkSizeHex);
145+
146+
chunkExtensions.forEach(p -> {
147+
encoded.put(SEMICOLON);
148+
encoded.put(p.left());
149+
if (p.right() != null && p.right().length > 0) {
150+
encoded.put(EQUALS);
151+
encoded.put(p.right());
152+
}
153+
});
154+
155+
encoded.put(CRLF);
156+
encoded.put(byteBuffer);
157+
encoded.put(CRLF);
158+
159+
encoded.flip();
160+
161+
return encoded;
162+
}
163+
164+
private int calculateExtensionsLength(List<Pair<byte[], byte[]>> chunkExtensions) {
165+
return chunkExtensions.stream()
166+
.mapToInt(p -> {
167+
int keyLen = p.left().length;
168+
byte[] value = p.right();
169+
if (value.length > 0) {
170+
return 1 + keyLen + 1 + value.length; // ';ext-key=ext-value'
171+
}
172+
// ';ext-key
173+
return 1 + keyLen;
174+
}).sum();
175+
}
176+
177+
private class ChunkingSubscriber extends DelegatingSubscriber<ByteBuffer, Iterable<ByteBuffer>> {
178+
protected ChunkingSubscriber(Subscriber<? super Iterable<ByteBuffer>> subscriber) {
179+
super(subscriber);
180+
}
181+
182+
@Override
183+
public void onNext(ByteBuffer byteBuffer) {
184+
if (chunkBuffer == null) {
185+
chunkBuffer = ByteBuffer.allocate(chunkSize);
186+
}
187+
188+
long totalBufferedBytes = (long) chunkBuffer.position() + byteBuffer.remaining();
189+
int nBufferedChunks = (int) (totalBufferedBytes / chunkSize);
190+
191+
List<ByteBuffer> chunks = new ArrayList<>(nBufferedChunks);
192+
193+
if (nBufferedChunks > 0) {
194+
for (int i = 0; i < nBufferedChunks; i++) {
195+
ByteBuffer slice = byteBuffer.slice();
196+
int maxBytesToCopy = Math.min(chunkBuffer.remaining(), slice.remaining());
197+
slice.limit(maxBytesToCopy);
198+
199+
chunkBuffer.put(slice);
200+
if (!chunkBuffer.hasRemaining()) {
201+
chunkBuffer.flip();
202+
chunks.add(chunkBuffer);
203+
chunkBuffer = ByteBuffer.allocate(chunkSize);
204+
}
205+
206+
byteBuffer.position(byteBuffer.position() + maxBytesToCopy);
207+
}
208+
209+
if (byteBuffer.hasRemaining()) {
210+
chunkBuffer.put(byteBuffer);
211+
}
212+
} else {
213+
chunkBuffer.put(byteBuffer);
214+
}
215+
216+
subscriber.onNext(chunks);
217+
}
218+
}
219+
220+
public static class Builder {
221+
private Publisher<ByteBuffer> publisher;
222+
private int chunkSize;
223+
private boolean addEmptyTrailingChunk;
224+
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
225+
226+
public Builder publisher(Publisher<ByteBuffer> publisher) {
227+
this.publisher = publisher;
228+
return this;
229+
}
230+
231+
public Builder chunkSize(int chunkSize) {
232+
this.chunkSize = chunkSize;
233+
return this;
234+
}
235+
236+
public Builder addEmptyTrailingChunk(boolean addEmptyTrailingChunk) {
237+
this.addEmptyTrailingChunk = addEmptyTrailingChunk;
238+
return this;
239+
}
240+
241+
public Builder addExtension(ChunkExtensionProvider extension) {
242+
this.extensions.add(extension);
243+
return this;
244+
}
245+
246+
public ChunkedEncodedPublisher build() {
247+
return new ChunkedEncodedPublisher(this);
248+
}
249+
}
250+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding;
17+
18+
import io.reactivex.Flowable;
19+
import java.nio.ByteBuffer;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.reactivestreams.Publisher;
23+
import org.reactivestreams.tck.PublisherVerification;
24+
import org.reactivestreams.tck.TestEnvironment;
25+
26+
public class ChunkedEncodedPublisherTckTest extends PublisherVerification<ByteBuffer> {
27+
private static final int INPUT_STREAM_ELEMENT_SIZE = 64;
28+
private static final int CHUNK_SIZE = 16 * 1024;
29+
30+
public ChunkedEncodedPublisherTckTest() {
31+
super(new TestEnvironment());
32+
}
33+
34+
@Override
35+
public Publisher<ByteBuffer> createPublisher(long l) {
36+
return createChunkedPublisher(l);
37+
}
38+
39+
@Override
40+
public Publisher<ByteBuffer> createFailedPublisher() {
41+
return null;
42+
}
43+
44+
@Override
45+
public long maxElementsFromPublisher() {
46+
return 512;
47+
}
48+
49+
private Publisher<ByteBuffer> createChunkedPublisher(long chunksToProduce) {
50+
// max of 8 MiB
51+
long totalSize = chunksToProduce * CHUNK_SIZE;
52+
53+
int totalElements = (int) (totalSize / INPUT_STREAM_ELEMENT_SIZE);
54+
55+
byte[] content = new byte[INPUT_STREAM_ELEMENT_SIZE];
56+
57+
List<ByteBuffer> elements = new ArrayList<>();
58+
for (int i = 0; i < totalElements; i++) {
59+
elements.add(ByteBuffer.wrap(content));
60+
}
61+
62+
Publisher<ByteBuffer> inputPublisher = Flowable.fromIterable(elements);
63+
64+
return ChunkedEncodedPublisher.builder()
65+
.chunkSize(CHUNK_SIZE)
66+
.publisher(inputPublisher)
67+
.addEmptyTrailingChunk(false)
68+
.build();
69+
}
70+
}

0 commit comments

Comments
 (0)