Skip to content

Commit c974d90

Browse files
committed
Add integration style test to Download/Upload DirectoryHelperTest to verify maxConncurent downloads is respected
Signed-off-by: Andre Kurait <[email protected]>
1 parent 27863d7 commit c974d90

File tree

5 files changed

+232
-3
lines changed

5 files changed

+232
-3
lines changed

.changes/next-release/feature-S3TransferManager-ca5314b.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"type": "feature",
33
"category": "S3 Transfer Manager",
44
"contributor": "AndreKurait",
5-
"description": "Extend [#5031](https://github.com/aws/aws-sdk-java-v2/pull/5031) by allowing user configuration of directoryTransferMaxConcurrency during construction of S3TransferManager. This allows for more control of upper limits on JVM Heap memory allocation. See [#6330](https://github.com/aws/aws-sdk-java-v2/issues/6330)"
5+
"description": "Extend [#5031](https://github.com/aws/aws-sdk-java-v2/pull/5031) by allowing user configuration of directoryTransferMaxConcurrency during construction of S3TransferManager. This allows for more control of upper limits on JVM Heap memory allocation. See [#6330](https://github.com/aws/aws-sdk-java-v2/issues/6330)."
66
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
109109
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
110110
failedFileDownloads),
111111
allOfFutures,
112-
transferConfiguration.option(TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY));
112+
transferConfiguration.option(
113+
TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY
114+
));
113115
listObjectsHelper.listS3ObjectsRecursively(request)
114116
.filter(downloadDirectoryRequest.filter())
115117
.subscribe(asyncBufferingSubscriber);

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
102102
AsyncBufferingSubscriber<Path> bufferingSubscriber =
103103
new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path),
104104
allOfFutures,
105-
transferConfiguration.option(TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY));
105+
transferConfiguration.option(
106+
TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY
107+
));
106108

107109
iterablePublisher.subscribe(bufferingSubscriber);
108110
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,18 @@
3535
import java.util.Arrays;
3636
import java.util.Collection;
3737
import java.util.List;
38+
import java.time.Duration;
3839
import java.util.UUID;
3940
import java.util.concurrent.CancellationException;
4041
import java.util.concurrent.CompletableFuture;
42+
import java.util.concurrent.Phaser;
4143
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.atomic.AtomicInteger;
45+
import java.util.concurrent.locks.LockSupport;
4246
import java.util.function.Consumer;
4347
import java.util.function.Function;
4448
import java.util.stream.Collectors;
49+
import java.util.stream.IntStream;
4550
import org.assertj.core.util.Sets;
4651
import org.junit.jupiter.api.AfterEach;
4752
import org.junit.jupiter.api.BeforeEach;
@@ -521,6 +526,114 @@ void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_fails
521526

522527
assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception);
523528
}
529+
@Test
530+
void downloadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception {
531+
// Create many S3 objects to test concurrency
532+
int totalFiles = 50;
533+
String[] keys = new String[totalFiles];
534+
for (int i = 0; i < totalFiles; i++) {
535+
keys[i] = "file" + i + ".txt";
536+
}
537+
stubSuccessfulListObjects(listObjectsHelper, keys);
538+
539+
// Configuration with expected concurrency limit
540+
int configuredMaxConcurrency = 5;
541+
542+
// Track concurrent operations
543+
AtomicInteger currentlyActive = new AtomicInteger(0);
544+
AtomicInteger peakConcurrency = new AtomicInteger(0);
545+
546+
// Use a Phaser to coordinate phases
547+
Phaser phaser = new Phaser(1); // Start with test thread
548+
549+
// Mock the single download function to track concurrent operations
550+
when(singleDownloadFunction.apply(any())).thenAnswer(invocation -> {
551+
phaser.register(); // Each operation registers
552+
553+
CompletableFuture<CompletedFileDownload> future = new CompletableFuture<>();
554+
555+
CompletableFuture.runAsync(() -> {
556+
try {
557+
// Track entry
558+
int current = currentlyActive.incrementAndGet();
559+
peakConcurrency.updateAndGet(max -> Math.max(max, current));
560+
561+
// Wait at barrier - this forces all operations to be active simultaneously
562+
phaser.arriveAndAwaitAdvance();
563+
564+
// Complete
565+
future.complete(CompletedFileDownload.builder()
566+
.response(GetObjectResponse.builder().eTag("test").build())
567+
.build());
568+
569+
} catch (Exception e) {
570+
future.completeExceptionally(e);
571+
} finally {
572+
currentlyActive.decrementAndGet();
573+
phaser.arriveAndDeregister();
574+
}
575+
});
576+
577+
return new DefaultFileDownload(future,
578+
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder()
579+
.transferredBytes(0L)
580+
.build()),
581+
() -> DownloadFileRequest.builder()
582+
.getObjectRequest(GetObjectRequest.builder().build())
583+
.destination(Paths.get("."))
584+
.build(),
585+
null);
586+
});
587+
588+
// Configure with our expected limit
589+
// To verify test works as intended, verify test failure when directoryTransferMaxConcurrency is
590+
// configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1
591+
TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder()
592+
.directoryTransferMaxConcurrency(configuredMaxConcurrency)
593+
.build();
594+
595+
DownloadDirectoryHelper customHelper = new DownloadDirectoryHelper(
596+
customConfig, listObjectsHelper, singleDownloadFunction);
597+
598+
// Start download asynchronously
599+
CompletableFuture<Void> downloadTask = CompletableFuture.runAsync(() -> {
600+
DirectoryDownload download = customHelper.downloadDirectory(
601+
DownloadDirectoryRequest.builder()
602+
.destination(directory)
603+
.bucket("bucket")
604+
.build()
605+
);
606+
download.completionFuture().join();
607+
});
608+
609+
// Wait for operations to register (but not complete the phase)
610+
// We wait for more than the configured limit to ensure we'd catch violations
611+
Duration maxWait = Duration.ofSeconds(5);
612+
long deadline = System.nanoTime() + maxWait.toNanos();
613+
int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread
614+
while (current < configuredMaxConcurrency) {
615+
if (System.nanoTime() >= deadline) {
616+
throw new AssertionError(
617+
"Timed out waiting for registrations: current=" + current +
618+
", configuredMaxConcurrency=" + configuredMaxConcurrency);
619+
}
620+
LockSupport.parkNanos(10_000_000L); // ~10 ms
621+
current = phaser.getRegisteredParties() - 1;
622+
}
623+
624+
// Check peak BEFORE releasing the phase
625+
int observedPeak = peakConcurrency.get();
626+
assertThat(observedPeak)
627+
.as("Implementation allowed %d concurrent operations but was configured for %d",
628+
observedPeak, configuredMaxConcurrency)
629+
.isEqualTo(configuredMaxConcurrency);
630+
631+
// Release the phase to let operations complete
632+
phaser.arriveAndDeregister();
633+
634+
// Complete the test
635+
downloadTask.get(2, TimeUnit.SECONDS);
636+
}
524637

525638
private static DefaultFileDownload completedDownload() {
526639
return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder()

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@
3030
import java.nio.file.Files;
3131
import java.nio.file.Path;
3232
import java.nio.file.Paths;
33+
import java.time.Duration;
3334
import java.util.Arrays;
3435
import java.util.Collection;
3536
import java.util.List;
3637
import java.util.concurrent.CancellationException;
3738
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.Phaser;
3840
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.concurrent.locks.LockSupport;
3943
import java.util.function.Consumer;
4044
import java.util.function.Function;
4145
import java.util.stream.Collectors;
@@ -458,6 +462,114 @@ public void uploadDirectory_requestTransformFunctionThrows_failsUpload() {
458462
assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception);
459463
}
460464

465+
@Test
466+
void uploadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception {
467+
// Create many files to test concurrency
468+
Path testDir = jimfs.getPath("concurrencyTest");
469+
Files.createDirectory(testDir);
470+
int totalFiles = 50;
471+
for (int i = 0; i < totalFiles; i++) {
472+
Files.createFile(jimfs.getPath("concurrencyTest/file" + i + ".txt"));
473+
}
474+
475+
// Configuration with expected concurrency limit
476+
int configuredMaxConcurrency = 5;
477+
478+
// Track concurrent operations
479+
AtomicInteger currentlyActive = new AtomicInteger(0);
480+
AtomicInteger peakConcurrency = new AtomicInteger(0);
481+
482+
// Use a Phaser to coordinate phases
483+
Phaser phaser = new Phaser(1); // Start with test thread
484+
485+
// Mock the single upload function to track concurrent operations
486+
when(singleUploadFunction.apply(any())).thenAnswer(invocation -> {
487+
phaser.register(); // Each operation registers
488+
489+
CompletableFuture<CompletedFileUpload> future = new CompletableFuture<>();
490+
491+
CompletableFuture.runAsync(() -> {
492+
try {
493+
// Track entry
494+
int current = currentlyActive.incrementAndGet();
495+
peakConcurrency.updateAndGet(max -> Math.max(max, current));
496+
497+
// Wait at barrier - this forces all operations to be active simultaneously
498+
phaser.arriveAndAwaitAdvance();
499+
500+
// Complete
501+
future.complete(CompletedFileUpload.builder()
502+
.response(PutObjectResponse.builder().eTag("test").build())
503+
.build());
504+
505+
} catch (Exception e) {
506+
future.completeExceptionally(e);
507+
} finally {
508+
currentlyActive.decrementAndGet();
509+
phaser.arriveAndDeregister();
510+
}
511+
});
512+
513+
return new DefaultFileUpload(future,
514+
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder()
515+
.transferredBytes(0L)
516+
.build()),
517+
new PauseObservable(),
518+
UploadFileRequest.builder()
519+
.putObjectRequest(p -> p.key("key").bucket("bucket"))
520+
.source(Paths.get("test.txt"))
521+
.build());
522+
});
523+
524+
// Configure with our expected limit
525+
// To verify test works as intended, verify test failure when directoryTransferMaxConcurrency is
526+
// configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1
527+
TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder()
528+
.directoryTransferMaxConcurrency(configuredMaxConcurrency)
529+
.build();
530+
531+
UploadDirectoryHelper customHelper = new UploadDirectoryHelper(customConfig, singleUploadFunction);
532+
533+
// Start upload asynchronously
534+
CompletableFuture<Void> uploadTask = CompletableFuture.runAsync(() -> {
535+
DirectoryUpload upload = customHelper.uploadDirectory(
536+
UploadDirectoryRequest.builder()
537+
.source(testDir)
538+
.bucket("bucket")
539+
.build()
540+
);
541+
upload.completionFuture().join();
542+
});
543+
544+
// Wait for operations to register (but not complete the phase)
545+
// We wait for more than the configured limit to ensure we'd catch violations
546+
Duration maxWait = Duration.ofSeconds(5);
547+
long deadline = System.nanoTime() + maxWait.toNanos();
548+
int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread
549+
while (current < configuredMaxConcurrency) {
550+
if (System.nanoTime() >= deadline) {
551+
throw new AssertionError(
552+
"Timed out waiting for registrations: current=" + current +
553+
", configuredMaxConcurrency=" + configuredMaxConcurrency);
554+
}
555+
LockSupport.parkNanos(10_000_000L); // ~10 ms
556+
current = phaser.getRegisteredParties() - 1;
557+
}
558+
559+
// Check peak BEFORE releasing the phase
560+
int observedPeak = peakConcurrency.get();
561+
assertThat(observedPeak)
562+
.as("Implementation allowed %d concurrent operations but was configured for %d",
563+
observedPeak, configuredMaxConcurrency)
564+
.isEqualTo(configuredMaxConcurrency);
565+
566+
// Release the phase to let operations complete
567+
phaser.arriveAndDeregister();
568+
569+
// Complete the test
570+
uploadTask.get(2, TimeUnit.SECONDS);
571+
}
572+
461573

462574
private DefaultFileUpload completedUpload() {
463575
return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder()

0 commit comments

Comments
 (0)