@@ -526,114 +526,6 @@ void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_fails
526
526
527
527
assertThatThrownBy (downloadDirectory .completionFuture ()::join ).hasCause (exception );
528
528
}
529
- @ Test
530
- void downloadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations () throws Exception {
531
- // Create many S3 objects to test concurrency
532
- int totalFiles = 50 ;
533
- String [] keys = new String [totalFiles ];
534
- for (int i = 0 ; i < totalFiles ; i ++) {
535
- keys [i ] = "file" + i + ".txt" ;
536
- }
537
- stubSuccessfulListObjects (listObjectsHelper , keys );
538
-
539
- // Configuration with expected concurrency limit
540
- int configuredMaxConcurrency = 5 ;
541
-
542
- // Track concurrent operations
543
- AtomicInteger currentlyActive = new AtomicInteger (0 );
544
- AtomicInteger peakConcurrency = new AtomicInteger (0 );
545
-
546
- // Use a Phaser to coordinate phases
547
- Phaser phaser = new Phaser (1 ); // Start with test thread
548
-
549
- // Mock the single download function to track concurrent operations
550
- when (singleDownloadFunction .apply (any ())).thenAnswer (invocation -> {
551
- phaser .register (); // Each operation registers
552
-
553
- CompletableFuture <CompletedFileDownload > future = new CompletableFuture <>();
554
-
555
- CompletableFuture .runAsync (() -> {
556
- try {
557
- // Track entry
558
- int current = currentlyActive .incrementAndGet ();
559
- peakConcurrency .updateAndGet (max -> Math .max (max , current ));
560
-
561
- // Wait at barrier - this forces all operations to be active simultaneously
562
- phaser .arriveAndAwaitAdvance ();
563
-
564
- // Complete
565
- future .complete (CompletedFileDownload .builder ()
566
- .response (GetObjectResponse .builder ().eTag ("test" ).build ())
567
- .build ());
568
-
569
- } catch (Exception e ) {
570
- future .completeExceptionally (e );
571
- } finally {
572
- currentlyActive .decrementAndGet ();
573
- phaser .arriveAndDeregister ();
574
- }
575
- });
576
-
577
- return new DefaultFileDownload (future ,
578
- new DefaultTransferProgress (DefaultTransferProgressSnapshot .builder ()
579
- .transferredBytes (0L )
580
- .build ()),
581
- () -> DownloadFileRequest .builder ()
582
- .getObjectRequest (GetObjectRequest .builder ().build ())
583
- .destination (Paths .get ("." ))
584
- .build (),
585
- null );
586
- });
587
-
588
- // Configure with our expected limit
589
- // To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is
590
- // configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1
591
- TransferManagerConfiguration customConfig = TransferManagerConfiguration .builder ()
592
- .transferDirectoryMaxConcurrency (configuredMaxConcurrency )
593
- .build ();
594
-
595
- DownloadDirectoryHelper customHelper = new DownloadDirectoryHelper (
596
- customConfig , listObjectsHelper , singleDownloadFunction );
597
-
598
- // Start download asynchronously
599
- CompletableFuture <Void > downloadTask = CompletableFuture .runAsync (() -> {
600
- DirectoryDownload download = customHelper .downloadDirectory (
601
- DownloadDirectoryRequest .builder ()
602
- .destination (directory )
603
- .bucket ("bucket" )
604
- .build ()
605
- );
606
- download .completionFuture ().join ();
607
- });
608
-
609
- // Wait for operations to register (but not complete the phase)
610
- // We wait for more than the configured limit to ensure we'd catch violations
611
- Duration maxWait = Duration .ofSeconds (5 );
612
- long deadline = System .nanoTime () + maxWait .toNanos ();
613
- int current = phaser .getRegisteredParties () - 1 ; // Subtract 1 for main thread
614
- while (current < configuredMaxConcurrency ) {
615
- if (System .nanoTime () >= deadline ) {
616
- throw new AssertionError (
617
- "Timed out waiting for registrations: current=" + current +
618
- ", configuredMaxConcurrency=" + configuredMaxConcurrency );
619
- }
620
- LockSupport .parkNanos (10_000_000L ); // ~10 ms
621
- current = phaser .getRegisteredParties () - 1 ;
622
- }
623
-
624
- // Check peak BEFORE releasing the phase
625
- int observedPeak = peakConcurrency .get ();
626
- assertThat (observedPeak )
627
- .as ("Implementation allowed %d concurrent operations but was configured for %d" ,
628
- observedPeak , configuredMaxConcurrency )
629
- .isEqualTo (configuredMaxConcurrency );
630
-
631
- // Release the phase to let operations complete
632
- phaser .arriveAndDeregister ();
633
-
634
- // Complete the test
635
- downloadTask .get (2 , TimeUnit .SECONDS );
636
- }
637
529
638
530
private static DefaultFileDownload completedDownload () {
639
531
return new DefaultFileDownload (CompletableFuture .completedFuture (CompletedFileDownload .builder ()
0 commit comments