Skip to content

Commit 407d646

Browse files
authored
Add CrtS3TransferManager implementation of upload to support correct progress updates (#6341)
* Add crt implementation of upload to support progress * Refactors * Add changelog + tests * Added integ test
1 parent 4ceb5b2 commit 407d646

File tree

5 files changed

+106
-14
lines changed

5 files changed

+106
-14
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Fix bug in progress reporting in upload when using the CRT client."
6+
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.transfer.s3;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
1920
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
2021

2122
import java.io.IOException;
@@ -215,4 +216,31 @@ void upload_file_Interupted_CancelsTheListener(S3TransferManager tm) {
215216
assertThat(transferListener.getRatioTransferredList().get(transferListener.getRatioTransferredList().size() - 1))
216217
.isNotEqualTo(100.0);
217218
}
219+
220+
@ParameterizedTest
221+
@MethodSource("transferManagers")
222+
void upload_asyncRequestBody_ReportsProgressCorrectly(S3TransferManager tm) throws IOException {
223+
String content = RandomStringUtils.randomAscii(OBJ_SIZE);
224+
CaptureTransferListener transferListener = new CaptureTransferListener();
225+
226+
Upload upload =
227+
tm.upload(UploadRequest.builder()
228+
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
229+
.requestBody(AsyncRequestBody.fromString(content))
230+
.addTransferListener(LoggingTransferListener.create())
231+
.addTransferListener(transferListener)
232+
.build());
233+
234+
upload.completionFuture().join();
235+
ResponseInputStream<GetObjectResponse> obj = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
236+
ResponseTransformer.toInputStream());
237+
238+
assertThat(ChecksumUtils.computeCheckSum(content.getBytes(StandardCharsets.UTF_8)))
239+
.isEqualTo(ChecksumUtils.computeCheckSum(obj));
240+
241+
assertListenerForSuccessfulTransferComplete(transferListener);
242+
243+
// ensure intermediate progress is reported
244+
assertThat(transferListener.getRatioTransferredList()).hasSizeGreaterThan(2);
245+
}
218246
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,15 @@
3333
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
3434
import software.amazon.awssdk.transfer.s3.S3TransferManager;
3535
import software.amazon.awssdk.transfer.s3.internal.model.CrtFileUpload;
36+
import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload;
3637
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
3738
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
39+
import software.amazon.awssdk.transfer.s3.model.CompletedUpload;
3840
import software.amazon.awssdk.transfer.s3.model.FileUpload;
3941
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
42+
import software.amazon.awssdk.transfer.s3.model.Upload;
4043
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
44+
import software.amazon.awssdk.transfer.s3.model.UploadRequest;
4145
import software.amazon.awssdk.utils.CompletableFutureUtils;
4246
import software.amazon.awssdk.utils.Validate;
4347

@@ -54,6 +58,29 @@ class CrtS3TransferManager extends GenericS3TransferManager {
5458
this.s3AsyncClient = s3AsyncClient;
5559
}
5660

61+
@Override
62+
public final Upload upload(UploadRequest uploadRequest) {
63+
Validate.paramNotNull(uploadRequest, "uploadRequest");
64+
65+
AsyncRequestBody requestBody = uploadRequest.requestBody();
66+
67+
CompletableFuture<CompletedUpload> returnFuture = new CompletableFuture<>();
68+
69+
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest,
70+
requestBody.contentLength().orElse(null));
71+
progressUpdater.transferInitiated();
72+
progressUpdater.registerCompletion(returnFuture);
73+
74+
Consumer<SdkHttpExecutionAttributes.Builder> attachProgress =
75+
b -> b.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener());
76+
77+
PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadRequest.putObjectRequest(), attachProgress);
78+
79+
doUpload(putObjectRequest, requestBody, returnFuture);
80+
81+
return new DefaultUpload(returnFuture, progressUpdater.progress());
82+
}
83+
5784
@Override
5885
public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
5986
Validate.paramNotNull(uploadFileRequest, "uploadFileRequest");

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class GenericS3TransferManager implements S3TransferManager {
128128
}
129129

130130
@Override
131-
public final Upload upload(UploadRequest uploadRequest) {
131+
public Upload upload(UploadRequest uploadRequest) {
132132
Validate.paramNotNull(uploadRequest, "uploadRequest");
133133

134134
AsyncRequestBody requestBody = uploadRequest.requestBody();
@@ -148,24 +148,31 @@ public final Upload upload(UploadRequest uploadRequest) {
148148
putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener);
149149
}
150150

151+
doUpload(putObjectRequest, requestBody, returnFuture);
152+
153+
return new DefaultUpload(returnFuture, progressUpdater.progress());
154+
}
155+
156+
protected void doUpload(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody,
157+
CompletableFuture<CompletedUpload> returnFuture) {
151158
try {
152-
assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload");
159+
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");
153160

154161
CompletableFuture<PutObjectResponse> future =
155162
s3AsyncClient.putObject(putObjectRequest, requestBody);
156163

157164
// Forward upload cancellation to future
158165
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
159166

160-
CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture,
161-
r -> CompletedUpload.builder()
162-
.response(r)
163-
.build());
167+
CompletableFutureUtils.forwardTransformedResultTo(
168+
future,
169+
returnFuture,
170+
r -> CompletedUpload.builder()
171+
.response(r)
172+
.build());
164173
} catch (Throwable throwable) {
165174
returnFuture.completeExceptionally(throwable);
166175
}
167-
168-
return new DefaultUpload(returnFuture, progressUpdater.progress());
169176
}
170177

171178
/**

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.Mockito.verify;
2121
import static org.mockito.Mockito.when;
2222
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
23+
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
2324
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
2425

2526
import com.google.common.jimfs.Jimfs;
@@ -36,13 +37,16 @@
3637
import org.junit.jupiter.api.extension.ExtendWith;
3738
import org.mockito.ArgumentCaptor;
3839
import org.mockito.Mock;
40+
import org.mockito.exceptions.verification.WantedButNotInvoked;
3941
import org.mockito.junit.jupiter.MockitoExtension;
42+
import software.amazon.awssdk.core.async.AsyncRequestBody;
4043
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
4144
import software.amazon.awssdk.services.s3.S3AsyncClient;
4245
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
4346
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
4447
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
4548
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
49+
import software.amazon.awssdk.transfer.s3.model.UploadRequest;
4650

4751
@ExtendWith(MockitoExtension.class)
4852
public class CrtS3TransferManagerTest {
@@ -80,7 +84,7 @@ void uploadDirectory_shouldUseCrtUploadFile() {
8084
.completionFuture()
8185
.join();
8286

83-
verifyCrtInRequestAttributes();
87+
verifyCrtInRequestAttributes(true);
8488
}
8589

8690
@Test
@@ -93,18 +97,38 @@ void uploadFile_shouldUseCrtUploadFile() {
9397
.completionFuture()
9498
.join();
9599

96-
verifyCrtInRequestAttributes();
100+
verifyCrtInRequestAttributes(true);
97101
}
98102

99-
private void verifyCrtInRequestAttributes() {
100-
ArgumentCaptor<PutObjectRequest> requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
101103

102-
verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture());
104+
@Test
105+
void upload_shouldUseCrtUpload() {
106+
when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build()));
107+
transferManager.upload(UploadRequest.builder()
108+
.putObjectRequest(PutObjectRequest.builder().bucket("test").key("test").build())
109+
.requestBody(AsyncRequestBody.fromString("test"))
110+
.build())
111+
.completionFuture()
112+
.join();
113+
114+
verifyCrtInRequestAttributes(false);
115+
}
116+
117+
private void verifyCrtInRequestAttributes(boolean verifyObservable) {
118+
ArgumentCaptor<PutObjectRequest> requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
103119

120+
try {
121+
verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture());
122+
} catch (WantedButNotInvoked e) {
123+
verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(AsyncRequestBody.class).capture());
124+
}
104125
PutObjectRequest actual = requestArgumentCaptor.getValue();
105126
assertThat(actual.overrideConfiguration()).isPresent();
106127
SdkHttpExecutionAttributes attribute = actual.overrideConfiguration().get().executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES);
107128
assertThat(attribute).isNotNull();
108-
assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull();
129+
assertThat(attribute.getAttribute(CRT_PROGRESS_LISTENER)).isNotNull();
130+
if (verifyObservable) {
131+
assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull();
132+
}
109133
}
110134
}

0 commit comments

Comments
 (0)