Skip to content

Commit 2b11a2b

Browse files
authored
Implement TransferListener for COPY for TransferManager with Multipar… (#5059)
* Implement TransferListener for COPY for TransferManager with MultipartS3AsyncClient * Uncomment test cases * Add clarifying comments
1 parent fdce6e1 commit 2b11a2b

File tree

5 files changed

+252
-14
lines changed

5 files changed

+252
-14
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Amazon S3",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Implement TransferListener for copy operations when using TransferManager with Java-based S3Client with multipart enabled"
6+
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerCopyIntegrationTest.java

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
import static software.amazon.awssdk.transfer.s3.util.ChecksumUtils.computeCheckSum;
2222

2323
import java.util.concurrent.ThreadLocalRandom;
24+
import java.util.stream.Stream;
2425
import org.junit.jupiter.api.AfterAll;
2526
import org.junit.jupiter.api.BeforeAll;
26-
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.params.ParameterizedTest;
28+
import org.junit.jupiter.params.provider.Arguments;
29+
import org.junit.jupiter.params.provider.MethodSource;
2730
import software.amazon.awssdk.core.ResponseBytes;
2831
import software.amazon.awssdk.core.sync.RequestBody;
2932
import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -50,19 +53,34 @@ public static void teardown() throws Exception {
5053
deleteBucketAndAllContents(BUCKET);
5154
}
5255

53-
@Test
54-
void copy_copiedObject_hasSameContent() {
56+
enum TmType{
57+
JAVA, CRT
58+
}
59+
60+
private static Stream<Arguments> transferManagers() {
61+
return Stream.of(
62+
Arguments.of(TmType.JAVA),
63+
Arguments.of(TmType.CRT)
64+
);
65+
}
66+
67+
@ParameterizedTest
68+
@MethodSource("transferManagers")
69+
void copy_copiedObject_hasSameContent(TmType tmType) throws Exception {
70+
CaptureTransferListener transferListener = new CaptureTransferListener();
5571
byte[] originalContent = randomBytes(OBJ_SIZE);
5672
createOriginalObject(originalContent, ORIGINAL_OBJ);
57-
copyObject(ORIGINAL_OBJ, COPIED_OBJ);
73+
copyObject(ORIGINAL_OBJ, COPIED_OBJ, transferListener, tmType);
5874
validateCopiedObject(originalContent, ORIGINAL_OBJ);
5975
}
6076

61-
@Test
62-
void copy_specialCharacters_hasSameContent() {
77+
@ParameterizedTest
78+
@MethodSource("transferManagers")
79+
void copy_specialCharacters_hasSameContent(TmType tmType) throws Exception {
80+
CaptureTransferListener transferListener = new CaptureTransferListener();
6381
byte[] originalContent = randomBytes(OBJ_SIZE);
6482
createOriginalObject(originalContent, ORIGINAL_OBJ_SPECIAL_CHARACTER);
65-
copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER);
83+
copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER, transferListener, tmType);
6684
validateCopiedObject(originalContent, COPIED_OBJ_SPECIAL_CHARACTER);
6785
}
6886

@@ -72,18 +90,34 @@ private void createOriginalObject(byte[] originalContent, String originalKey) {
7290
RequestBody.fromBytes(originalContent));
7391
}
7492

75-
private void copyObject(String original, String destination) {
76-
Copy copy = tmCrt.copy(c -> c
93+
private void copyObject(String original, String destination, CaptureTransferListener transferListener, TmType tmType) throws Exception {
94+
S3TransferManager tm = tmType == TmType.JAVA ? tmJava : tmCrt;
95+
Copy copy = tm.copy(c -> c
7796
.copyObjectRequest(r -> r
7897
.sourceBucket(BUCKET)
7998
.sourceKey(original)
8099
.destinationBucket(BUCKET)
81100
.destinationKey(destination))
82-
.addTransferListener(LoggingTransferListener.create()));
101+
.addTransferListener(LoggingTransferListener.create())
102+
.addTransferListener(transferListener));
83103

84104
CompletedCopy completedCopy = copy.completionFuture().join();
85105
assertThat(completedCopy.response().responseMetadata().requestId()).isNotNull();
86106
assertThat(completedCopy.response().sdkHttpResponse()).isNotNull();
107+
108+
if (tmType == TmType.JAVA) {
109+
Thread.sleep(500);
110+
assertListenerForSuccessfulTransferComplete(transferListener);
111+
}
112+
}
113+
114+
private static void assertListenerForSuccessfulTransferComplete(CaptureTransferListener transferListener) {
115+
assertThat(transferListener.isTransferInitiated()).isTrue();
116+
assertThat(transferListener.isTransferComplete()).isTrue();
117+
assertThat(transferListener.getRatioTransferredList()).isNotEmpty();
118+
assertThat(transferListener.getRatioTransferredList()).contains(0.0);
119+
assertThat(transferListener.getRatioTransferredList()).contains(1.0);
120+
assertThat(transferListener.getExceptionCaught()).isNull();
87121
}
88122

89123
private void validateCopiedObject(byte[] originalContent, String originalKey) {

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.awssdk.services.s3.internal.resource.S3AccessPointResource;
4141
import software.amazon.awssdk.services.s3.internal.resource.S3ArnConverter;
4242
import software.amazon.awssdk.services.s3.internal.resource.S3Resource;
43+
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
4344
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
4445
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
4546
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@@ -286,6 +287,20 @@ private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
286287
.build();
287288
}
288289

290+
private CopyObjectRequest attachSdkAttribute(CopyObjectRequest copyObjectRequest,
291+
Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
292+
AwsRequestOverrideConfiguration modifiedRequestOverrideConfig =
293+
copyObjectRequest.overrideConfiguration()
294+
.map(o -> o.toBuilder().applyMutation(builderMutation).build())
295+
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
296+
.applyMutation(builderMutation)
297+
.build());
298+
299+
return copyObjectRequest.toBuilder()
300+
.overrideConfiguration(modifiedRequestOverrideConfig)
301+
.build();
302+
}
303+
289304
@Override
290305
public final DirectoryUpload uploadDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
291306
Validate.paramNotNull(uploadDirectoryRequest, "uploadDirectoryRequest");
@@ -457,9 +472,20 @@ public final Copy copy(CopyRequest copyRequest) {
457472

458473
CompletableFuture<CompletedCopy> returnFuture = new CompletableFuture<>();
459474

460-
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(copyRequest, null);
461-
progressUpdater.transferInitiated();
462-
progressUpdater.registerCompletion(returnFuture);
475+
// set length to 10000 as reference value, since we don't make HeadObject call yet
476+
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(copyRequest, 10000L);
477+
478+
// TransferListener is not supported for CRT-based client, so we'll only initiate and register completion when using
479+
// the Java-based client with multipart enabled
480+
if (isS3ClientMultipartEnabled()) {
481+
Consumer<AwsRequestOverrideConfiguration.Builder> attachProgressListener =
482+
b -> b.putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener());
483+
CopyObjectRequest copyObjectRequest = attachSdkAttribute(copyRequest.copyObjectRequest(), attachProgressListener);
484+
copyRequest = copyRequest.toBuilder().copyObjectRequest(copyObjectRequest).build();
485+
486+
progressUpdater.transferInitiated();
487+
progressUpdater.registerCompletion(returnFuture);
488+
}
463489

464490
try {
465491
assertNotUnsupportedArn(copyRequest.copyObjectRequest().sourceBucket(), "copy sourceBucket");

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3JavaMultipartTransferProgressListenerTest.java

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
1919
import static com.github.tomakehurst.wiremock.client.WireMock.any;
2020
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
21+
import static com.github.tomakehurst.wiremock.client.WireMock.head;
2122
import static com.github.tomakehurst.wiremock.client.WireMock.post;
23+
import static com.github.tomakehurst.wiremock.client.WireMock.put;
2224
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
2325
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
2426
import static org.assertj.core.api.Assertions.assertThat;
@@ -33,6 +35,7 @@
3335
import java.util.concurrent.CancellationException;
3436
import java.util.concurrent.CompletionException;
3537
import org.junit.jupiter.api.BeforeAll;
38+
import org.junit.jupiter.api.Test;
3639
import org.junit.jupiter.params.ParameterizedTest;
3740
import org.junit.jupiter.params.provider.ValueSource;
3841
import org.mockito.ArgumentMatchers;
@@ -42,10 +45,12 @@
4245
import software.amazon.awssdk.regions.Region;
4346
import software.amazon.awssdk.services.s3.S3AsyncClient;
4447
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
48+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
4549
import software.amazon.awssdk.services.s3.model.S3Exception;
4650
import software.amazon.awssdk.testutils.RandomTempFile;
4751
import software.amazon.awssdk.transfer.s3.CaptureTransferListener;
4852
import software.amazon.awssdk.transfer.s3.S3TransferManager;
53+
import software.amazon.awssdk.transfer.s3.model.Copy;
4954
import software.amazon.awssdk.transfer.s3.model.FileUpload;
5055
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
5156
import software.amazon.awssdk.transfer.s3.progress.TransferListener;
@@ -173,7 +178,6 @@ void listeners_reports_ErrorsWhenCancelled(boolean multipartEnabled) throws Inte
173178
assertThat(transferListener.isTransferComplete()).isFalse();
174179
assertThat(transferListener.isTransferInitiated()).isTrue();
175180
assertMockOnFailure(transferListenerMock);
176-
177181
}
178182

179183
@ParameterizedTest
@@ -209,4 +213,141 @@ void listeners_reports_ProgressWhenSuccess(boolean multipartEnabled) throws Inte
209213
int numTimesBytesTransferred = multipartEnabled ? 2 : 1;
210214
Mockito.verify(transferListenerMock, times(numTimesBytesTransferred)).bytesTransferred(ArgumentMatchers.any());
211215
}
216+
217+
@Test
218+
void copyWithJavaBasedClient_listeners_reports_ErrorsWithValidPayload() throws InterruptedException {
219+
S3AsyncClient s3Async = s3AsyncClient(true);
220+
221+
TransferListener transferListenerMock = mock(TransferListener.class);
222+
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(404).withBody(ERROR_BODY)));
223+
S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class),
224+
mock(TransferManagerConfiguration.class),
225+
mock(DownloadDirectoryHelper.class));
226+
CaptureTransferListener transferListener = new CaptureTransferListener();
227+
228+
Copy copy =
229+
tm.copy(u -> u.copyObjectRequest(p -> p
230+
.sourceBucket(EXAMPLE_BUCKET)
231+
.sourceKey(TEST_KEY)
232+
.destinationBucket(EXAMPLE_BUCKET)
233+
.destinationKey("copiedObj"))
234+
.addTransferListener(LoggingTransferListener.create())
235+
.addTransferListener(transferListener)
236+
.addTransferListener(transferListenerMock)
237+
.build());
238+
239+
assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> copy.completionFuture().join());
240+
Thread.sleep(500);
241+
assertThat(transferListener.getExceptionCaught()).isInstanceOf(NoSuchKeyException.class);
242+
assertThat(transferListener.isTransferComplete()).isFalse();
243+
assertThat(transferListener.isTransferInitiated()).isTrue();
244+
assertMockOnFailure(transferListenerMock);
245+
}
246+
247+
@Test
248+
void copyWithJavaBasedClient_listeners_reports_ErrorsWithValidInValidPayload() throws InterruptedException {
249+
S3AsyncClient s3Async = s3AsyncClient(true);
250+
251+
TransferListener transferListenerMock = mock(TransferListener.class);
252+
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(404).withBody("?")));
253+
S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class),
254+
mock(TransferManagerConfiguration.class),
255+
mock(DownloadDirectoryHelper.class));
256+
CaptureTransferListener transferListener = new CaptureTransferListener();
257+
258+
Copy copy =
259+
tm.copy(u -> u.copyObjectRequest(p -> p
260+
.sourceBucket(EXAMPLE_BUCKET)
261+
.sourceKey(TEST_KEY)
262+
.destinationBucket(EXAMPLE_BUCKET)
263+
.destinationKey("copiedObj"))
264+
.addTransferListener(LoggingTransferListener.create())
265+
.addTransferListener(transferListener)
266+
.addTransferListener(transferListenerMock)
267+
.build());
268+
269+
assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> copy.completionFuture().join());
270+
Thread.sleep(500);
271+
assertThat(transferListener.getExceptionCaught()).isInstanceOf(S3Exception.class);
272+
assertThat(transferListener.isTransferComplete()).isFalse();
273+
assertThat(transferListener.isTransferInitiated()).isTrue();
274+
assertMockOnFailure(transferListenerMock);
275+
}
276+
277+
@Test
278+
void copyWithJavaBasedClient_listeners_reports_ErrorsWhenCancelled() throws InterruptedException {
279+
S3AsyncClient s3Async = s3AsyncClient(true);
280+
281+
TransferListener transferListenerMock = mock(TransferListener.class);
282+
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody("{}")));
283+
S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class),
284+
mock(TransferManagerConfiguration.class),
285+
mock(DownloadDirectoryHelper.class));
286+
CaptureTransferListener transferListener = new CaptureTransferListener();
287+
288+
tm.copy(u -> u.copyObjectRequest(p -> p
289+
.sourceBucket(EXAMPLE_BUCKET)
290+
.sourceKey(TEST_KEY)
291+
.destinationBucket(EXAMPLE_BUCKET)
292+
.destinationKey("copiedObj"))
293+
.addTransferListener(LoggingTransferListener.create())
294+
.addTransferListener(transferListener)
295+
.addTransferListener(transferListenerMock)
296+
.build()).completionFuture().cancel(true);
297+
298+
Thread.sleep(500);
299+
assertThat(transferListener.getExceptionCaught()).isInstanceOf(CancellationException.class);
300+
assertThat(transferListener.isTransferComplete()).isFalse();
301+
assertThat(transferListener.isTransferInitiated()).isTrue();
302+
assertMockOnFailure(transferListenerMock);
303+
}
304+
305+
@Test
306+
void copyWithJavaBasedClient_listeners_reports_ProgressWhenSuccess_copy() throws InterruptedException {
307+
String destinationKey = "copiedObj";
308+
S3AsyncClient s3Async = s3AsyncClient(true);
309+
310+
TransferListener transferListenerMock = mock(TransferListener.class);
311+
312+
stubFor(head(anyUrl()).willReturn(aResponse().withStatus(200).withHeader("Content-Length", "16777216")));
313+
314+
String createMpuUrl = "/" + EXAMPLE_BUCKET + "/" + destinationKey + "?uploads";
315+
String createMpuResponse = "<CreateMultipartUploadResult><UploadId>1234</UploadId></CreateMultipartUploadResult>";
316+
stubFor(post(urlEqualTo(createMpuUrl)).willReturn(aResponse().withStatus(200).withBody(createMpuResponse)));
317+
318+
String copyObjectUrl = "/" + EXAMPLE_BUCKET + "/" + destinationKey + "?uploadId=1234";
319+
String copyObjectUrl1 = "/" + EXAMPLE_BUCKET + "/" + destinationKey + "?partNumber=1&uploadId=1234";
320+
String copyObjectUrl2 = "/" + EXAMPLE_BUCKET + "/" + destinationKey + "?partNumber=2&uploadId=1234";
321+
322+
String copyObjectResponse = "<CopyPartResult><ETag>test-etag</ETag></CopyPartResult>";
323+
stubFor(post(copyObjectUrl).willReturn(aResponse().withStatus(200).withBody(copyObjectResponse)));
324+
stubFor(put(copyObjectUrl1).willReturn(aResponse().withStatus(200).withBody(copyObjectResponse)));
325+
stubFor(put(copyObjectUrl2).willReturn(aResponse().withStatus(200).withBody(copyObjectResponse)));
326+
327+
S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class),
328+
mock(TransferManagerConfiguration.class),
329+
mock(DownloadDirectoryHelper.class));
330+
CaptureTransferListener transferListener = new CaptureTransferListener();
331+
332+
tm.copy(u -> u.copyObjectRequest(p -> p
333+
.sourceBucket(EXAMPLE_BUCKET)
334+
.sourceKey(TEST_KEY)
335+
.destinationBucket(EXAMPLE_BUCKET)
336+
.destinationKey(destinationKey))
337+
.addTransferListener(LoggingTransferListener.create())
338+
.addTransferListener(transferListener)
339+
.addTransferListener(transferListenerMock)
340+
.build());
341+
342+
Thread.sleep(500);
343+
assertThat(transferListener.getExceptionCaught()).isNull();
344+
assertThat(transferListener.isTransferComplete()).isTrue();
345+
assertThat(transferListener.isTransferInitiated()).isTrue();
346+
Mockito.verify(transferListenerMock, times(0)).transferFailed(ArgumentMatchers.any());
347+
Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any());
348+
Mockito.verify(transferListenerMock, times(1)).transferComplete(ArgumentMatchers.any());
349+
350+
int numTimesBytesTransferred = 2;
351+
Mockito.verify(transferListenerMock, times(numTimesBytesTransferred)).bytesTransferred(ArgumentMatchers.any());
352+
}
212353
}

0 commit comments

Comments
 (0)