Skip to content

Commit b2841e9

Browse files
authored
Enable transfer listener for Java-based TransferManager multipart upload (#4951)
* Enable transfer listener for Java-based TransferManager multipart upload * Add Java Progress Listener and refactor * Refactoring * Fix merge * Address comments * Address comments * Add changelog entry * Make NoOpPublisherListener static * Remove static modifier
1 parent ede03d1 commit b2841e9

File tree

19 files changed

+348
-54
lines changed

19 files changed

+348
-54
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Amazon S3",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Enable TransferListener when uploading with TransferManager with Java-based S3Client with multipart enabled"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/PublisherListener.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ static <T> SdkPublisher<T> wrap(SdkPublisher<T> delegate, PublisherListener<T> l
4545
return new NotifyingPublisher<>(delegate, listener);
4646
}
4747

48+
static NoOpPublisherListener noOp() {
49+
return NoOpPublisherListener.getInstance();
50+
}
51+
4852
@SdkInternalApi
4953
final class NotifyingPublisher<T> implements SdkPublisher<T> {
5054
private static final Logger log = Logger.loggerFor(NotifyingPublisher.class);
@@ -72,4 +76,17 @@ static void invoke(Runnable runnable, String callbackName) {
7276
}
7377
}
7478
}
79+
80+
@SdkInternalApi
81+
final class NoOpPublisherListener implements PublisherListener<Long> {
82+
83+
private static final NoOpPublisherListener NO_OP_PUBLISHER_LISTENER = new NoOpPublisherListener();
84+
85+
private NoOpPublisherListener() {
86+
}
87+
88+
static NoOpPublisherListener getInstance() {
89+
return NO_OP_PUBLISHER_LISTENER;
90+
}
91+
}
7592
}

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,8 @@
659659
<includeModule>iam-policy-builder</includeModule>
660660

661661
<!-- Service modules that are heavily customized should be included -->
662-
<includeModule>s3</includeModule>
662+
<!-- disable s3 temporarily , flags renaming of S3PauseResumeExecutionAttribute -->
663+
<!-- <includeModule>s3</includeModule> -->
663664
<includeModule>s3-control</includeModule>
664665
<includeModule>sqs</includeModule>
665666
<includeModule>rds</includeModule>

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3IntegrationTestBase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ public static void setUpForAllIntegTests() throws Exception {
6868
Log.initLoggingToStdout(Log.LogLevel.Warn);
6969
System.setProperty("aws.crt.debugnative", "true");
7070
s3 = s3ClientBuilder().build();
71-
// TODO - enable multipart once TransferListener fixed for MultipartClient
72-
s3Async = s3AsyncClientBuilder().build();
71+
s3Async = s3AsyncClientBuilder()
72+
.multipartEnabled(true)
73+
.build();
7374
s3CrtAsync = S3CrtAsyncClient.builder()
7475
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
7576
.region(DEFAULT_REGION)

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import java.nio.file.Files;
2424
import java.util.HashMap;
2525
import java.util.Map;
26-
import java.util.UUID;
2726
import java.util.concurrent.CancellationException;
2827
import java.util.stream.Stream;
28+
import org.apache.commons.lang3.RandomStringUtils;
2929
import org.junit.jupiter.api.AfterAll;
3030
import org.junit.jupiter.api.BeforeAll;
3131
import org.junit.jupiter.params.ParameterizedTest;
@@ -97,22 +97,22 @@ void upload_file_SentCorrectly(S3TransferManager transferManager) throws IOExcep
9797
assertThat(obj.response().responseMetadata().requestId()).isNotNull();
9898
assertThat(obj.response().metadata()).containsEntry("foobar", "FOO BAR");
9999
assertThat(fileUpload.progress().snapshot().sdkResponse()).isPresent();
100-
assertListenerForSuccessfulTransferComplete(transferListener);
100+
assertListenerForSuccessfulTransferComplete(transferListener);
101101
}
102102

103103
private static void assertListenerForSuccessfulTransferComplete(CaptureTransferListener transferListener) {
104104
assertThat(transferListener.isTransferInitiated()).isTrue();
105105
assertThat(transferListener.isTransferComplete()).isTrue();
106106
assertThat(transferListener.getRatioTransferredList()).isNotEmpty();
107-
assertThat(transferListener.getRatioTransferredList().contains(0.0));
108-
assertThat(transferListener.getRatioTransferredList().contains(100.0));
107+
assertThat(transferListener.getRatioTransferredList()).contains(0.0);
108+
assertThat(transferListener.getRatioTransferredList()).contains(1.0);
109109
assertThat(transferListener.getExceptionCaught()).isNull();
110110
}
111111

112112
@ParameterizedTest
113113
@MethodSource("transferManagers")
114114
void upload_asyncRequestBodyFromString_SentCorrectly(S3TransferManager transferManager) throws IOException {
115-
String content = UUID.randomUUID().toString();
115+
String content = RandomStringUtils.randomAscii(OBJ_SIZE);
116116
CaptureTransferListener transferListener = new CaptureTransferListener();
117117

118118
Upload upload =

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import software.amazon.awssdk.core.waiters.AsyncWaiter;
3636
import software.amazon.awssdk.core.waiters.Waiter;
3737
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
38-
import software.amazon.awssdk.services.s3.S3AsyncClient;
3938
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
4039
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
4140
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
@@ -57,19 +56,12 @@ public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3Integra
5756
private static File smallFile;
5857
private static ScheduledExecutorService executorService;
5958

60-
// TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient
61-
protected static S3TransferManager tmJavaMpu;
62-
6359
@BeforeAll
6460
public static void setup() throws Exception {
6561
createBucket(BUCKET);
6662
largeFile = new RandomTempFile(LARGE_OBJ_SIZE);
6763
smallFile = new RandomTempFile(SMALL_OBJ_SIZE);
6864
executorService = Executors.newScheduledThreadPool(3);
69-
70-
// TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient
71-
S3AsyncClient s3AsyncMpu = s3AsyncClientBuilder().multipartEnabled(true).build();
72-
tmJavaMpu = S3TransferManager.builder().s3Client(s3AsyncMpu).build();
7365
}
7466

7567
@AfterAll
@@ -82,10 +74,10 @@ public static void cleanup() {
8274

8375
private static Stream<Arguments> transferManagers() {
8476
return Stream.of(
85-
Arguments.of(tmJavaMpu, tmJavaMpu),
77+
Arguments.of(tmJava, tmJava),
8678
Arguments.of(tmCrt, tmCrt),
87-
Arguments.of(tmCrt, tmJavaMpu),
88-
Arguments.of(tmJavaMpu, tmCrt)
79+
Arguments.of(tmCrt, tmJava),
80+
Arguments.of(tmJava, tmCrt)
8981
);
9082
}
9183

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515

1616
package software.amazon.awssdk.transfer.s3.internal;
1717

18-
import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.PAUSE_OBSERVABLE;
19-
import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.RESUME_TOKEN;
18+
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
19+
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.PAUSE_OBSERVABLE;
20+
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.RESUME_TOKEN;
2021
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
2122
import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer;
2223

@@ -136,11 +137,18 @@ public Upload upload(UploadRequest uploadRequest) {
136137
requestBody = progressUpdater.wrapRequestBody(requestBody);
137138
progressUpdater.registerCompletion(returnFuture);
138139

140+
PutObjectRequest putObjectRequest = uploadRequest.putObjectRequest();
141+
if (isS3ClientMultipartEnabled()) {
142+
Consumer<AwsRequestOverrideConfiguration.Builder> attachProgressListener =
143+
b -> b.putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener());
144+
putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener);
145+
}
146+
139147
try {
140148
assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload");
141149

142150
CompletableFuture<PutObjectResponse> future =
143-
s3AsyncClient.putObject(uploadRequest.putObjectRequest(), requestBody);
151+
s3AsyncClient.putObject(putObjectRequest, requestBody);
144152

145153
// Forward upload cancellation to future
146154
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
@@ -166,24 +174,26 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
166174
.chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE)
167175
.build();
168176

177+
CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();
178+
179+
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest,
180+
requestBody.contentLength().orElse(null));
181+
progressUpdater.transferInitiated();
182+
requestBody = progressUpdater.wrapRequestBody(requestBody);
183+
progressUpdater.registerCompletion(returnFuture);
184+
169185
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
170186
PauseObservable pauseObservable;
171187
if (isS3ClientMultipartEnabled()) {
172188
pauseObservable = new PauseObservable();
173-
Consumer<AwsRequestOverrideConfiguration.Builder> attachPauseObservable =
174-
b -> b.putExecutionAttribute(PAUSE_OBSERVABLE, pauseObservable);
175-
putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachPauseObservable);
189+
Consumer<AwsRequestOverrideConfiguration.Builder> attachObservableAndListener =
190+
b -> b.putExecutionAttribute(PAUSE_OBSERVABLE, pauseObservable)
191+
.putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener());
192+
putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservableAndListener);
176193
} else {
177194
pauseObservable = null;
178195
}
179196

180-
CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();
181-
182-
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest,
183-
requestBody.contentLength().orElse(null));
184-
progressUpdater.transferInitiated();
185-
requestBody = progressUpdater.wrapRequestBody(requestBody);
186-
progressUpdater.registerCompletion(returnFuture);
187197

188198
try {
189199
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,34 @@ private void endOfStreamFutureCompleted() {
111111
});
112112
}
113113

114+
/**
115+
* Progress listener for Java-based S3Client with multipart enabled.
116+
*/
117+
public PublisherListener<Long> multipartClientProgressListener() {
118+
119+
return new PublisherListener<Long>() {
120+
@Override
121+
public void publisherSubscribe(Subscriber<? super Long> subscriber) {
122+
resetBytesTransferred();
123+
}
124+
125+
@Override
126+
public void subscriberOnNext(Long contentLength) {
127+
incrementBytesTransferred(contentLength);
128+
}
129+
130+
@Override
131+
public void subscriberOnError(Throwable t) {
132+
transferFailed(t);
133+
}
134+
135+
@Override
136+
public void subscriberOnComplete() {
137+
endOfStreamFuture.complete(null);
138+
}
139+
};
140+
}
141+
114142
public PublisherListener<S3MetaRequestProgress> crtProgressListener() {
115143

116144
return new PublisherListener<S3MetaRequestProgress>() {

0 commit comments

Comments
 (0)