Skip to content

Commit f79332b

Browse files
authored
Fixed OOM issue that could surface when users try to download objects… (#5025)
* Fixed OOM issue that could surface when users try to download objects from an S3 bucket that has millions of small files. * Fix sonatype issue
1 parent 4d6a6e3 commit f79332b

File tree

5 files changed

+40
-30
lines changed

5 files changed

+40
-30
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Fixed OOM issue that could surface when users try to download objects from an S3 bucket that has millions of small files. See [#4987](https://github.com/aws/aws-sdk-java-v2/issues/4987)"
6+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515

1616
package software.amazon.awssdk.transfer.s3.internal;
1717

18+
import java.util.Set;
1819
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.ConcurrentHashMap;
1921
import java.util.concurrent.atomic.AtomicInteger;
2022
import java.util.function.Function;
2123
import org.reactivestreams.Subscriber;
@@ -40,13 +42,22 @@ public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
4042
private volatile boolean upstreamDone;
4143
private Subscription subscription;
4244

45+
private final Set<CompletableFuture<?>> requestsInFlight;
46+
4347
public AsyncBufferingSubscriber(Function<T, CompletableFuture<?>> consumer,
4448
CompletableFuture<Void> returnFuture,
4549
int maxConcurrentExecutions) {
4650
this.returnFuture = returnFuture;
4751
this.consumer = consumer;
4852
this.maxConcurrentExecutions = maxConcurrentExecutions;
4953
this.numRequestsInFlight = new AtomicInteger(0);
54+
this.requestsInFlight = ConcurrentHashMap.newKeySet();
55+
56+
returnFuture.whenComplete((r, t) -> {
57+
if (t != null) {
58+
requestsInFlight.forEach(f -> f.cancel(true));
59+
}
60+
});
5061
}
5162

5263
@Override
@@ -64,8 +75,11 @@ public void onSubscribe(Subscription subscription) {
6475
@Override
6576
public void onNext(T item) {
6677
numRequestsInFlight.incrementAndGet();
67-
consumer.apply(item).whenComplete((r, t) -> {
78+
CompletableFuture<?> currentRequest = consumer.apply(item);
79+
requestsInFlight.add(currentRequest);
80+
currentRequest.whenComplete((r, t) -> {
6881
checkForCompletion(numRequestsInFlight.decrementAndGet());
82+
requestsInFlight.remove(currentRequest);
6983
synchronized (this) {
7084
subscription.request(1);
7185
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,15 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
106106
Queue<FailedFileDownload> failedFileDownloads = new ConcurrentLinkedQueue<>();
107107

108108
CompletableFuture<Void> allOfFutures = new CompletableFuture<>();
109-
110109
AsyncBufferingSubscriber<S3Object> asyncBufferingSubscriber =
111-
new AsyncBufferingSubscriber<>(downloadSingleFile(returnFuture, downloadDirectoryRequest, request,
110+
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
112111
failedFileDownloads),
113112
allOfFutures,
114113
DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY);
115114
listObjectsHelper.listS3ObjectsRecursively(request)
116115
.filter(downloadDirectoryRequest.filter())
117116
.subscribe(asyncBufferingSubscriber);
117+
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);
118118

119119
allOfFutures.whenComplete((r, t) -> {
120120
if (t != null) {
@@ -128,19 +128,14 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
128128
}
129129

130130
private Function<S3Object, CompletableFuture<?>> downloadSingleFile(
131-
CompletableFuture<CompletedDirectoryDownload> returnFuture,
132131
DownloadDirectoryRequest downloadDirectoryRequest,
133132
ListObjectsV2Request listRequest,
134133
Queue<FailedFileDownload> failedFileDownloads) {
135134

136-
return s3Object -> {
137-
CompletableFuture<CompletedFileDownload> future = doDownloadSingleFile(downloadDirectoryRequest,
138-
failedFileDownloads,
139-
listRequest,
140-
s3Object);
141-
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
142-
return future;
143-
};
135+
return s3Object -> doDownloadSingleFile(downloadDirectoryRequest,
136+
failedFileDownloads,
137+
listRequest,
138+
s3Object);
144139
}
145140

146141
private Path determineDestinationPath(DownloadDirectoryRequest downloadDirectoryRequest,

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.junit.jupiter.api.Test;
3737
import org.junit.jupiter.params.ParameterizedTest;
3838
import org.junit.jupiter.params.provider.ValueSource;
39-
import org.reactivestreams.Subscription;
39+
import software.amazon.awssdk.utils.async.SimplePublisher;
4040

4141
class AsyncBufferingSubscriberTest {
4242
private static final int MAX_CONCURRENT_EXECUTIONS = 5;
@@ -96,20 +96,16 @@ void differentNumberOfStrings_shouldCompleteSuccessfully(int numberOfStrings) th
9696
}
9797

9898
@Test
99-
void onErrorInvoked_shouldCompleteFutureExceptionally() {
100-
subscriber.onSubscribe(new Subscription() {
101-
@Override
102-
public void request(long n) {
103-
104-
}
105-
106-
@Override
107-
public void cancel() {
108-
109-
}
110-
});
99+
void onErrorInvoked_shouldCompleteFutureExceptionallyAndCancelRequestsFuture() {
111100
RuntimeException exception = new RuntimeException("test");
112-
subscriber.onError(exception);
101+
SimplePublisher<String> simplePublisher = new SimplePublisher<>();
102+
simplePublisher.subscribe(subscriber);
103+
simplePublisher.send("test");
104+
simplePublisher.send("test");
105+
106+
simplePublisher.error(exception);
113107
assertThat(returnFuture).isCompletedExceptionally();
108+
assertThat(futures.get(0)).isCancelled();
109+
assertThat(futures.get(1)).isCancelled();
114110
}
115111
}

test/stability-tests/README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,9 @@ First add tests to TestRunner Class, then run the following command.
3131

3232
```
3333
mvn clean install -pl :stability-tests --am -P quick
34-
mvn clean install -pl :bom-inernal
35-
cd test/stability-tests
36-
mvn package -P test-jar
37-
java -jar target/stability-tests-uber.jar
34+
mvn clean install -pl :bom-internal
35+
mvn package -P test-jar -pl :stability-tests
36+
java -jar test/stability-tests/target/stability-tests-uber.jar
3837
```
3938

4039
## Adding New Tests

0 commit comments

Comments
 (0)