diff --git a/.changes/next-release/feature-S3TransferManager-ca5314b.json b/.changes/next-release/feature-S3TransferManager-ca5314b.json new file mode 100644 index 000000000000..80812b8f4202 --- /dev/null +++ b/.changes/next-release/feature-S3TransferManager-ca5314b.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "S3 Transfer Manager", + "contributor": "AndreKurait", + "description": "Extend [#5031](https://github.com/aws/aws-sdk-java-v2/pull/5031) by allowing user configuration of transferDirectoryMaxConcurrency during construction of S3TransferManager. This allows for more control of upper limits on JVM Heap memory allocation. See [#6330](https://github.com/aws/aws-sdk-java-v2/issues/6330)." +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java index 5bf9b55ed657..b5c0e823a91a 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java @@ -780,6 +780,18 @@ interface Builder { */ Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth); + /** + * Specifies the maximum number of concurrent file transfers that will be performed when + * uploading or downloading a directory. This setting controls the concurrency for an individual + * call to {@link S3TransferManager#uploadDirectory} or {@link S3TransferManager#downloadDirectory}. + *

+ * Default to 100 + * + * @param transferDirectoryMaxConcurrency the maximum number of concurrent file transfers + * @return This builder for method chaining. + */ + Builder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency); + /** * Builds an instance of {@link S3TransferManager} based on the settings supplied to this builder * diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java index 5343cb7e87f6..69f59473d060 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java @@ -16,7 +16,6 @@ package software.amazon.awssdk.transfer.s3.internal; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER; -import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX; import java.io.IOException; @@ -110,7 +109,9 @@ private void doDownloadDirectory(CompletableFuture r new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request, failedFileDownloads), allOfFutures, - DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY); + transferConfiguration.option( + TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY + )); listObjectsHelper.listS3ObjectsRecursively(request) .filter(downloadDirectoryRequest.filter()) .subscribe(asyncBufferingSubscriber); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java index 33d124eb7c13..39dd1da474be 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java @@ -32,6 +32,9 @@ public final class TransferConfigurationOption extends AttributeMap.Key { public static final TransferConfigurationOption UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS = new TransferConfigurationOption<>("UploadDirectoryFileVisitOption", Boolean.class); + public static final TransferConfigurationOption DIRECTORY_TRANSFER_MAX_CONCURRENCY = + new TransferConfigurationOption<>("TransferDirectoryMaxConcurrency", Integer.class); + public static final TransferConfigurationOption EXECUTOR = new TransferConfigurationOption<>("Executor", Executor.class); @@ -45,6 +48,7 @@ public final class TransferConfigurationOption extends AttributeMap.Key { .builder() .put(UPLOAD_DIRECTORY_MAX_DEPTH, DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH) .put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, false) + .put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY) .build(); private final String name; @@ -69,4 +73,3 @@ public String toString() { return name; } } - diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfiguration.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfiguration.java index a8f71c7fb59b..164dd59077f4 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfiguration.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfiguration.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.transfer.s3.internal; +import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.TRANSFER_MANAGER_DEFAULTS; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH; @@ -42,6 +43,7 @@ private TransferManagerConfiguration(Builder builder) { AttributeMap.Builder standardOptions = AttributeMap.builder(); standardOptions.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, builder.uploadDirectoryFollowSymbolicLinks); standardOptions.put(UPLOAD_DIRECTORY_MAX_DEPTH, builder.uploadDirectoryMaxDepth); + standardOptions.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, builder.transferDirectoryMaxConcurrency); finalizeExecutor(builder, standardOptions); options = standardOptions.build().merge(TRANSFER_MANAGER_DEFAULTS); } @@ -97,6 +99,7 @@ public static final class Builder { private Boolean uploadDirectoryFollowSymbolicLinks; private Integer uploadDirectoryMaxDepth; + private Integer transferDirectoryMaxConcurrency; private Executor executor; @@ -110,6 +113,11 @@ public Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth) { return this; } + public Builder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) { + this.transferDirectoryMaxConcurrency = transferDirectoryMaxConcurrency; + return this; + } + public Builder executor(Executor executor) { this.executor = executor; return this; diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java index 429d534e074e..a927eb500281 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java @@ -86,6 +86,7 @@ private static TransferManagerConfiguration resolveTransferManagerConfiguration( TransferManagerConfiguration.Builder transferConfigBuilder = TransferManagerConfiguration.builder(); transferConfigBuilder.uploadDirectoryFollowSymbolicLinks(tmBuilder.uploadDirectoryFollowSymbolicLinks); transferConfigBuilder.uploadDirectoryMaxDepth(tmBuilder.uploadDirectoryMaxDepth); + transferConfigBuilder.transferDirectoryMaxConcurrency(tmBuilder.transferDirectoryMaxConcurrency); transferConfigBuilder.executor(tmBuilder.executor); return transferConfigBuilder.build(); } @@ -95,6 +96,7 @@ public static final class DefaultBuilder implements S3TransferManager.Builder { private Executor executor; private Boolean uploadDirectoryFollowSymbolicLinks; private Integer uploadDirectoryMaxDepth; + private Integer transferDirectoryMaxConcurrency; @Override public DefaultBuilder s3Client(S3AsyncClient s3AsyncClient) { @@ -136,6 +138,20 @@ public Integer getUploadDirectoryMaxDepth() { return uploadDirectoryMaxDepth; } + @Override + public DefaultBuilder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) { + this.transferDirectoryMaxConcurrency = transferDirectoryMaxConcurrency; + return this; + } + + public void setTransferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) { + transferDirectoryMaxConcurrency(transferDirectoryMaxConcurrency); + } + + public Integer getTransferDirectoryMaxConcurrency() { + return transferDirectoryMaxConcurrency; + } + @Override public S3TransferManager build() { return createTransferManager(this); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java index a6d9c6f1a270..f5b1917bc89a 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java @@ -16,7 +16,6 @@ package software.amazon.awssdk.transfer.s3.internal; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER; -import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX; import java.io.IOException; @@ -102,7 +101,10 @@ private void doUploadDirectory(CompletableFuture retur AsyncBufferingSubscriber bufferingSubscriber = new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path), - allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY); + allOfFutures, + transferConfiguration.option( + TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY + )); iterablePublisher.subscribe(bufferingSubscriber); CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures); diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java index 7d78b482ee69..50a3cf4cb2a8 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java @@ -35,13 +35,18 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.time.Duration; import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.assertj.core.util.Sets; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -521,6 +526,114 @@ void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_fails assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception); } + @Test + void downloadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception { + // Create many S3 objects to test concurrency + int totalFiles = 50; + String[] keys = new String[totalFiles]; + for (int i = 0; i < totalFiles; i++) { + keys[i] = "file" + i + ".txt"; + } + stubSuccessfulListObjects(listObjectsHelper, keys); + + // Configuration with expected concurrency limit + int configuredMaxConcurrency = 5; + + // Track concurrent operations + AtomicInteger currentlyActive = new AtomicInteger(0); + AtomicInteger peakConcurrency = new AtomicInteger(0); + + // Use a Phaser to coordinate phases + Phaser phaser = new Phaser(1); // Start with test thread + + // Mock the single download function to track concurrent operations + when(singleDownloadFunction.apply(any())).thenAnswer(invocation -> { + phaser.register(); // Each operation registers + + CompletableFuture future = new CompletableFuture<>(); + + CompletableFuture.runAsync(() -> { + try { + // Track entry + int current = currentlyActive.incrementAndGet(); + peakConcurrency.updateAndGet(max -> Math.max(max, current)); + + // Wait at barrier - this forces all operations to be active simultaneously + phaser.arriveAndAwaitAdvance(); + + // Complete + future.complete(CompletedFileDownload.builder() + .response(GetObjectResponse.builder().eTag("test").build()) + .build()); + + } catch (Exception e) { + future.completeExceptionally(e); + } finally { + currentlyActive.decrementAndGet(); + phaser.arriveAndDeregister(); + } + }); + + return new DefaultFileDownload(future, + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder() + .transferredBytes(0L) + .build()), + () -> DownloadFileRequest.builder() + .getObjectRequest(GetObjectRequest.builder().build()) + .destination(Paths.get(".")) + .build(), + null); + }); + + // Configure with our expected limit + // To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is + // configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1 + TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder() + .transferDirectoryMaxConcurrency(configuredMaxConcurrency) + .build(); + + DownloadDirectoryHelper customHelper = new DownloadDirectoryHelper( + customConfig, listObjectsHelper, singleDownloadFunction); + + // Start download asynchronously + CompletableFuture downloadTask = CompletableFuture.runAsync(() -> { + DirectoryDownload download = customHelper.downloadDirectory( + DownloadDirectoryRequest.builder() + .destination(directory) + .bucket("bucket") + .build() + ); + download.completionFuture().join(); + }); + + // Wait for operations to register (but not complete the phase) + // We wait for more than the configured limit to ensure we'd catch violations + Duration maxWait = Duration.ofSeconds(5); + long deadline = System.nanoTime() + maxWait.toNanos(); + int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread + while (current < configuredMaxConcurrency) { + if (System.nanoTime() >= deadline) { + throw new AssertionError( + "Timed out waiting for registrations: current=" + current + + ", configuredMaxConcurrency=" + configuredMaxConcurrency); + } + LockSupport.parkNanos(10_000_000L); // ~10 ms + current = phaser.getRegisteredParties() - 1; + } + + // Check peak BEFORE releasing the phase + int observedPeak = peakConcurrency.get(); + assertThat(observedPeak) + .as("Implementation allowed %d concurrent operations but was configured for %d", + observedPeak, configuredMaxConcurrency) + .isEqualTo(configuredMaxConcurrency); + + // Release the phase to let operations complete + phaser.arriveAndDeregister(); + + // Complete the test + downloadTask.get(2, TimeUnit.SECONDS); + } private static DefaultFileDownload completedDownload() { return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder() diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfigurationTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfigurationTest.java index 7709a5812878..cf8635fde49c 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfigurationTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfigurationTest.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.EXECUTOR; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH; @@ -58,11 +59,34 @@ public void resolveFollowSymlinks_requestOverride_requestOverrideShouldTakePrece } @Test - public void noOverride_shouldUseDefaults() { - transferManagerConfiguration = TransferManagerConfiguration.builder().build(); - assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS)).isFalse(); - assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_MAX_DEPTH)).isEqualTo(Integer.MAX_VALUE); - assertThat(transferManagerConfiguration.option(EXECUTOR)).isNotNull(); + public void noOverride_orNullOverride_shouldUseDefaults() { + assertDefaultTransferManagerConfiguration( + TransferManagerConfiguration.builder().build() + ); + + assertDefaultTransferManagerConfiguration( + TransferManagerConfiguration.builder() + .transferDirectoryMaxConcurrency(null) + .executor(null) + .uploadDirectoryFollowSymbolicLinks(null) + .uploadDirectoryMaxDepth(null) + .build() + ); + } + + private void assertDefaultTransferManagerConfiguration(TransferManagerConfiguration config) { + assertThat(config.option(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS)).isFalse(); + assertThat(config.option(UPLOAD_DIRECTORY_MAX_DEPTH)).isEqualTo(Integer.MAX_VALUE); + assertThat(config.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(100); + assertThat(config.option(EXECUTOR)).isNotNull(); + } + + @Test + public void transferDirectoryMaxConcurrency_customValue_shouldBeStored() { + transferManagerConfiguration = TransferManagerConfiguration.builder() + .transferDirectoryMaxConcurrency(50) + .build(); + assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(50); } @Test diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java index 735f5b735653..ffc9a3e7b547 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java @@ -30,12 +30,16 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -458,6 +462,114 @@ public void uploadDirectory_requestTransformFunctionThrows_failsUpload() { assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception); } + @Test + void uploadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception { + // Create many files to test concurrency + Path testDir = jimfs.getPath("concurrencyTest"); + Files.createDirectory(testDir); + int totalFiles = 50; + for (int i = 0; i < totalFiles; i++) { + Files.createFile(jimfs.getPath("concurrencyTest/file" + i + ".txt")); + } + + // Configuration with expected concurrency limit + int configuredMaxConcurrency = 5; + + // Track concurrent operations + AtomicInteger currentlyActive = new AtomicInteger(0); + AtomicInteger peakConcurrency = new AtomicInteger(0); + + // Use a Phaser to coordinate phases + Phaser phaser = new Phaser(1); // Start with test thread + + // Mock the single upload function to track concurrent operations + when(singleUploadFunction.apply(any())).thenAnswer(invocation -> { + phaser.register(); // Each operation registers + + CompletableFuture future = new CompletableFuture<>(); + + CompletableFuture.runAsync(() -> { + try { + // Track entry + int current = currentlyActive.incrementAndGet(); + peakConcurrency.updateAndGet(max -> Math.max(max, current)); + + // Wait at barrier - this forces all operations to be active simultaneously + phaser.arriveAndAwaitAdvance(); + + // Complete + future.complete(CompletedFileUpload.builder() + .response(PutObjectResponse.builder().eTag("test").build()) + .build()); + + } catch (Exception e) { + future.completeExceptionally(e); + } finally { + currentlyActive.decrementAndGet(); + phaser.arriveAndDeregister(); + } + }); + + return new DefaultFileUpload(future, + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder() + .transferredBytes(0L) + .build()), + new PauseObservable(), + UploadFileRequest.builder() + .putObjectRequest(p -> p.key("key").bucket("bucket")) + .source(Paths.get("test.txt")) + .build()); + }); + + // Configure with our expected limit + // To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is + // configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1 + TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder() + .transferDirectoryMaxConcurrency(configuredMaxConcurrency) + .build(); + + UploadDirectoryHelper customHelper = new UploadDirectoryHelper(customConfig, singleUploadFunction); + + // Start upload asynchronously + CompletableFuture uploadTask = CompletableFuture.runAsync(() -> { + DirectoryUpload upload = customHelper.uploadDirectory( + UploadDirectoryRequest.builder() + .source(testDir) + .bucket("bucket") + .build() + ); + upload.completionFuture().join(); + }); + + // Wait for operations to register (but not complete the phase) + // We wait for more than the configured limit to ensure we'd catch violations + Duration maxWait = Duration.ofSeconds(5); + long deadline = System.nanoTime() + maxWait.toNanos(); + int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread + while (current < configuredMaxConcurrency) { + if (System.nanoTime() >= deadline) { + throw new AssertionError( + "Timed out waiting for registrations: current=" + current + + ", configuredMaxConcurrency=" + configuredMaxConcurrency); + } + LockSupport.parkNanos(10_000_000L); // ~10 ms + current = phaser.getRegisteredParties() - 1; + } + + // Check peak BEFORE releasing the phase + int observedPeak = peakConcurrency.get(); + assertThat(observedPeak) + .as("Implementation allowed %d concurrent operations but was configured for %d", + observedPeak, configuredMaxConcurrency) + .isEqualTo(configuredMaxConcurrency); + + // Release the phase to let operations complete + phaser.arriveAndDeregister(); + + // Complete the test + uploadTask.get(2, TimeUnit.SECONDS); + } + private DefaultFileUpload completedUpload() { return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder()