Skip to content

Add ability to specify transferDirectoryMaxConcurrency in S3TransferManager #6329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "S3 Transfer Manager",
"contributor": "AndreKurait",
"description": "Extend [#5031](https://github.com/aws/aws-sdk-java-v2/pull/5031) by allowing user configuration of directoryTransferMaxConcurrency during construction of S3TransferManager. This allows for more control of upper limits on JVM Heap memory allocation. See [#6330](https://github.com/aws/aws-sdk-java-v2/issues/6330)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,18 @@ interface Builder {
*/
Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth);

/**
* Specifies the maximum number of concurrent file transfers that will be performed when
* uploading or downloading a directory. This setting controls the concurrency for an individual
* call to {@link S3TransferManager#uploadDirectory} or {@link S3TransferManager#downloadDirectory}.
* <p>
* Default to 100
*
* @param directoryTransferMaxConcurrency the maximum number of concurrent file transfers
* @return This builder for method chaining.
*/
Builder directoryTransferMaxConcurrency(Integer directoryTransferMaxConcurrency);

/**
* Builds an instance of {@link S3TransferManager} based on the settings supplied to this builder
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand Down Expand Up @@ -110,7 +109,7 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
failedFileDownloads),
allOfFutures,
DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
transferConfiguration.option(TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY));
listObjectsHelper.listS3ObjectsRecursively(request)
.filter(downloadDirectoryRequest.filter())
.subscribe(asyncBufferingSubscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {
public static final TransferConfigurationOption<Boolean> UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS =
new TransferConfigurationOption<>("UploadDirectoryFileVisitOption", Boolean.class);

public static final TransferConfigurationOption<Integer> DIRECTORY_TRANSFER_MAX_CONCURRENCY =
new TransferConfigurationOption<>("DirectoryTransferMaxConcurrency", Integer.class);

public static final TransferConfigurationOption<Executor> EXECUTOR =
new TransferConfigurationOption<>("Executor", Executor.class);

Expand All @@ -45,6 +48,7 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {
.builder()
.put(UPLOAD_DIRECTORY_MAX_DEPTH, DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH)
.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, false)
.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY)
.build();

private final String name;
Expand All @@ -69,4 +73,3 @@ public String toString() {
return name;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.TRANSFER_MANAGER_DEFAULTS;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH;
Expand Down Expand Up @@ -42,6 +43,7 @@ private TransferManagerConfiguration(Builder builder) {
AttributeMap.Builder standardOptions = AttributeMap.builder();
standardOptions.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, builder.uploadDirectoryFollowSymbolicLinks);
standardOptions.put(UPLOAD_DIRECTORY_MAX_DEPTH, builder.uploadDirectoryMaxDepth);
standardOptions.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, builder.directoryTransferMaxConcurrency);
finalizeExecutor(builder, standardOptions);
options = standardOptions.build().merge(TRANSFER_MANAGER_DEFAULTS);
}
Expand Down Expand Up @@ -97,6 +99,7 @@ public static final class Builder {

private Boolean uploadDirectoryFollowSymbolicLinks;
private Integer uploadDirectoryMaxDepth;
private Integer directoryTransferMaxConcurrency;
private Executor executor;


Expand All @@ -110,6 +113,11 @@ public Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth) {
return this;
}

public Builder directoryTransferMaxConcurrency(Integer directoryTransferMaxConcurrency) {
this.directoryTransferMaxConcurrency = directoryTransferMaxConcurrency;
return this;
}

public Builder executor(Executor executor) {
this.executor = executor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ private static TransferManagerConfiguration resolveTransferManagerConfiguration(
TransferManagerConfiguration.Builder transferConfigBuilder = TransferManagerConfiguration.builder();
transferConfigBuilder.uploadDirectoryFollowSymbolicLinks(tmBuilder.uploadDirectoryFollowSymbolicLinks);
transferConfigBuilder.uploadDirectoryMaxDepth(tmBuilder.uploadDirectoryMaxDepth);
transferConfigBuilder.directoryTransferMaxConcurrency(tmBuilder.directoryTransferMaxConcurrency);
transferConfigBuilder.executor(tmBuilder.executor);
return transferConfigBuilder.build();
}
Expand All @@ -95,6 +96,7 @@ public static final class DefaultBuilder implements S3TransferManager.Builder {
private Executor executor;
private Boolean uploadDirectoryFollowSymbolicLinks;
private Integer uploadDirectoryMaxDepth;
private Integer directoryTransferMaxConcurrency;

@Override
public DefaultBuilder s3Client(S3AsyncClient s3AsyncClient) {
Expand Down Expand Up @@ -136,6 +138,20 @@ public Integer getUploadDirectoryMaxDepth() {
return uploadDirectoryMaxDepth;
}

@Override
public DefaultBuilder directoryTransferMaxConcurrency(Integer directoryTransferMaxConcurrency) {
this.directoryTransferMaxConcurrency = directoryTransferMaxConcurrency;
return this;
}

public void setDirectoryTransferMaxConcurrency(Integer directoryTransferMaxConcurrency) {
directoryTransferMaxConcurrency(directoryTransferMaxConcurrency);
}

public Integer getDirectoryTransferMaxConcurrency() {
return directoryTransferMaxConcurrency;
}

@Override
public S3TransferManager build() {
return createTransferManager(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand Down Expand Up @@ -102,7 +101,8 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur

AsyncBufferingSubscriber<Path> bufferingSubscriber =
new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path),
allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
allOfFutures,
transferConfiguration.option(TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY));

iterablePublisher.subscribe(bufferingSubscriber);
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.EXECUTOR;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH;
Expand Down Expand Up @@ -62,9 +63,24 @@ public void noOverride_shouldUseDefaults() {
transferManagerConfiguration = TransferManagerConfiguration.builder().build();
assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS)).isFalse();
assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_MAX_DEPTH)).isEqualTo(Integer.MAX_VALUE);
assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(100);
assertThat(transferManagerConfiguration.option(EXECUTOR)).isNotNull();
}

@Test
public void directoryTransferMaxConcurrency_customValue_shouldBeStored() {
transferManagerConfiguration = TransferManagerConfiguration.builder()
.directoryTransferMaxConcurrency(50)
.build();
assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(50);
}

@Test
public void directoryTransferMaxConcurrency_noOverride_shouldUseDefault() {
transferManagerConfiguration = TransferManagerConfiguration.builder().build();
assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY)).isEqualTo(100);
}

@Test
public void close_noCustomExecutor_shouldCloseDefaultOne() {
transferManagerConfiguration = TransferManagerConfiguration.builder().build();
Expand Down
Loading