Skip to content

Commit fd47cd6

Browse files
authored
Fix per async body buffer usage (#5939)
* Fix per async body buffer usage `bufferPerAsyncRequestBody` represents the read buffer used by each inflight `FileAsyncRequestBody`. This commit fixes the calculation so that we account for the user configured `bufferSizeInBytes` from the input `SplitConfiguration`. The issue is that `asyncRequestBody.chunkSizeInBytes()` may be greater than the desired `bufferSizeInBytes`. This also fixes the creation of the `FileAsyncRequestBody` for each chunk so that we configure `bufferPerAsyncRequestBody` for the `chunkSizeInBytes`. * Add changelog
1 parent cd0172d commit fd47cd6

File tree

3 files changed

+42
-1
lines changed

3 files changed

+42
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Fix an issue where `uploadFile` never completes for the Java based S3 TransferManager if the `apiCallBufferSizeInBytes` configured on the `MultipartConfiguration` is too small."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
2929
import software.amazon.awssdk.core.async.SdkPublisher;
3030
import software.amazon.awssdk.core.exception.SdkClientException;
31+
import software.amazon.awssdk.utils.NumericUtils;
3132
import software.amazon.awssdk.utils.Validate;
3233
import software.amazon.awssdk.utils.async.SimplePublisher;
3334

@@ -67,7 +68,8 @@ public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody,
6768
this.totalBufferSize = splitConfiguration.bufferSizeInBytes() == null ?
6869
AsyncRequestBodySplitConfiguration.defaultConfiguration().bufferSizeInBytes() :
6970
splitConfiguration.bufferSizeInBytes();
70-
this.bufferPerAsyncRequestBody = asyncRequestBody.chunkSizeInBytes();
71+
this.bufferPerAsyncRequestBody = Math.min(asyncRequestBody.chunkSizeInBytes(),
72+
NumericUtils.saturatedCast(totalBufferSize));
7173
}
7274

7375
public SdkPublisher<AsyncRequestBody> split() {
@@ -131,6 +133,7 @@ private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher<AsyncRequestBod
131133
.path(path)
132134
.position(position)
133135
.numBytesToRead(numBytesToReadForThisChunk)
136+
.chunkSizeInBytes(bufferPerAsyncRequestBody)
134137
.build();
135138
return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher);
136139
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,19 @@
2424
import java.util.HashMap;
2525
import java.util.Map;
2626
import java.util.concurrent.CancellationException;
27+
import java.util.concurrent.TimeUnit;
2728
import org.apache.commons.lang3.RandomStringUtils;
2829
import org.junit.jupiter.api.AfterAll;
2930
import org.junit.jupiter.api.BeforeAll;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.Timeout;
3033
import org.junit.jupiter.params.ParameterizedTest;
3134
import org.junit.jupiter.params.provider.MethodSource;
3235
import software.amazon.awssdk.core.ResponseInputStream;
3336
import software.amazon.awssdk.core.async.AsyncRequestBody;
3437
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3538
import software.amazon.awssdk.core.sync.ResponseTransformer;
39+
import software.amazon.awssdk.services.s3.S3AsyncClient;
3640
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
3741
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
3842
import software.amazon.awssdk.testutils.RandomTempFile;
@@ -93,6 +97,34 @@ void upload_file_SentCorrectly(S3TransferManager tm) throws IOException {
9397
assertListenerForSuccessfulTransferComplete(transferListener);
9498
}
9599

100+
// This is a test for an issue where the file upload hangs (no chunk
101+
// uploads are initiated) if apiCallBufferSizeInBytes is less than the
102+
// publisher chunk size.
103+
// Note: Only applicable to the Java based TM because file uploads are
104+
// done completely by CRT for the CRT based transfer manager, and does
105+
// not hit the same code path.
106+
@Test
107+
@Timeout(value = 10, unit = TimeUnit.MINUTES)
108+
public void uploadFile_apiBufferSizeLessThanFileAsyncPublisherChunkSize_sentCorrectly() {
109+
try (
110+
S3AsyncClient s3Async = s3AsyncClientBuilder()
111+
.multipartConfiguration(c -> c.apiCallBufferSizeInBytes(SizeConstant.KB))
112+
.build();
113+
114+
S3TransferManager tm = S3TransferManager.builder()
115+
.s3Client(s3Async)
116+
.build();
117+
) {
118+
FileUpload fileUpload =
119+
tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(TEST_BUCKET)
120+
.key(TEST_KEY))
121+
.source(testFile.toPath())
122+
.build());
123+
124+
fileUpload.completionFuture().join();
125+
}
126+
}
127+
96128
private static void assertListenerForSuccessfulTransferComplete(CaptureTransferListener transferListener) {
97129
assertThat(transferListener.isTransferInitiated()).isTrue();
98130
assertThat(transferListener.isTransferComplete()).isTrue();

0 commit comments

Comments
 (0)