Skip to content

Add ability to specify transferDirectoryMaxConcurrency in S3TransferManager #6329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "S3 Transfer Manager",
"contributor": "AndreKurait",
"description": "Extend [#5031](https://github.com/aws/aws-sdk-java-v2/pull/5031) by allowing user configuration of transferDirectoryMaxConcurrency during construction of S3TransferManager. This allows for more control of upper limits on JVM Heap memory allocation. See [#6330](https://github.com/aws/aws-sdk-java-v2/issues/6330)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,18 @@ interface Builder {
*/
Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth);

/**
* Specifies the maximum number of concurrent file transfers that will be performed when
* uploading or downloading a directory. This setting controls the concurrency for an individual
* call to {@link S3TransferManager#uploadDirectory} or {@link S3TransferManager#downloadDirectory}.
* <p>
* Defaults to 100 when this value is not set or is set to {@code null}.
*
* @param transferDirectoryMaxConcurrency the maximum number of concurrent file transfers
* @return This builder for method chaining.
*/
Builder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency);

/**
* Builds an instance of {@link S3TransferManager} based on the settings supplied to this builder
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand Down Expand Up @@ -110,7 +109,9 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
failedFileDownloads),
allOfFutures,
DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
transferConfiguration.option(
TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY
));
listObjectsHelper.listS3ObjectsRecursively(request)
.filter(downloadDirectoryRequest.filter())
.subscribe(asyncBufferingSubscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {
public static final TransferConfigurationOption<Boolean> UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS =
new TransferConfigurationOption<>("UploadDirectoryFileVisitOption", Boolean.class);

/**
 * Option key for the maximum number of file transfers that a single directory upload or
 * directory download may have in flight at once. The directory helpers read this option
 * when constructing their buffering subscriber.
 */
public static final TransferConfigurationOption<Integer> DIRECTORY_TRANSFER_MAX_CONCURRENCY =
new TransferConfigurationOption<>("TransferDirectoryMaxConcurrency", Integer.class);

public static final TransferConfigurationOption<Executor> EXECUTOR =
new TransferConfigurationOption<>("Executor", Executor.class);

Expand All @@ -45,6 +48,7 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {
.builder()
.put(UPLOAD_DIRECTORY_MAX_DEPTH, DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH)
.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, false)
.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY)
.build();

private final String name;
Expand All @@ -69,4 +73,3 @@ public String toString() {
return name;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.TRANSFER_MANAGER_DEFAULTS;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH;
Expand Down Expand Up @@ -42,6 +43,7 @@ private TransferManagerConfiguration(Builder builder) {
AttributeMap.Builder standardOptions = AttributeMap.builder();
standardOptions.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, builder.uploadDirectoryFollowSymbolicLinks);
standardOptions.put(UPLOAD_DIRECTORY_MAX_DEPTH, builder.uploadDirectoryMaxDepth);
standardOptions.put(DIRECTORY_TRANSFER_MAX_CONCURRENCY, builder.transferDirectoryMaxConcurrency);
finalizeExecutor(builder, standardOptions);
options = standardOptions.build().merge(TRANSFER_MANAGER_DEFAULTS);
}
Expand Down Expand Up @@ -97,6 +99,7 @@ public static final class Builder {

private Boolean uploadDirectoryFollowSymbolicLinks;
private Integer uploadDirectoryMaxDepth;
private Integer transferDirectoryMaxConcurrency;
private Executor executor;


Expand All @@ -110,6 +113,11 @@ public Builder uploadDirectoryMaxDepth(Integer uploadDirectoryMaxDepth) {
return this;
}

/**
 * Sets the maximum number of file transfers a single directory upload or download may run
 * concurrently.
 *
 * @param transferDirectoryMaxConcurrency the concurrency limit, or {@code null} to fall back to
 *                                        the default supplied when the options are merged
 * @return this builder for method chaining
 * @throws IllegalArgumentException if the value is non-null and not strictly positive
 */
public Builder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) {
    // Reject nonsensical limits at configuration time. NOTE(review): a zero or negative value is
    // handed unchanged to the buffering subscriber used by the directory helpers, which presumably
    // cannot make progress with it - confirm against AsyncBufferingSubscriber.
    if (transferDirectoryMaxConcurrency != null && transferDirectoryMaxConcurrency <= 0) {
        throw new IllegalArgumentException(
            "transferDirectoryMaxConcurrency must be positive, but was " + transferDirectoryMaxConcurrency);
    }
    this.transferDirectoryMaxConcurrency = transferDirectoryMaxConcurrency;
    return this;
}

public Builder executor(Executor executor) {
this.executor = executor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ private static TransferManagerConfiguration resolveTransferManagerConfiguration(
TransferManagerConfiguration.Builder transferConfigBuilder = TransferManagerConfiguration.builder();
transferConfigBuilder.uploadDirectoryFollowSymbolicLinks(tmBuilder.uploadDirectoryFollowSymbolicLinks);
transferConfigBuilder.uploadDirectoryMaxDepth(tmBuilder.uploadDirectoryMaxDepth);
transferConfigBuilder.transferDirectoryMaxConcurrency(tmBuilder.transferDirectoryMaxConcurrency);
transferConfigBuilder.executor(tmBuilder.executor);
return transferConfigBuilder.build();
}
Expand All @@ -95,6 +96,7 @@ public static final class DefaultBuilder implements S3TransferManager.Builder {
private Executor executor;
private Boolean uploadDirectoryFollowSymbolicLinks;
private Integer uploadDirectoryMaxDepth;
private Integer transferDirectoryMaxConcurrency;

@Override
public DefaultBuilder s3Client(S3AsyncClient s3AsyncClient) {
Expand Down Expand Up @@ -136,6 +138,20 @@ public Integer getUploadDirectoryMaxDepth() {
return uploadDirectoryMaxDepth;
}

/**
 * Records the requested directory-transfer concurrency limit on this builder. The stored value
 * is copied into the {@code TransferManagerConfiguration} builder when the transfer manager
 * configuration is resolved; no validation is performed here.
 *
 * @param transferDirectoryMaxConcurrency the maximum number of concurrent file transfers; may be {@code null}
 * @return this builder for method chaining
 */
@Override
public DefaultBuilder transferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) {
this.transferDirectoryMaxConcurrency = transferDirectoryMaxConcurrency;
return this;
}

/**
 * Bean-style setter that delegates to {@link #transferDirectoryMaxConcurrency(Integer)} and
 * discards the returned builder.
 *
 * @param transferDirectoryMaxConcurrency the maximum number of concurrent file transfers; may be {@code null}
 */
public void setTransferDirectoryMaxConcurrency(Integer transferDirectoryMaxConcurrency) {
transferDirectoryMaxConcurrency(transferDirectoryMaxConcurrency);
}

/**
 * Returns the directory-transfer concurrency limit currently stored on this builder.
 *
 * @return the configured limit, or {@code null} if none has been set on this builder
 */
public Integer getTransferDirectoryMaxConcurrency() {
return transferDirectoryMaxConcurrency;
}

@Override
public S3TransferManager build() {
return createTransferManager(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand Down Expand Up @@ -102,7 +101,10 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur

AsyncBufferingSubscriber<Path> bufferingSubscriber =
new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path),
allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
allOfFutures,
transferConfiguration.option(
TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY
));

iterablePublisher.subscribe(bufferingSubscriber);
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -521,6 +526,114 @@ void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_fails

assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception);
}
/**
 * Verifies that a directory download never has more single-file downloads in flight than the
 * configured {@code transferDirectoryMaxConcurrency}.
 * <p>
 * Every mocked download parks on a shared {@link Phaser} barrier that cannot advance until the
 * test thread arrives, so started operations pile up until the helper stops issuing new ones.
 * The test then asserts that the pile-up equals the configured limit.
 */
@Test
void downloadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception {
    // Create far more S3 objects than the limit so an unbounded implementation would overshoot it
    int totalFiles = 50;
    String[] keys = new String[totalFiles];
    for (int i = 0; i < totalFiles; i++) {
        keys[i] = "file" + i + ".txt";
    }
    stubSuccessfulListObjects(listObjectsHelper, keys);

    // Configuration with expected concurrency limit
    int configuredMaxConcurrency = 5;

    // Track concurrent operations
    AtomicInteger currentlyActive = new AtomicInteger(0);
    AtomicInteger peakConcurrency = new AtomicInteger(0);

    // The test thread is the first registered party; until it arrives, phase 0 cannot advance
    // and therefore no mocked download can complete.
    Phaser phaser = new Phaser(1);

    // Mock the single download function to track concurrent operations
    when(singleDownloadFunction.apply(any())).thenAnswer(invocation -> {
        // Registration happens synchronously on the calling thread, before the async work starts
        phaser.register();

        CompletableFuture<CompletedFileDownload> future = new CompletableFuture<>();

        CompletableFuture.runAsync(() -> {
            try {
                // Track entry
                int current = currentlyActive.incrementAndGet();
                peakConcurrency.updateAndGet(max -> Math.max(max, current));

                // Park at the barrier - keeps every started operation active until the test releases it
                phaser.arriveAndAwaitAdvance();

                // Complete
                future.complete(CompletedFileDownload.builder()
                                                     .response(GetObjectResponse.builder().eTag("test").build())
                                                     .build());

            } catch (Exception e) {
                future.completeExceptionally(e);
            } finally {
                currentlyActive.decrementAndGet();
                phaser.arriveAndDeregister();
            }
        });

        return new DefaultFileDownload(future,
                                       new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder()
                                                                                                  .transferredBytes(0L)
                                                                                                  .build()),
                                       () -> DownloadFileRequest.builder()
                                                                .getObjectRequest(GetObjectRequest.builder().build())
                                                                .destination(Paths.get("."))
                                                                .build(),
                                       null);
    });

    // Configure with our expected limit.
    // To verify the test works as intended, confirm it fails when transferDirectoryMaxConcurrency is
    // configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1.
    TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder()
                                                                            .transferDirectoryMaxConcurrency(configuredMaxConcurrency)
                                                                            .build();

    DownloadDirectoryHelper customHelper = new DownloadDirectoryHelper(
        customConfig, listObjectsHelper, singleDownloadFunction);

    // Start download asynchronously
    CompletableFuture<Void> downloadTask = CompletableFuture.runAsync(() -> {
        DirectoryDownload download = customHelper.downloadDirectory(
            DownloadDirectoryRequest.builder()
                                    .destination(directory)
                                    .bucket("bucket")
                                    .build()
        );
        download.completionFuture().join();
    });

    try {
        // Wait until at least the configured number of operations have registered AND every
        // registered operation has been counted by its async task. Registration is synchronous but
        // the counters are updated asynchronously, so polling registrations alone would let the
        // assertion below race with the workers.
        Duration maxWait = Duration.ofSeconds(5);
        long deadline = System.nanoTime() + maxWait.toNanos();
        while (true) {
            int current = phaser.getRegisteredParties() - 1; // Subtract 1 for the test thread
            int countedSoFar = peakConcurrency.get();
            if (current >= configuredMaxConcurrency && countedSoFar == current) {
                break;
            }
            if (System.nanoTime() >= deadline) {
                throw new AssertionError(
                    "Timed out waiting for registrations: current=" + current +
                    ", configuredMaxConcurrency=" + configuredMaxConcurrency +
                    ", peakConcurrency=" + countedSoFar);
            }
            LockSupport.parkNanos(10_000_000L); // ~10 ms
        }

        // Check peak BEFORE releasing the phase
        int observedPeak = peakConcurrency.get();
        assertThat(observedPeak)
            .as("Implementation allowed %d concurrent operations but was configured for %d",
                observedPeak, configuredMaxConcurrency)
            .isEqualTo(configuredMaxConcurrency);
    } finally {
        // Always release the barrier, even on timeout or assertion failure, so the parked
        // workers are not left blocked after the test ends.
        phaser.arriveAndDeregister();
    }

    // Complete the test
    downloadTask.get(2, TimeUnit.SECONDS);
}

private static DefaultFileDownload completedDownload() {
return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.EXECUTOR;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.UPLOAD_DIRECTORY_MAX_DEPTH;
Expand Down Expand Up @@ -58,11 +59,34 @@ public void resolveFollowSymlinks_requestOverride_requestOverrideShouldTakePrece
}

@Test
public void noOverride_shouldUseDefaults() {
transferManagerConfiguration = TransferManagerConfiguration.builder().build();
assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS)).isFalse();
assertThat(transferManagerConfiguration.option(UPLOAD_DIRECTORY_MAX_DEPTH)).isEqualTo(Integer.MAX_VALUE);
assertThat(transferManagerConfiguration.option(EXECUTOR)).isNotNull();
public void noOverride_orNullOverride_shouldUseDefaults() {
    // Case 1: nothing is configured on the builder at all.
    TransferManagerConfiguration untouched = TransferManagerConfiguration.builder().build();
    assertDefaultTransferManagerConfiguration(untouched);

    // Case 2: every option is explicitly set to null, which must behave like "not set".
    TransferManagerConfiguration.Builder nullBuilder = TransferManagerConfiguration.builder();
    nullBuilder.transferDirectoryMaxConcurrency(null);
    nullBuilder.executor(null);
    nullBuilder.uploadDirectoryFollowSymbolicLinks(null);
    nullBuilder.uploadDirectoryMaxDepth(null);
    TransferManagerConfiguration explicitNulls = nullBuilder.build();
    assertDefaultTransferManagerConfiguration(explicitNulls);
}

/**
 * Asserts that the given configuration resolves every option to its default: symbolic links are
 * not followed, upload depth is {@link Integer#MAX_VALUE}, directory transfer concurrency is 100,
 * and an executor is present.
 *
 * @param resolved the configuration under inspection
 */
private void assertDefaultTransferManagerConfiguration(TransferManagerConfiguration resolved) {
    assertThat(resolved.option(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS))
        .isFalse();
    assertThat(resolved.option(UPLOAD_DIRECTORY_MAX_DEPTH))
        .isEqualTo(Integer.MAX_VALUE);
    assertThat(resolved.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY))
        .isEqualTo(100);
    assertThat(resolved.option(EXECUTOR))
        .isNotNull();
}

/**
 * A caller-supplied concurrency limit must be returned unchanged by the built configuration.
 */
@Test
public void transferDirectoryMaxConcurrency_customValue_shouldBeStored() {
    TransferManagerConfiguration.Builder customBuilder = TransferManagerConfiguration.builder();
    customBuilder.transferDirectoryMaxConcurrency(50);
    transferManagerConfiguration = customBuilder.build();

    assertThat(transferManagerConfiguration.option(DIRECTORY_TRANSFER_MAX_CONCURRENCY))
        .isEqualTo(50);
}

@Test
Expand Down
Loading
Loading