Skip to content

Commit 8966845

Browse files
authored
Add ability to specify transferDirectoryMaxConcurrency in S3TransferManager (#6329)
* Add ability to specify directoryTransferMaxConcurrency in S3TransferManager creation to limit concurrency on download and upload directory to control memory use Signed-off-by: Andre Kurait <[email protected]> * Add changelog Signed-off-by: Andre Kurait <[email protected]> * Add integration-style test to Download/Upload DirectoryHelperTest to verify maxConcurrent downloads is respected Signed-off-by: Andre Kurait <[email protected]> * Remove unnecessary test for directoryTransferMaxConcurrency default value Signed-off-by: Andre Kurait <[email protected]> * Rename to TransferDirectoryMaxConcurrency Signed-off-by: Andre Kurait <[email protected]> --------- Signed-off-by: Andre Kurait <[email protected]>
1 parent 240dc52 commit 8966845

File tree

10 files changed

+307
-10
lines changed

10 files changed

+307
-10
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "AndreKurait",
5+
"description": "Extend [#5031](https://github.com/aws/aws-sdk-java-v2/pull/5031) by allowing user configuration of transferDirectoryMaxConcurrency during construction of S3TransferManager. This allows for more control of upper limits on JVM Heap memory allocation. See [#6330](https://github.com/aws/aws-sdk-java-v2/issues/6330)."
6+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -780,6 +780,18 @@ interface Builder {
780780
*/
781781
Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth);
782782

783+
/**
784+
* Specifies the maximum number of concurrent file transfers that will be performed when
785+
* uploading or downloading a directory. This setting controls the concurrency for an individual
786+
* call to {@link S3TransferManager#uploadDirectory} or {@link S3TransferManager#downloadDirectory}.
787+
* <p>
788+
* Default to 100
789+
*
790+
* @param transferDirectoryMaxConcurrency the maximum number of concurrent file transfers
791+
* @return This builder for method chaining.
792+
*/
793+
Builder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency);
794+
783795
/**
784796
* Builds an instance of {@link S3TransferManager} based on the settings supplied to this builder
785797
*

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package software.amazon.awssdk.transfer.s3.internal;
1717

1818
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
19-
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
2019
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;
2120

2221
import java.io.IOException;
@@ -110,7 +109,9 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
110109
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
111110
failedFileDownloads),
112111
allOfFutures,
113-
DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
112+
transferConfiguration.option(
113+
TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY
114+
));
114115
listObjectsHelper.listS3ObjectsRecursively(request)
115116
.filter(downloadDirectoryRequest.filter())
116117
.subscribe(asyncBufferingSubscriber);

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {
3232
public static final TransferConfigurationOption<Boolean> UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS =
3333
new TransferConfigurationOption<>("UploadDirectoryFileVisitOption", Boolean.class);
3434

35+
public static final TransferConfigurationOption<Integer> DIRECTORY_TRANSFER_MAX_CONCURRENCY =
36+
new TransferConfigurationOption<>("TransferDirectoryMaxConcurrency", Integer.class);
37+
3538
public static final TransferConfigurationOption<Executor> EXECUTOR =
3639
new TransferConfigurationOption<>("Executor", Executor.class);
3740

@@ -45,6 +48,7 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {
4548
.builder()
4649
.put(UPLOAD_DIRECTORY_MAX_DEPTH, DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH)
4750
.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, false)
51+
.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY)
4852
.build();
4953

5054
private final String name;
@@ -69,4 +73,3 @@ public String toString() {
6973
return name;
7074
}
7175
}
72-

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.transfer.s3.internal;
1717

18+
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY;
1819
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.TRANSFER_MANAGER_DEFAULTS;
1920
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS;
2021
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH;
@@ -42,6 +43,7 @@ private TransferManagerConfiguration(Builder builder) {
4243
AttributeMap.Builder standardOptions = AttributeMap.builder();
4344
standardOptions.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, builder.uploadDirectoryFollowSymbolicLinks);
4445
standardOptions.put(UPLOAD_DIRECTORY_MAX_DEPTH, builder.uploadDirectoryMaxDepth);
46+
standardOptions.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, builder.transferDirectoryMaxConcurrency);
4547
finalizeExecutor(builder, standardOptions);
4648
options = standardOptions.build().merge(TRANSFER_MANAGER_DEFAULTS);
4749
}
@@ -97,6 +99,7 @@ public static final class Builder {
9799

98100
private Boolean uploadDirectoryFollowSymbolicLinks;
99101
private Integer uploadDirectoryMaxDepth;
102+
private Integer transferDirectoryMaxConcurrency;
100103
private Executor executor;
101104

102105

@@ -110,6 +113,11 @@ public Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth) {
110113
return this;
111114
}
112115

116+
public Builder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) {
117+
this.transferDirectoryMaxConcurrency = transferDirectoryMaxConcurrency;
118+
return this;
119+
}
120+
113121
public Builder executor(Executor executor) {
114122
this.executor = executor;
115123
return this;

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ private static TransferManagerConfiguration resolveTransferManagerConfiguration(
8686
TransferManagerConfiguration.Builder transferConfigBuilder = TransferManagerConfiguration.builder();
8787
transferConfigBuilder.uploadDirectoryFollowSymbolicLinks(tmBuilder.uploadDirectoryFollowSymbolicLinks);
8888
transferConfigBuilder.uploadDirectoryMaxDepth(tmBuilder.uploadDirectoryMaxDepth);
89+
transferConfigBuilder.transferDirectoryMaxConcurrency(tmBuilder.transferDirectoryMaxConcurrency);
8990
transferConfigBuilder.executor(tmBuilder.executor);
9091
return transferConfigBuilder.build();
9192
}
@@ -95,6 +96,7 @@ public static final class DefaultBuilder implements S3TransferManager.Builder {
9596
private Executor executor;
9697
private Boolean uploadDirectoryFollowSymbolicLinks;
9798
private Integer uploadDirectoryMaxDepth;
99+
private Integer transferDirectoryMaxConcurrency;
98100

99101
@Override
100102
public DefaultBuilder s3Client(S3AsyncClient s3AsyncClient) {
@@ -136,6 +138,20 @@ public Integer getUploadDirectoryMaxDepth() {
136138
return uploadDirectoryMaxDepth;
137139
}
138140

141+
@Override
142+
public DefaultBuilder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) {
143+
this.transferDirectoryMaxConcurrency = transferDirectoryMaxConcurrency;
144+
return this;
145+
}
146+
147+
public void setTransferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) {
148+
transferDirectoryMaxConcurrency(transferDirectoryMaxConcurrency);
149+
}
150+
151+
public Integer getTransferDirectoryMaxConcurrency() {
152+
return transferDirectoryMaxConcurrency;
153+
}
154+
139155
@Override
140156
public S3TransferManager build() {
141157
return createTransferManager(this);

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package software.amazon.awssdk.transfer.s3.internal;
1717

1818
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
19-
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
2019
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;
2120

2221
import java.io.IOException;
@@ -102,7 +101,10 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
102101

103102
AsyncBufferingSubscriber<Path> bufferingSubscriber =
104103
new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path),
105-
allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
104+
allOfFutures,
105+
transferConfiguration.option(
106+
TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY
107+
));
106108

107109
iterablePublisher.subscribe(bufferingSubscriber);
108110
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,18 @@
3535
import java.util.Arrays;
3636
import java.util.Collection;
3737
import java.util.List;
38+
import java.time.Duration;
3839
import java.util.UUID;
3940
import java.util.concurrent.CancellationException;
4041
import java.util.concurrent.CompletableFuture;
42+
import java.util.concurrent.Phaser;
4143
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.atomic.AtomicInteger;
45+
import java.util.concurrent.locks.LockSupport;
4246
import java.util.function.Consumer;
4347
import java.util.function.Function;
4448
import java.util.stream.Collectors;
49+
import java.util.stream.IntStream;
4550
import org.assertj.core.util.Sets;
4651
import org.junit.jupiter.api.AfterEach;
4752
import org.junit.jupiter.api.BeforeEach;
@@ -521,6 +526,114 @@ void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_fails
521526

522527
assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception);
523528
}
529+
@Test
530+
void downloadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception {
531+
// Create many S3 objects to test concurrency
532+
int totalFiles = 50;
533+
String[] keys = new String[totalFiles];
534+
for (int i = 0; i < totalFiles; i++) {
535+
keys[i] = "file" + i + ".txt";
536+
}
537+
stubSuccessfulListObjects(listObjectsHelper, keys);
538+
539+
// Configuration with expected concurrency limit
540+
int configuredMaxConcurrency = 5;
541+
542+
// Track concurrent operations
543+
AtomicInteger currentlyActive = new AtomicInteger(0);
544+
AtomicInteger peakConcurrency = new AtomicInteger(0);
545+
546+
// Use a Phaser to coordinate phases
547+
Phaser phaser = new Phaser(1); // Start with test thread
548+
549+
// Mock the single download function to track concurrent operations
550+
when(singleDownloadFunction.apply(any())).thenAnswer(invocation -> {
551+
phaser.register(); // Each operation registers
552+
553+
CompletableFuture<CompletedFileDownload> future = new CompletableFuture<>();
554+
555+
CompletableFuture.runAsync(() -> {
556+
try {
557+
// Track entry
558+
int current = currentlyActive.incrementAndGet();
559+
peakConcurrency.updateAndGet(max -> Math.max(max, current));
560+
561+
// Wait at barrier - this forces all operations to be active simultaneously
562+
phaser.arriveAndAwaitAdvance();
563+
564+
// Complete
565+
future.complete(CompletedFileDownload.builder()
566+
.response(GetObjectResponse.builder().eTag("test").build())
567+
.build());
568+
569+
} catch (Exception e) {
570+
future.completeExceptionally(e);
571+
} finally {
572+
currentlyActive.decrementAndGet();
573+
phaser.arriveAndDeregister();
574+
}
575+
});
576+
577+
return new DefaultFileDownload(future,
578+
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder()
579+
.transferredBytes(0L)
580+
.build()),
581+
() -> DownloadFileRequest.builder()
582+
.getObjectRequest(GetObjectRequest.builder().build())
583+
.destination(Paths.get("."))
584+
.build(),
585+
null);
586+
});
587+
588+
// Configure with our expected limit
589+
// To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is
590+
// configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1
591+
TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder()
592+
.transferDirectoryMaxConcurrency(configuredMaxConcurrency)
593+
.build();
594+
595+
DownloadDirectoryHelper customHelper = new DownloadDirectoryHelper(
596+
customConfig, listObjectsHelper, singleDownloadFunction);
597+
598+
// Start download asynchronously
599+
CompletableFuture<Void> downloadTask = CompletableFuture.runAsync(() -> {
600+
DirectoryDownload download = customHelper.downloadDirectory(
601+
DownloadDirectoryRequest.builder()
602+
.destination(directory)
603+
.bucket("bucket")
604+
.build()
605+
);
606+
download.completionFuture().join();
607+
});
608+
609+
// Wait for operations to register (but not complete the phase)
610+
// We wait until the number of registered operations reaches the configured limit before checking the peak
611+
Duration maxWait = Duration.ofSeconds(5);
612+
long deadline = System.nanoTime() + maxWait.toNanos();
613+
int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread
614+
while (current < configuredMaxConcurrency) {
615+
if (System.nanoTime() >= deadline) {
616+
throw new AssertionError(
617+
"Timed out waiting for registrations: current=" + current +
618+
", configuredMaxConcurrency=" + configuredMaxConcurrency);
619+
}
620+
LockSupport.parkNanos(10_000_000L); // ~10 ms
621+
current = phaser.getRegisteredParties() - 1;
622+
}
623+
624+
// Check peak BEFORE releasing the phase
625+
int observedPeak = peakConcurrency.get();
626+
assertThat(observedPeak)
627+
.as("Implementation allowed %d concurrent operations but was configured for %d",
628+
observedPeak, configuredMaxConcurrency)
629+
.isEqualTo(configuredMaxConcurrency);
630+
631+
// Release the phase to let operations complete
632+
phaser.arriveAndDeregister();
633+
634+
// Complete the test
635+
downloadTask.get(2, TimeUnit.SECONDS);
636+
}
524637

525638
private static DefaultFileDownload completedDownload() {
526639
return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder()

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfigurationTest.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.mockito.Mockito.never;
2020
import static org.mockito.Mockito.verify;
21+
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY;
2122
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.EXECUTOR;
2223
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS;
2324
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH;
@@ -58,11 +59,34 @@ public void resolveFollowSymlinks_requestOverride_requestOverrideShouldTakePrece
5859
}
5960

6061
@Test
61-
public void noOverride_shouldUseDefaults() {
62-
transferManagerConfiguration = TransferManagerConfiguration.builder().build();
63-
assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS)).isFalse();
64-
assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_MAX_DEPTH)).isEqualTo(Integer.MAX_VALUE);
65-
assertThat(transferManagerConfiguration.option(EXECUTOR)).isNotNull();
62+
public void noOverride_orNullOverride_shouldUseDefaults() {
63+
assertDefaultTransferManagerConfiguration(
64+
TransferManagerConfiguration.builder().build()
65+
);
66+
67+
assertDefaultTransferManagerConfiguration(
68+
TransferManagerConfiguration.builder()
69+
.transferDirectoryMaxConcurrency(null)
70+
.executor(null)
71+
.uploadDirectoryFollowSymbolicLinks(null)
72+
.uploadDirectoryMaxDepth(null)
73+
.build()
74+
);
75+
}
76+
77+
private void assertDefaultTransferManagerConfiguration(TransferManagerConfiguration config) {
78+
assertThat(config.option(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS)).isFalse();
79+
assertThat(config.option(UPLOAD_DIRECTORY_MAX_DEPTH)).isEqualTo(Integer.MAX_VALUE);
80+
assertThat(config.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(100);
81+
assertThat(config.option(EXECUTOR)).isNotNull();
82+
}
83+
84+
@Test
85+
public void transferDirectoryMaxConcurrency_customValue_shouldBeStored() {
86+
transferManagerConfiguration = TransferManagerConfiguration.builder()
87+
.transferDirectoryMaxConcurrency(50)
88+
.build();
89+
assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(50);
6690
}
6791

6892
@Test

0 commit comments

Comments
 (0)