Skip to content

Commit bb3f8a6

Browse files
committed
Fix bug in FileAsyncRequestBody where inflight parts were negative
1 parent 94e363d commit bb3f8a6

File tree

4 files changed

+31
-17
lines changed

4 files changed

+31
-17
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon S3",
4+
"contributor": "",
5+
"description": "Fix bug in S3 Multipart uploads with FileAsyncRequestBody - ensure that concurrency is limited correctly by bufferSizeInBytes"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import java.nio.ByteBuffer;
1919
import java.nio.file.Path;
2020
import java.nio.file.attribute.FileTime;
21+
import java.util.Collections;
22+
import java.util.HashSet;
2123
import java.util.Optional;
24+
import java.util.Set;
2225
import java.util.concurrent.atomic.AtomicBoolean;
2326
import java.util.concurrent.atomic.AtomicInteger;
2427
import java.util.concurrent.atomic.AtomicLong;
@@ -52,7 +55,7 @@ public final class FileAsyncRequestBodySplitHelper {
5255

5356
private volatile boolean isDone = false;
5457

55-
private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0);
58+
private Set<Long> requestBodyStartPositionsInFlight = Collections.synchronizedSet(new HashSet<>());
5659
private AtomicInteger chunkIndex = new AtomicInteger(0);
5760
private final FileTime modifiedTimeAtStart;
5861
private final long sizeAtStart;
@@ -106,9 +109,10 @@ private void sendAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublis
106109

107110
private void doSendAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
108111
while (shouldSendMore()) {
109-
AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(simplePublisher);
112+
long position = chunkSize * chunkIndex.getAndIncrement();
113+
AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(position, simplePublisher);
110114
simplePublisher.send(currentAsyncRequestBody);
111-
numAsyncRequestBodiesInFlight.incrementAndGet();
115+
requestBodyStartPositionsInFlight.add(position);
112116
checkCompletion(simplePublisher, currentAsyncRequestBody);
113117
}
114118
}
@@ -126,13 +130,12 @@ private void checkCompletion(SimplePublisher<AsyncRequestBody> simplePublisher,
126130
}
127131
}
128132

129-
private void startNextRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
130-
numAsyncRequestBodiesInFlight.decrementAndGet();
133+
private void startNextRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher, long completedPosition) {
134+
requestBodyStartPositionsInFlight.remove(completedPosition);
131135
sendAsyncRequestBody(simplePublisher);
132136
}
133137

134-
private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
135-
long position = chunkSize * chunkIndex.getAndIncrement();
138+
private AsyncRequestBody newFileAsyncRequestBody(long position, SimplePublisher<AsyncRequestBody> simplePublisher) {
136139
long numBytesToReadForThisChunk = Math.min(totalContentLength - position, chunkSize);
137140
FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder()
138141
.path(path)
@@ -142,7 +145,7 @@ private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher<AsyncRequestBod
142145
.modifiedTimeAtStart(modifiedTimeAtStart)
143146
.sizeAtStart(sizeAtStart)
144147
.build();
145-
return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher);
148+
return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher, position);
146149
}
147150

148151
/**
@@ -153,35 +156,39 @@ private boolean shouldSendMore() {
153156
return false;
154157
}
155158

156-
long currentUsedBuffer = (long) numAsyncRequestBodiesInFlight.get() * bufferPerAsyncRequestBody;
159+
long currentUsedBuffer = (long) requestBodyStartPositionsInFlight.size() * bufferPerAsyncRequestBody;
157160
return currentUsedBuffer + bufferPerAsyncRequestBody <= totalBufferSize;
158161
}
159162

160163
@SdkTestInternalApi
161-
AtomicInteger numAsyncRequestBodiesInFlight() {
162-
return numAsyncRequestBodiesInFlight;
164+
int numAsyncRequestBodiesInFlight() {
165+
return requestBodyStartPositionsInFlight.size();
163166
}
164167

165168
private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody {
166169

167170
private final FileAsyncRequestBody fileAsyncRequestBody;
168171
private final SimplePublisher<AsyncRequestBody> simplePublisher;
172+
private final long position;
169173

170174
FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody,
171-
SimplePublisher<AsyncRequestBody> simplePublisher) {
175+
SimplePublisher<AsyncRequestBody> simplePublisher, long position) {
172176
this.fileAsyncRequestBody = fileAsyncRequestBody;
173177
this.simplePublisher = simplePublisher;
178+
this.position = position;
174179
}
175180

176181
@Override
177182
public void subscribe(Subscriber<? super ByteBuffer> s) {
178-
fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher))
183+
fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher, position))
179184
// The reason we still need to call startNextRequestBody when the subscription is
180185
// cancelled is that upstream could cancel the subscription even though the stream has
181186
// finished successfully before onComplete. If this happens, doAfterOnComplete callback
182187
// will never be invoked, and if the current buffer is full, the publisher will stop
183188
// sending new FileAsyncRequestBody, leading to uncompleted future.
184-
.doAfterOnCancel(() -> startNextRequestBody(simplePublisher))
189+
.doAfterOnCancel(() -> {
190+
startNextRequestBody(simplePublisher, position);
191+
})
185192
.subscribe(s);
186193
}
187194

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,13 @@ public void split_differentChunkSize_shouldSplitCorrectly(int chunkSize) throws
8787

8888
private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper helper, AtomicInteger maxConcurrency) {
8989
return () -> {
90-
int concurrency = helper.numAsyncRequestBodiesInFlight().get();
90+
int concurrency = helper.numAsyncRequestBodiesInFlight();
9191

9292
if (concurrency > maxConcurrency.get()) {
9393
maxConcurrency.set(concurrency);
9494
}
95-
assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueLessThan(10);
95+
assertThat(concurrency).isLessThan(10);
96+
assertThat(concurrency).isGreaterThanOrEqualTo(0);
9697
};
9798
}
9899
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher<AsyncRequestBod
3333
Path file,
3434
int chunkSize) throws Exception {
3535

36+
long contentLength = file.toFile().length();
3637
List<CompletableFuture<ByteBuffer>> futures = new ArrayList<>();
3738
publisher.subscribe(requestBody -> {
3839
CompletableFuture<ByteBuffer> baosFuture = new CompletableFuture<>();
@@ -47,7 +48,6 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher<AsyncRequestBod
4748
futures.add(baosFuture);
4849
}).get(5, TimeUnit.SECONDS);
4950

50-
long contentLength = file.toFile().length();
5151
Assertions.assertThat(futures.size()).isEqualTo((int) Math.ceil(contentLength / (double) chunkSize));
5252

5353
for (int i = 0; i < futures.size(); i++) {

0 commit comments

Comments
 (0)