Skip to content

Add CrtS3TransferManager implementation of upload to support correct progress updates #6341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-77ad713.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fix bug in progress reporting in upload when using the CRT client."
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.io.IOException;
Expand Down Expand Up @@ -215,4 +216,31 @@ void upload_file_Interupted_CancelsTheListener(S3TransferManager tm) {
assertThat(transferListener.getRatioTransferredList().get(transferListener.getRatioTransferredList().size() - 1))
.isNotEqualTo(100.0);
}

@ParameterizedTest
@MethodSource("transferManagers")
void upload_asyncRequestBody_ReportsProgressCorrectly(S3TransferManager tm) throws IOException {
String content = RandomStringUtils.randomAscii(OBJ_SIZE);
CaptureTransferListener transferListener = new CaptureTransferListener();

Upload upload =
tm.upload(UploadRequest.builder()
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
.requestBody(AsyncRequestBody.fromString(content))
.addTransferListener(LoggingTransferListener.create())
.addTransferListener(transferListener)
.build());

upload.completionFuture().join();
ResponseInputStream<GetObjectResponse> obj = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
ResponseTransformer.toInputStream());

assertThat(ChecksumUtils.computeCheckSum(content.getBytes(StandardCharsets.UTF_8)))
.isEqualTo(ChecksumUtils.computeCheckSum(obj));

assertListenerForSuccessfulTransferComplete(transferListener);

// ensure intermediate progress is reported
assertThat(transferListener.getRatioTransferredList()).hasSizeGreaterThan(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.internal.model.CrtFileUpload;
import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload;
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.CompletedUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Validate;

Expand All @@ -54,6 +58,29 @@ class CrtS3TransferManager extends GenericS3TransferManager {
this.s3AsyncClient = s3AsyncClient;
}

@Override
public final Upload upload(UploadRequest uploadRequest) {
Validate.paramNotNull(uploadRequest, "uploadRequest");

AsyncRequestBody requestBody = uploadRequest.requestBody();

CompletableFuture<CompletedUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest,
requestBody.contentLength().orElse(null));
progressUpdater.transferInitiated();
progressUpdater.registerCompletion(returnFuture);

Consumer<SdkHttpExecutionAttributes.Builder> attachProgress =
b -> b.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener());

PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadRequest.putObjectRequest(), attachProgress);

doUpload(putObjectRequest, requestBody, returnFuture);

return new DefaultUpload(returnFuture, progressUpdater.progress());
}

@Override
public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
Validate.paramNotNull(uploadFileRequest, "uploadFileRequest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class GenericS3TransferManager implements S3TransferManager {
}

@Override
public final Upload upload(UploadRequest uploadRequest) {
public Upload upload(UploadRequest uploadRequest) {
Validate.paramNotNull(uploadRequest, "uploadRequest");

AsyncRequestBody requestBody = uploadRequest.requestBody();
Expand All @@ -148,24 +148,31 @@ public final Upload upload(UploadRequest uploadRequest) {
putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener);
}

doUpload(putObjectRequest, requestBody, returnFuture);

return new DefaultUpload(returnFuture, progressUpdater.progress());
}

protected void doUpload(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody,
CompletableFuture<CompletedUpload> returnFuture) {
try {
assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload");
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");

CompletableFuture<PutObjectResponse> future =
s3AsyncClient.putObject(putObjectRequest, requestBody);

// Forward upload cancellation to future
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);

CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture,
r -> CompletedUpload.builder()
.response(r)
.build());
CompletableFutureUtils.forwardTransformedResultTo(
future,
returnFuture,
r -> CompletedUpload.builder()
.response(r)
.build());
} catch (Throwable throwable) {
returnFuture.completeExceptionally(throwable);
}

return new DefaultUpload(returnFuture, progressUpdater.progress());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;

import com.google.common.jimfs.Jimfs;
Expand All @@ -36,13 +37,16 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.exceptions.verification.WantedButNotInvoked;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

@ExtendWith(MockitoExtension.class)
public class CrtS3TransferManagerTest {
Expand Down Expand Up @@ -80,7 +84,7 @@ void uploadDirectory_shouldUseCrtUploadFile() {
.completionFuture()
.join();

verifyCrtInRequestAttributes();
verifyCrtInRequestAttributes(true);
}

@Test
Expand All @@ -93,18 +97,38 @@ void uploadFile_shouldUseCrtUploadFile() {
.completionFuture()
.join();

verifyCrtInRequestAttributes();
verifyCrtInRequestAttributes(true);
}

private void verifyCrtInRequestAttributes() {
ArgumentCaptor<PutObjectRequest> requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);

verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture());
@Test
void upload_shouldUseCrtUpload() {
when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build()));
transferManager.upload(UploadRequest.builder()
.putObjectRequest(PutObjectRequest.builder().bucket("test").key("test").build())
.requestBody(AsyncRequestBody.fromString("test"))
.build())
.completionFuture()
.join();

verifyCrtInRequestAttributes(false);
}

private void verifyCrtInRequestAttributes(boolean verifyObservable) {
ArgumentCaptor<PutObjectRequest> requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);

try {
verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture());
} catch (WantedButNotInvoked e) {
verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(AsyncRequestBody.class).capture());
}
PutObjectRequest actual = requestArgumentCaptor.getValue();
assertThat(actual.overrideConfiguration()).isPresent();
SdkHttpExecutionAttributes attribute = actual.overrideConfiguration().get().executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES);
assertThat(attribute).isNotNull();
assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull();
assertThat(attribute.getAttribute(CRT_PROGRESS_LISTENER)).isNotNull();
if (verifyObservable) {
assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull();
}
}
}
Loading