Skip to content

Commit be0a85b

Browse files
committed
Add ability to specify directoryTransferMaxConcurrency in S3TransferManager creation to limit concurrency on download and upload directory to control memory use
Signed-off-by: Andre Kurait <[email protected]>
1 parent 2d18a41 commit be0a85b

File tree

7 files changed

+59
-5
lines changed

7 files changed

+59
-5
lines changed

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -780,6 +780,18 @@ interface Builder {
780780
*/
781781
Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth);
782782

783+
/**
784+
* Specifies the maximum number of concurrent file transfers that will be performed when
785+
* uploading or downloading a directory. This setting controls the concurrency for an individual
786+
* call to {@link S3TransferManager#uploadDirectory} or {@link S3TransferManager#downloadDirectory}.
787+
* <p>
788+
* Default to 100
789+
*
790+
* @param directoryTransferMaxConcurrency the maximum number of concurrent file transfers
791+
* @return This builder for method chaining.
792+
*/
793+
Builder directoryTransferMaxConcurrency(Integer directoryTransferMaxConcurrency);
794+
783795
/**
784796
* Builds an instance of {@link S3TransferManager} based on the settings supplied to this builder
785797
*

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package software.amazon.awssdk.transfer.s3.internal;
1717

1818
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
19-
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
2019
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;
2120

2221
import java.io.IOException;
@@ -110,7 +109,7 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
110109
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
111110
failedFileDownloads),
112111
allOfFutures,
113-
DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
112+
transferConfiguration.option(TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY));
114113
listObjectsHelper.listS3ObjectsRecursively(request)
115114
.filter(downloadDirectoryRequest.filter())
116115
.subscribe(asyncBufferingSubscriber);

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {
3232
public static final TransferConfigurationOption<Boolean> UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS =
3333
new TransferConfigurationOption<>("UploadDirectoryFileVisitOption", Boolean.class);
3434

35+
public static final TransferConfigurationOption<Integer> DIRECTORY_TRANSFER_MAX_CONCURRENCY =
36+
new TransferConfigurationOption<>("DirectoryTransferMaxConcurrency", Integer.class);
37+
3538
public static final TransferConfigurationOption<Executor> EXECUTOR =
3639
new TransferConfigurationOption<>("Executor", Executor.class);
3740

@@ -45,6 +48,7 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {
4548
.builder()
4649
.put(UPLOAD_DIRECTORY_MAX_DEPTH, DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH)
4750
.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, false)
51+
.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY)
4852
.build();
4953

5054
private final String name;
@@ -69,4 +73,3 @@ public String toString() {
6973
return name;
7074
}
7175
}
72-

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.transfer.s3.internal;
1717

18+
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY;
1819
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.TRANSFER_MANAGER_DEFAULTS;
1920
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS;
2021
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH;
@@ -42,6 +43,7 @@ private TransferManagerConfiguration(Builder builder) {
4243
AttributeMap.Builder standardOptions = AttributeMap.builder();
4344
standardOptions.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, builder.uploadDirectoryFollowSymbolicLinks);
4445
standardOptions.put(UPLOAD_DIRECTORY_MAX_DEPTH, builder.uploadDirectoryMaxDepth);
46+
standardOptions.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, builder.directoryTransferMaxConcurrency);
4547
finalizeExecutor(builder, standardOptions);
4648
options = standardOptions.build().merge(TRANSFER_MANAGER_DEFAULTS);
4749
}
@@ -97,6 +99,7 @@ public static final class Builder {
9799

98100
private Boolean uploadDirectoryFollowSymbolicLinks;
99101
private Integer uploadDirectoryMaxDepth;
102+
private Integer directoryTransferMaxConcurrency;
100103
private Executor executor;
101104

102105

@@ -110,6 +113,11 @@ public Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth) {
110113
return this;
111114
}
112115

116+
public Builder directoryTransferMaxConcurrency(Integer directoryTransferMaxConcurrency) {
117+
this.directoryTransferMaxConcurrency = directoryTransferMaxConcurrency;
118+
return this;
119+
}
120+
113121
public Builder executor(Executor executor) {
114122
this.executor = executor;
115123
return this;

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ private static TransferManagerConfiguration resolveTransferManagerConfiguration(
8686
TransferManagerConfiguration.Builder transferConfigBuilder = TransferManagerConfiguration.builder();
8787
transferConfigBuilder.uploadDirectoryFollowSymbolicLinks(tmBuilder.uploadDirectoryFollowSymbolicLinks);
8888
transferConfigBuilder.uploadDirectoryMaxDepth(tmBuilder.uploadDirectoryMaxDepth);
89+
transferConfigBuilder.directoryTransferMaxConcurrency(tmBuilder.directoryTransferMaxConcurrency);
8990
transferConfigBuilder.executor(tmBuilder.executor);
9091
return transferConfigBuilder.build();
9192
}
@@ -95,6 +96,7 @@ public static final class DefaultBuilder implements S3TransferManager.Builder {
9596
private Executor executor;
9697
private Boolean uploadDirectoryFollowSymbolicLinks;
9798
private Integer uploadDirectoryMaxDepth;
99+
private Integer directoryTransferMaxConcurrency;
98100

99101
@Override
100102
public DefaultBuilder s3Client(S3AsyncClient s3AsyncClient) {
@@ -136,6 +138,20 @@ public Integer getUploadDirectoryMaxDepth() {
136138
return uploadDirectoryMaxDepth;
137139
}
138140

141+
@Override
142+
public DefaultBuilder directoryTransferMaxConcurrency(Integer directoryTransferMaxConcurrency) {
143+
this.directoryTransferMaxConcurrency = directoryTransferMaxConcurrency;
144+
return this;
145+
}
146+
147+
public void setDirectoryTransferMaxConcurrency(Integer directoryTransferMaxConcurrency) {
148+
directoryTransferMaxConcurrency(directoryTransferMaxConcurrency);
149+
}
150+
151+
public Integer getDirectoryTransferMaxConcurrency() {
152+
return directoryTransferMaxConcurrency;
153+
}
154+
139155
@Override
140156
public S3TransferManager build() {
141157
return createTransferManager(this);

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package software.amazon.awssdk.transfer.s3.internal;
1717

1818
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
19-
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
2019
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;
2120

2221
import java.io.IOException;
@@ -102,7 +101,8 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
102101

103102
AsyncBufferingSubscriber<Path> bufferingSubscriber =
104103
new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path),
105-
allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
104+
allOfFutures,
105+
transferConfiguration.option(TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY));
106106

107107
iterablePublisher.subscribe(bufferingSubscriber);
108108
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerConfigurationTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.mockito.Mockito.never;
2020
import static org.mockito.Mockito.verify;
21+
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY;
2122
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.EXECUTOR;
2223
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS;
2324
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH;
@@ -62,9 +63,24 @@ public void noOverride_shouldUseDefaults() {
6263
transferManagerConfiguration = TransferManagerConfiguration.builder().build();
6364
assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS)).isFalse();
6465
assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_MAX_DEPTH)).isEqualTo(Integer.MAX_VALUE);
66+
assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(100);
6567
assertThat(transferManagerConfiguration.option(EXECUTOR)).isNotNull();
6668
}
6769

70+
@Test
71+
public void directoryTransferMaxConcurrency_customValue_shouldBeStored() {
72+
transferManagerConfiguration = TransferManagerConfiguration.builder()
73+
.directoryTransferMaxConcurrency(50)
74+
.build();
75+
assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(50);
76+
}
77+
78+
@Test
79+
public void directoryTransferMaxConcurrency_noOverride_shouldUseDefault() {
80+
transferManagerConfiguration = TransferManagerConfiguration.builder().build();
81+
assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(100);
82+
}
83+
6884
@Test
6985
public void close_noCustomExecutor_shouldCloseDefaultOne() {
7086
transferManagerConfiguration = TransferManagerConfiguration.builder().build();

0 commit comments

Comments
 (0)