Skip to content

Commit 5b2f021

Browse files
committed
Fix empty checksum body not published
This commit fixes an issue with ChecksumCalculatingAsyncRequestBody where an explicit `contentLengthHeader` configuration of 0L leads to no checksum chunk being published at all if the input (pre checksum) body is *NOT* empty. The issue is that internally, the input body is split into chunks by `SynchronousChunkBuffer`, which is aware of the how many bytes total will be sent to the service. If this value is 0, it nevers calls onNext on the downstream publisher. The fix is to move the usage of `alwaysInvokeOnNext` to wrap the publisher after we have mapped chunked up the input data.
1 parent c71226b commit 5b2f021

File tree

3 files changed

+91
-38
lines changed

3 files changed

+91
-38
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fix an issue where the trailing checksum of a request body is not sent when the `Content-Length` header is explicitly set to `0`."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,10 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
179179
if (sdkChecksum != null) {
180180
sdkChecksum.reset();
181181
}
182+
182183
SynchronousChunkBuffer synchronousChunkBuffer = new SynchronousChunkBuffer(totalBytes);
183-
alwaysInvokeOnNext(wrapped).flatMapIterable(synchronousChunkBuffer::buffer)
184-
.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes));
184+
alwaysInvokeOnNext(wrapped.flatMapIterable(synchronousChunkBuffer::buffer))
185+
.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes));
185186
}
186187

187188
private SdkPublisher<ByteBuffer> alwaysInvokeOnNext(SdkPublisher<ByteBuffer> source) {

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBodyTest.java

Lines changed: 82 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,12 @@
3434
import org.assertj.core.util.Lists;
3535
import org.junit.jupiter.api.Test;
3636
import org.junit.jupiter.params.ParameterizedTest;
37-
import org.junit.jupiter.params.provider.Arguments;
3837
import org.junit.jupiter.params.provider.MethodSource;
3938
import org.reactivestreams.Publisher;
4039
import org.reactivestreams.Subscriber;
4140
import org.reactivestreams.Subscription;
4241
import software.amazon.awssdk.checksums.DefaultChecksumAlgorithm;
4342
import software.amazon.awssdk.core.async.AsyncRequestBody;
44-
import software.amazon.awssdk.core.checksums.Algorithm;
4543
import software.amazon.awssdk.core.internal.util.Mimetype;
4644
import software.amazon.awssdk.http.async.SimpleSubscriber;
4745
import software.amazon.awssdk.utils.BinaryUtils;
@@ -74,36 +72,36 @@ public class ChecksumCalculatingAsyncRequestBodyTest {
7472
}
7573
}
7674

77-
private static Stream<Arguments> publishers() {
75+
private static Stream<TestCase> contentPublishers() {
7876
return Stream.of(
79-
Arguments.of("RequestBody from string, test string",
80-
checksumPublisher(AsyncRequestBody.fromString(testString)),
81-
expectedTestString),
82-
Arguments.of("RequestBody from file, test string",
83-
checksumPublisher(AsyncRequestBody.fromFile(path)),
84-
expectedTestString),
85-
Arguments.of("RequestBody from buffer, 0 pos, test string",
86-
checksumPublisher(AsyncRequestBody.fromRemainingByteBuffer(posZeroByteBuffer(testString))),
87-
expectedTestString),
88-
Arguments.of("RequestBody from buffer, random pos, test string",
89-
checksumPublisher(AsyncRequestBody.fromRemainingByteBufferUnsafe(nonPosZeroByteBuffer(testString))),
90-
expectedTestString),
91-
Arguments.of("RequestBody from string, empty string",
92-
checksumPublisher(AsyncRequestBody.fromString(emptyString)),
93-
expectedEmptyString),
77+
new TestCase().description("RequestBody from string, test string")
78+
.requestBody(AsyncRequestBody.fromString(testString))
79+
.expectedBody(expectedTestString),
80+
new TestCase().description("RequestBody from file, test string")
81+
.requestBody(AsyncRequestBody.fromFile(path))
82+
.expectedBody(expectedTestString),
83+
new TestCase().description("RequestBody from buffer, 0 pos, test string")
84+
.requestBody(AsyncRequestBody.fromRemainingByteBuffer(posZeroByteBuffer(testString)))
85+
.expectedBody(expectedTestString),
86+
new TestCase().description("RequestBody from buffer, random pos, test string")
87+
.requestBody(AsyncRequestBody.fromRemainingByteBufferUnsafe(nonPosZeroByteBuffer(testString)))
88+
.expectedBody(expectedTestString),
89+
new TestCase().description("RequestBody from string, empty string")
90+
.requestBody(AsyncRequestBody.fromString(emptyString))
91+
.expectedBody(expectedEmptyString),
9492
//Note: FileAsyncRequestBody with empty file does not call onNext, only onComplete()
95-
Arguments.of("RequestBody from file, empty string",
96-
checksumPublisher(AsyncRequestBody.fromFile(pathToEmpty)),
97-
expectedEmptyString),
98-
Arguments.of("RequestBody from buffer, 0 pos, empty string",
99-
checksumPublisher(AsyncRequestBody.fromRemainingByteBuffer(posZeroByteBuffer(emptyString))),
100-
expectedEmptyString),
101-
Arguments.of("RequestBody from string, random pos, empty string",
102-
checksumPublisher(AsyncRequestBody.fromRemainingByteBufferUnsafe(nonPosZeroByteBuffer(emptyString))),
103-
expectedEmptyString),
104-
Arguments.of("EmptyBufferPublisher, test string",
105-
checksumPublisher(new EmptyBufferPublisher(testString)),
106-
expectedTestString));
93+
new TestCase().description("RequestBody from file, empty string")
94+
.requestBody(AsyncRequestBody.fromFile(pathToEmpty))
95+
.expectedBody(expectedEmptyString),
96+
new TestCase().description("RequestBody from buffer, 0 pos, empty string")
97+
.requestBody(AsyncRequestBody.fromRemainingByteBuffer(posZeroByteBuffer(emptyString)))
98+
.expectedBody(expectedEmptyString),
99+
new TestCase().description("RequestBody from string, random pos, empty string")
100+
.requestBody(AsyncRequestBody.fromRemainingByteBufferUnsafe(nonPosZeroByteBuffer(emptyString)))
101+
.expectedBody(expectedEmptyString),
102+
new TestCase().description("EmptyBufferPublisher, test string")
103+
.requestBody(new EmptyBufferPublisher(testString))
104+
.expectedBody(expectedTestString));
107105
}
108106

109107
private static ChecksumCalculatingAsyncRequestBody checksumPublisher(AsyncRequestBody sourcePublisher) {
@@ -133,10 +131,8 @@ private static ByteBuffer nonPosZeroByteBuffer(String content) {
133131
}
134132

135133
@ParameterizedTest(name = "{index} {0}")
136-
@MethodSource("publishers")
137-
public void publish_differentAsyncRequestBodiesAndSources_produceCorrectData(String description,
138-
AsyncRequestBody provider,
139-
String expectedContent) throws InterruptedException {
134+
@MethodSource("contentPublishers")
135+
public void publish_differentAsyncRequestBodiesAndSources_produceCorrectData(TestCase tc) throws InterruptedException {
140136
StringBuilder sb = new StringBuilder();
141137
CountDownLatch done = new CountDownLatch(1);
142138

@@ -157,11 +153,14 @@ public void onComplete() {
157153
done.countDown();
158154
}
159155
};
156+
157+
AsyncRequestBody provider = checksumPublisher(tc.requestBody);
158+
160159
provider.subscribe(subscriber);
161160
done.await(10, TimeUnit.SECONDS);
162161

163-
assertThat(provider.contentLength()).hasValue((long) expectedContent.length());
164-
assertThat(sb).hasToString(expectedContent);
162+
assertThat(provider.contentLength()).hasValue((long) tc.expectedBody.length());
163+
assertThat(sb).hasToString(tc.expectedBody);
165164
}
166165

167166
@Test
@@ -281,6 +280,27 @@ public void fromBytes_byteArrayNotNullChecksumSupplied() {
281280
assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(expected);
282281
}
283282

283+
@ParameterizedTest(name = "{index} {0}")
284+
@MethodSource("contentPublishers")
285+
public void explicit0ContentLength_containsEmptyStringTrailingChecksum(TestCase tc) {
286+
ChecksumCalculatingAsyncRequestBody checksumBody =
287+
ChecksumCalculatingAsyncRequestBody.builder()
288+
.contentLengthHeader(0L)
289+
.trailerHeader("x-amz-checksum-crc32")
290+
.algorithm(DefaultChecksumAlgorithm.CRC32)
291+
.asyncRequestBody(tc.requestBody)
292+
.build();
293+
294+
StringBuilder sb = new StringBuilder();
295+
for (ByteBuffer byteBuffer : Flowable.fromPublisher(checksumBody).toList().blockingGet()) {
296+
sb.append(StandardCharsets.UTF_8.decode(byteBuffer));
297+
}
298+
299+
// Note: we ignore tc.expectedBody, since we expect the checksum to always be the empty body because of the 0 content
300+
// length.
301+
assertThat(sb.toString()).isEqualTo(expectedEmptyString);
302+
}
303+
284304
static class EmptyBufferPublisher implements AsyncRequestBody {
285305

286306
private final ByteBuffer[] buffers = new ByteBuffer[2];
@@ -316,4 +336,30 @@ public Optional<Long> contentLength() {
316336
return Optional.of((long) payload.length());
317337
}
318338
}
339+
340+
private static class TestCase {
341+
private String description;
342+
private AsyncRequestBody requestBody;
343+
private String expectedBody;
344+
345+
public TestCase description(String description) {
346+
this.description = description;
347+
return this;
348+
}
349+
350+
public TestCase requestBody(AsyncRequestBody requestBody) {
351+
this.requestBody = requestBody;
352+
return this;
353+
}
354+
355+
public TestCase expectedBody(String expectedBody) {
356+
this.expectedBody = expectedBody;
357+
return this;
358+
}
359+
360+
@Override
361+
public String toString() {
362+
return description;
363+
}
364+
}
319365
}

0 commit comments

Comments
 (0)