diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index a4b7b68e75b..3e64783561e 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -98,6 +98,7 @@ void differentNumberOfStrings_shouldCompleteSuccessfully(int numberOfStrings) th assertThat(numRequestsInFlightSampling).contains(MAX_CONCURRENT_EXECUTIONS); } disposable.dispose(); + numRequestsInFlightSampling.forEach(maxConcurrency -> assertThat(maxConcurrency).isLessThanOrEqualTo(MAX_CONCURRENT_EXECUTIONS)); } @Test diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java index 50a3cf4cb2a..dc8536e8fc6 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java @@ -526,114 +526,6 @@ void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_fails assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception); } - @Test - void downloadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception { - // Create many S3 objects to test concurrency - int totalFiles = 50; - String[] keys = new String[totalFiles]; - for (int i = 0; i < totalFiles; i++) { - keys[i] = "file" + i + ".txt"; - } - stubSuccessfulListObjects(listObjectsHelper, keys); - - // Configuration with expected concurrency limit - int configuredMaxConcurrency = 5; - - // Track concurrent operations - AtomicInteger currentlyActive = new AtomicInteger(0); - AtomicInteger peakConcurrency = new AtomicInteger(0); - - // Use a Phaser to coordinate phases - Phaser phaser = new Phaser(1); // Start with test thread - - // Mock the single download function to track concurrent operations - when(singleDownloadFunction.apply(any())).thenAnswer(invocation -> { - phaser.register(); // Each operation registers - - CompletableFuture future = new CompletableFuture<>(); - - CompletableFuture.runAsync(() -> { - try { - // Track entry - int current = currentlyActive.incrementAndGet(); - peakConcurrency.updateAndGet(max -> Math.max(max, current)); - - // Wait at barrier - this forces all operations to be active simultaneously - phaser.arriveAndAwaitAdvance(); - - // Complete - future.complete(CompletedFileDownload.builder() - .response(GetObjectResponse.builder().eTag("test").build()) - .build()); - - } catch (Exception e) { - future.completeExceptionally(e); - } finally { - currentlyActive.decrementAndGet(); - phaser.arriveAndDeregister(); - } - }); - - return new DefaultFileDownload(future, - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder() - .transferredBytes(0L) - .build()), - () -> DownloadFileRequest.builder() - .getObjectRequest(GetObjectRequest.builder().build()) - .destination(Paths.get(".")) - .build(), - null); - }); - - // Configure with our expected limit - // To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is - // configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1 - TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder() - .transferDirectoryMaxConcurrency(configuredMaxConcurrency) - .build(); - - DownloadDirectoryHelper customHelper = new DownloadDirectoryHelper( - customConfig, listObjectsHelper, singleDownloadFunction); - - // Start download asynchronously - CompletableFuture downloadTask = CompletableFuture.runAsync(() -> { - DirectoryDownload download = customHelper.downloadDirectory( - DownloadDirectoryRequest.builder() - .destination(directory) - .bucket("bucket") - .build() - ); - download.completionFuture().join(); - }); - - // Wait for operations to register (but not complete the phase) - // We wait for more than the configured limit to ensure we'd catch violations - Duration maxWait = Duration.ofSeconds(5); - long deadline = System.nanoTime() + maxWait.toNanos(); - int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread - while (current < configuredMaxConcurrency) { - if (System.nanoTime() >= deadline) { - throw new AssertionError( - "Timed out waiting for registrations: current=" + current + - ", configuredMaxConcurrency=" + configuredMaxConcurrency); - } - LockSupport.parkNanos(10_000_000L); // ~10 ms - current = phaser.getRegisteredParties() - 1; - } - - // Check peak BEFORE releasing the phase - int observedPeak = peakConcurrency.get(); - assertThat(observedPeak) - .as("Implementation allowed %d concurrent operations but was configured for %d", - observedPeak, configuredMaxConcurrency) - .isEqualTo(configuredMaxConcurrency); - - // Release the phase to let operations complete - phaser.arriveAndDeregister(); - - // Complete the test - downloadTask.get(2, TimeUnit.SECONDS); - } private static DefaultFileDownload completedDownload() { return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder() diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java index 9c2fd9dacdc..7b2c613efe0 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java @@ -462,115 +462,6 @@ public void uploadDirectory_requestTransformFunctionThrows_failsUpload() { assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception); } - @Test - void uploadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception { - // Create many files to test concurrency - Path testDir = jimfs.getPath("concurrencyTest"); - Files.createDirectory(testDir); - int totalFiles = 50; - for (int i = 0; i < totalFiles; i++) { - Files.createFile(jimfs.getPath("concurrencyTest/file" + i + ".txt")); - } - - // Configuration with expected concurrency limit - int configuredMaxConcurrency = 5; - - // Track concurrent operations - AtomicInteger currentlyActive = new AtomicInteger(0); - AtomicInteger peakConcurrency = new AtomicInteger(0); - - // Use a Phaser to coordinate phases - Phaser phaser = new Phaser(1); // Start with test thread - - // Mock the single upload function to track concurrent operations - when(singleUploadFunction.apply(any())).thenAnswer(invocation -> { - phaser.register(); // Each operation registers - - CompletableFuture future = new CompletableFuture<>(); - - CompletableFuture.runAsync(() -> { - try { - // Track entry - int current = currentlyActive.incrementAndGet(); - peakConcurrency.updateAndGet(max -> Math.max(max, current)); - - // Wait at barrier - this forces all operations to be active simultaneously - phaser.arriveAndAwaitAdvance(); - - // Complete - future.complete(CompletedFileUpload.builder() - .response(PutObjectResponse.builder().eTag("test").build()) - .build()); - - } catch (Exception e) { - future.completeExceptionally(e); - } finally { - currentlyActive.decrementAndGet(); - phaser.arriveAndDeregister(); - } - }); - - return new DefaultFileUpload(future, - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder() - .transferredBytes(0L) - .build()), - new PauseObservable(), - UploadFileRequest.builder() - .putObjectRequest(p -> p.key("key").bucket("bucket")) - .source(Paths.get("test.txt")) - .build()); - }); - - // Configure with our expected limit - // To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is - // configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1 - TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder() - .transferDirectoryMaxConcurrency(configuredMaxConcurrency) - .build(); - - UploadDirectoryHelper customHelper = new UploadDirectoryHelper(customConfig, singleUploadFunction); - - // Start upload asynchronously - CompletableFuture uploadTask = CompletableFuture.runAsync(() -> { - DirectoryUpload upload = customHelper.uploadDirectory( - UploadDirectoryRequest.builder() - .source(testDir) - .bucket("bucket") - .build() - ); - upload.completionFuture().join(); - }); - - // Wait for operations to register (but not complete the phase) - // We wait for more than the configured limit to ensure we'd catch violations - Duration maxWait = Duration.ofSeconds(5); - long deadline = System.nanoTime() + maxWait.toNanos(); - int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread - while (current < configuredMaxConcurrency) { - if (System.nanoTime() >= deadline) { - throw new AssertionError( - "Timed out waiting for registrations: current=" + current + - ", configuredMaxConcurrency=" + configuredMaxConcurrency); - } - LockSupport.parkNanos(10_000_000L); // ~10 ms - current = phaser.getRegisteredParties() - 1; - } - - // Check peak BEFORE releasing the phase - int observedPeak = peakConcurrency.get(); - assertThat(observedPeak) - .as("Implementation allowed %d concurrent operations but was configured for %d", - observedPeak, configuredMaxConcurrency) - .isLessThanOrEqualTo(configuredMaxConcurrency); - - // Release the phase to let operations complete - phaser.arriveAndDeregister(); - - // Complete the test - uploadTask.get(2, TimeUnit.SECONDS); - } - - private DefaultFileUpload completedUpload() { return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder() .response(PutObjectResponse.builder().build())