Skip to content

Commit 24a1b68

Browse files
committed
Truncate to content in chunked publisher
Add support for ensuring that the `ChunkedEncodedPublisher` on encodes a set number of bytes.
1 parent 2120ac6 commit 24a1b68

File tree

6 files changed

+420
-14
lines changed

6 files changed

+420
-14
lines changed

core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.reactivestreams.Publisher;
2626
import org.reactivestreams.Subscriber;
2727
import software.amazon.awssdk.annotations.SdkInternalApi;
28+
import software.amazon.awssdk.http.auth.aws.internal.signer.io.ContentLengthAwareSubscriber;
2829
import software.amazon.awssdk.utils.Pair;
2930
import software.amazon.awssdk.utils.async.AddingTrailingDataSubscriber;
3031
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
@@ -33,7 +34,7 @@
3334

3435
/**
3536
* An implementation of chunk-transfer encoding, but by wrapping a {@link Publisher} of {@link ByteBuffer}. This implementation
36-
* supports chunk-headers, chunk-extensions.
37+
* supports chunk-headers, chunk-extensions, and trailer-part.
3738
* <p>
3839
* Per <a href="https://datatracker.ietf.org/doc/html/rfc7230#section-4.1">RFC-7230</a>, a chunk-transfer encoded message is
3940
* defined as:
@@ -66,6 +67,7 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
6667
private static final byte COMMA = ',';
6768

6869
private final Publisher<ByteBuffer> wrapped;
70+
private final long contentLength;
6971
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
7072
private final List<TrailerProvider> trailers = new ArrayList<>();
7173
private final int chunkSize;
@@ -74,6 +76,7 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
7476

7577
public ChunkedEncodedPublisher(Builder b) {
7678
this.wrapped = b.publisher;
79+
this.contentLength = b.contentLength;
7780
this.chunkSize = b.chunkSize;
7881
this.extensions.addAll(b.extensions);
7982
this.trailers.addAll(b.trailers);
@@ -82,7 +85,8 @@ public ChunkedEncodedPublisher(Builder b) {
8285

8386
@Override
8487
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
85-
Publisher<Iterable<ByteBuffer>> chunked = chunk(wrapped);
88+
Publisher<ByteBuffer> lengthEnforced = limitLength(wrapped, contentLength);
89+
Publisher<Iterable<ByteBuffer>> chunked = chunk(lengthEnforced);
8690
Publisher<Iterable<ByteBuffer>> trailingAdded = addTrailingChunks(chunked);
8791
Publisher<ByteBuffer> flattened = flatten(trailingAdded);
8892
Publisher<ByteBuffer> encoded = map(flattened, this::encodeChunk);
@@ -111,6 +115,10 @@ private Iterable<Iterable<ByteBuffer>> getTrailingChunks() {
111115
return Collections.singletonList(trailing);
112116
}
113117

118+
private Publisher<ByteBuffer> limitLength(Publisher<ByteBuffer> publisher, long length) {
119+
return subscriber -> publisher.subscribe(new ContentLengthAwareSubscriber(subscriber, length));
120+
}
121+
114122
private Publisher<Iterable<ByteBuffer>> chunk(Publisher<ByteBuffer> upstream) {
115123
return subscriber -> {
116124
upstream.subscribe(new ChunkingSubscriber(subscriber));
@@ -153,8 +161,7 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
153161
}
154162

155163
int trailerLen = trailerData.stream()
156-
// + 2 for each CRLF that ends the header-field
157-
.mapToInt(t -> t.remaining() + 2)
164+
.mapToInt(t -> t.remaining() + CRLF.length)
158165
.sum();
159166

160167
int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + trailerLen + CRLF.length;
@@ -188,11 +195,11 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
188195
encoded.put(t);
189196
encoded.put(CRLF);
190197
});
198+
// empty line ends the request body
191199
encoded.put(CRLF);
192200
}
193201

194202
encoded.flip();
195-
196203
return encoded;
197204
}
198205

@@ -294,6 +301,7 @@ public void onNext(ByteBuffer byteBuffer) {
294301

295302
public static class Builder {
296303
private Publisher<ByteBuffer> publisher;
304+
private long contentLength;
297305
private int chunkSize;
298306
private boolean addEmptyTrailingChunk;
299307
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
@@ -304,6 +312,15 @@ public Builder publisher(Publisher<ByteBuffer> publisher) {
304312
return this;
305313
}
306314

315+
public Publisher<ByteBuffer> publisher() {
316+
return publisher;
317+
}
318+
319+
public Builder contentLength(long contentLength) {
320+
this.contentLength = contentLength;
321+
return this;
322+
}
323+
307324
public Builder chunkSize(int chunkSize) {
308325
this.chunkSize = chunkSize;
309326
return this;
@@ -324,6 +341,10 @@ public Builder addTrailer(TrailerProvider trailerProvider) {
324341
return this;
325342
}
326343

344+
public List<TrailerProvider> trailers() {
345+
return trailers;
346+
}
347+
327348
public ChunkedEncodedPublisher build() {
328349
return new ChunkedEncodedPublisher(this);
329350
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.auth.aws.internal.signer.io;
17+
18+
import java.nio.ByteBuffer;
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
23+
/**
24+
* Decorator subscriber that limits the number of bytes sent to the wrapped subscriber to at most {@code contentLength}. Once
25+
* the given content length is reached, the upstream subscription is cancelled, and the wrapped subscriber is completed.
26+
*/
27+
@SdkInternalApi
28+
public class ContentLengthAwareSubscriber implements Subscriber<ByteBuffer> {
29+
private final Subscriber<? super ByteBuffer> subscriber;
30+
private Subscription subscription;
31+
private boolean subscriptionCancelled;
32+
private long contentLength;
33+
34+
public ContentLengthAwareSubscriber(Subscriber<? super ByteBuffer> subscriber, long contentLength) {
35+
this.subscriber = subscriber;
36+
this.contentLength = contentLength;
37+
}
38+
39+
@Override
40+
public void onSubscribe(Subscription subscription) {
41+
if (subscription == null) {
42+
throw new NullPointerException("subscription must not be null");
43+
}
44+
this.subscription = subscription;
45+
subscriber.onSubscribe(subscription);
46+
}
47+
48+
@Override
49+
public void onNext(ByteBuffer byteBuffer) {
50+
if (contentLength > 0) {
51+
long bytesToRead = Math.min(contentLength, byteBuffer.remaining());
52+
// cast is safe, min of long and int is <= max_int
53+
byteBuffer.limit(byteBuffer.position() + (int) bytesToRead);
54+
contentLength -= bytesToRead;
55+
subscriber.onNext(byteBuffer);
56+
} else if (contentLength == 0 && !subscriptionCancelled) {
57+
subscriptionCancelled = true;
58+
subscription.cancel();
59+
onComplete();
60+
}
61+
}
62+
63+
@Override
64+
public void onError(Throwable throwable) {
65+
if (throwable == null) {
66+
throw new NullPointerException("throwable cannot be null");
67+
}
68+
subscriber.onError(throwable);
69+
}
70+
71+
@Override
72+
public void onComplete() {
73+
subscriber.onComplete();
74+
}
75+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.auth.aws;
17+
18+
import io.reactivex.Flowable;
19+
import java.nio.ByteBuffer;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.PrimitiveIterator;
23+
import java.util.Random;
24+
import org.reactivestreams.Publisher;
25+
26+
public final class PublisherUtils {
27+
private static final Random RNG = new Random();
28+
29+
private PublisherUtils() {
30+
}
31+
32+
public static Publisher<ByteBuffer> randomPublisherOfLength(int bytes, int min, int max) {
33+
List<ByteBuffer> elements = new ArrayList<>();
34+
35+
PrimitiveIterator.OfInt sizeIter = RNG.ints(min, max).iterator();
36+
37+
while (bytes > 0) {
38+
int elementSize = sizeIter.next();
39+
elementSize = Math.min(elementSize, bytes);
40+
41+
bytes -= elementSize;
42+
43+
byte[] elementContent = new byte[elementSize];
44+
RNG.nextBytes(elementContent);
45+
elements.add(ByteBuffer.wrap(elementContent));
46+
}
47+
48+
return Flowable.fromIterable(elements);
49+
}
50+
}

0 commit comments

Comments
 (0)