diff --git a/.changes/next-release/bugfix-S3TransferManager-77ad713.json b/.changes/next-release/bugfix-S3TransferManager-77ad713.json new file mode 100644 index 00000000000..5998a5d3e1f --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-77ad713.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Fix bug in progress reporting in upload when using the CRT client." +} diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java index eb8102d3a47..c2d2f88f662 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.transfer.s3; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.IOException; @@ -215,4 +216,31 @@ void upload_file_Interupted_CancelsTheListener(S3TransferManager tm) { assertThat(transferListener.getRatioTransferredList().get(transferListener.getRatioTransferredList().size() - 1)) .isNotEqualTo(100.0); } + + @ParameterizedTest + @MethodSource("transferManagers") + void upload_asyncRequestBody_ReportsProgressCorrectly(S3TransferManager tm) throws IOException { + String content = RandomStringUtils.randomAscii(OBJ_SIZE); + CaptureTransferListener transferListener = new CaptureTransferListener(); + + Upload upload = + tm.upload(UploadRequest.builder() + .putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY)) + .requestBody(AsyncRequestBody.fromString(content)) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .build()); + + upload.completionFuture().join(); + ResponseInputStream obj = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream()); + + assertThat(ChecksumUtils.computeCheckSum(content.getBytes(StandardCharsets.UTF_8))) + .isEqualTo(ChecksumUtils.computeCheckSum(obj)); + + assertListenerForSuccessfulTransferComplete(transferListener); + + // ensure intermediate progress is reported + assertThat(transferListener.getRatioTransferredList()).hasSizeGreaterThan(2); + } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java index d9dded426e3..d099a41398e 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java @@ -33,11 +33,15 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.internal.model.CrtFileUpload; +import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload; import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater; import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; +import software.amazon.awssdk.transfer.s3.model.CompletedUpload; import software.amazon.awssdk.transfer.s3.model.FileUpload; import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload; +import software.amazon.awssdk.transfer.s3.model.Upload; import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import software.amazon.awssdk.transfer.s3.model.UploadRequest; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Validate; @@ -54,6 +58,29 @@ class CrtS3TransferManager extends GenericS3TransferManager { this.s3AsyncClient = s3AsyncClient; } + @Override + public final Upload upload(UploadRequest uploadRequest) { + Validate.paramNotNull(uploadRequest, "uploadRequest"); + + AsyncRequestBody requestBody = uploadRequest.requestBody(); + + CompletableFuture returnFuture = new CompletableFuture<>(); + + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, + requestBody.contentLength().orElse(null)); + progressUpdater.transferInitiated(); + progressUpdater.registerCompletion(returnFuture); + + Consumer attachProgress = + b -> b.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener()); + + PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadRequest.putObjectRequest(), attachProgress); + + doUpload(putObjectRequest, requestBody, returnFuture); + + return new DefaultUpload(returnFuture, progressUpdater.progress()); + } + @Override public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { Validate.paramNotNull(uploadFileRequest, "uploadFileRequest"); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java index 29bcdc34d93..de7ffc6ca94 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java @@ -128,7 +128,7 @@ class GenericS3TransferManager implements S3TransferManager { } @Override - public final Upload upload(UploadRequest uploadRequest) { + public Upload upload(UploadRequest uploadRequest) { Validate.paramNotNull(uploadRequest, "uploadRequest"); AsyncRequestBody requestBody = uploadRequest.requestBody(); @@ -148,8 +148,15 @@ public final Upload upload(UploadRequest uploadRequest) { putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener); } + doUpload(putObjectRequest, requestBody, returnFuture); + + return new DefaultUpload(returnFuture, progressUpdater.progress()); + } + + protected void doUpload(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody, + CompletableFuture returnFuture) { try { - assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload"); + assertNotUnsupportedArn(putObjectRequest.bucket(), "upload"); CompletableFuture future = s3AsyncClient.putObject(putObjectRequest, requestBody); @@ -157,15 +164,15 @@ public final Upload upload(UploadRequest uploadRequest) { // Forward upload cancellation to future CompletableFutureUtils.forwardExceptionTo(returnFuture, future); - CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture, - r -> CompletedUpload.builder() - .response(r) - .build()); + CompletableFutureUtils.forwardTransformedResultTo( + future, + returnFuture, + r -> CompletedUpload.builder() + .response(r) + .build()); } catch (Throwable throwable) { returnFuture.completeExceptionally(throwable); } - - return new DefaultUpload(returnFuture, progressUpdater.progress()); } /** diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java index b3628d18ce2..454cd2b7d98 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES; +import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; import com.google.common.jimfs.Jimfs; @@ -36,13 +37,16 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.exceptions.verification.WantedButNotInvoked; import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.http.SdkHttpExecutionAttributes; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import software.amazon.awssdk.transfer.s3.model.UploadRequest; @ExtendWith(MockitoExtension.class) public class CrtS3TransferManagerTest { @@ -80,7 +84,7 @@ void uploadDirectory_shouldUseCrtUploadFile() { .completionFuture() .join(); - verifyCrtInRequestAttributes(); + verifyCrtInRequestAttributes(true); } @Test @@ -93,18 +97,38 @@ void uploadFile_shouldUseCrtUploadFile() { .completionFuture() .join(); - verifyCrtInRequestAttributes(); + verifyCrtInRequestAttributes(true); } - private void verifyCrtInRequestAttributes() { - ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); - verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture()); + @Test + void upload_shouldUseCrtUpload() { + when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build())); + transferManager.upload(UploadRequest.builder() + .putObjectRequest(PutObjectRequest.builder().bucket("test").key("test").build()) + .requestBody(AsyncRequestBody.fromString("test")) + .build()) + .completionFuture() + .join(); + + verifyCrtInRequestAttributes(false); + } + + private void verifyCrtInRequestAttributes(boolean verifyObservable) { + ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + try { + verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture()); + } catch (WantedButNotInvoked e) { + verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(AsyncRequestBody.class).capture()); + } PutObjectRequest actual = requestArgumentCaptor.getValue(); assertThat(actual.overrideConfiguration()).isPresent(); SdkHttpExecutionAttributes attribute = actual.overrideConfiguration().get().executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES); assertThat(attribute).isNotNull(); - assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull(); + assertThat(attribute.getAttribute(CRT_PROGRESS_LISTENER)).isNotNull(); + if (verifyObservable) { + assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull(); + } } }