Skip to content

Commit e1ad65a

Browse files
committed
Improve workflow and add more functional tests
1 parent 0144760 commit e1ad65a

File tree

10 files changed

+207
-49
lines changed

10 files changed

+207
-49
lines changed

bom-internal/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@
235235
<version>${rxjava.version}</version>
236236
<scope>test</scope>
237237
</dependency>
238+
<dependency>
239+
<groupId>io.reactivex.rxjava3</groupId>
240+
<artifactId>rxjava</artifactId>
241+
<version>${rxjava3.version}</version>
242+
</dependency>
238243
<dependency>
239244
<artifactId>commons-lang3</artifactId>
240245
<groupId>org.apache.commons</groupId>

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@
124124
<org.eclipse.jdt.version>3.10.0</org.eclipse.jdt.version>
125125
<org.eclipse.text.version>3.5.101</org.eclipse.text.version>
126126
<rxjava.version>2.2.21</rxjava.version>
127+
<rxjava3.version>3.1.5</rxjava3.version>
127128
<commons-codec.verion>1.17.1</commons-codec.verion>
128129
<jmh.version>1.37</jmh.version>
129130
<awscrt.version>0.38.1</awscrt.version>

services/s3/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,5 +230,11 @@
230230
<artifactId>jimfs</artifactId>
231231
<scope>test</scope>
232232
</dependency>
233+
<dependency>
234+
<groupId>io.reactivex.rxjava2</groupId>
235+
<artifactId>rxjava</artifactId>
236+
<version>${rxjava.version}</version>
237+
<scope>test</scope>
238+
</dependency>
233239
</dependencies>
234240
</project>

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515

1616
package software.amazon.awssdk.services.s3.internal.multipart;
1717

18+
import static software.amazon.awssdk.services.s3.internal.multipart.MultipartUploadHelper.contentLengthMismatchForPart;
19+
import static software.amazon.awssdk.services.s3.internal.multipart.MultipartUploadHelper.partNumMismatch;
1820
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
1921

2022
import java.util.Collection;
2123
import java.util.HashMap;
2224
import java.util.Map;
25+
import java.util.Optional;
2326
import java.util.concurrent.CompletableFuture;
2427
import java.util.concurrent.ConcurrentLinkedQueue;
2528
import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,7 +157,16 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
154157
return;
155158
}
156159

157-
validatePart(asyncRequestBody, currentPartNum);
160+
Optional<SdkClientException> sdkClientException = validatePart(asyncRequestBody, currentPartNum);
161+
if (sdkClientException.isPresent()) {
162+
multipartUploadHelper.failRequestsElegantly(futures,
163+
sdkClientException.get(),
164+
uploadId,
165+
returnFuture,
166+
putObjectRequest);
167+
subscription.cancel();
168+
return;
169+
}
158170

159171
asyncRequestBodyInFlight.incrementAndGet();
160172
UploadPartRequest uploadRequest = SdkPojoConversionUtils.toUploadPartRequest(putObjectRequest,
@@ -178,47 +190,37 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
178190
subscription.request(1);
179191
}
180192

181-
private void validatePart(AsyncRequestBody asyncRequestBody, int currentPartNum) {
193+
private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestBody, int currentPartNum) {
182194
if (!asyncRequestBody.contentLength().isPresent()) {
183-
SdkClientException e = SdkClientException.create("Content length must be present on the AsyncRequestBody");
184-
multipartUploadHelper.failRequestsElegantly(futures, e, uploadId, returnFuture, putObjectRequest);
185-
return;
195+
return Optional.of(MultipartUploadHelper.contentLengthMissingForPart(currentPartNum));
186196
}
187197

188198
Long currentPartSize = asyncRequestBody.contentLength().get();
199+
189200
if (currentPartNum > expectedNumParts) {
190-
SdkClientException exception = SdkClientException.create(String.format("The number of parts divided is "
191-
+ "not equal to the expected number of "
192-
+ "parts. Expected: %d, Actual: %d",
193-
expectedNumParts, currentPartNum));
194-
multipartUploadHelper.failRequestsElegantly(futures, exception, uploadId, returnFuture, putObjectRequest);
195-
return;
201+
return Optional.of(partNumMismatch(expectedNumParts, currentPartNum));
196202
}
197203

198204
if (currentPartNum == expectedNumParts) {
199-
validateLastPartSize(currentPartSize);
200-
return;
205+
return validateLastPartSize(currentPartSize);
201206
}
202207

203208
if (currentPartSize != partSize) {
204-
SdkClientException e = SdkClientException.create(String.format("Content length must be equal to the "
205-
+ "part size. Expected: %d, Actual: %d",
206-
partSize,
207-
currentPartSize));
208-
multipartUploadHelper.failRequestsElegantly(futures, e, uploadId, returnFuture, putObjectRequest);
209+
return Optional.of(contentLengthMismatchForPart(partSize, currentPartSize));
209210
}
211+
return Optional.empty();
210212
}
211213

212-
private void validateLastPartSize(Long currentPartSize) {
214+
private Optional<SdkClientException> validateLastPartSize(Long currentPartSize) {
213215
long remainder = totalSize % partSize;
214216
long expectedLastPartSize = remainder == 0 ? partSize : remainder;
215217
if (currentPartSize != expectedLastPartSize) {
216-
SdkClientException exception =
218+
return Optional.of(
217219
SdkClientException.create("Content length of the last part must be equal to the "
218220
+ "expected last part size. Expected: " + expectedLastPartSize
219-
+ ", Actual: " + currentPartSize);
220-
multipartUploadHelper.failRequestsElegantly(futures, exception, uploadId, returnFuture, putObjectRequest);
221+
+ ", Actual: " + currentPartSize));
221222
}
223+
return Optional.empty();
222224
}
223225

224226
private boolean shouldFailRequest() {
@@ -256,6 +258,14 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
256258
// List of CompletedParts needs to be in ascending order
257259
parts = mergeCompletedParts();
258260
}
261+
262+
int actualNumParts = partNumber.get() - 1;
263+
if (actualNumParts != expectedNumParts) {
264+
SdkClientException exception = partNumMismatch(expectedNumParts, actualNumParts);
265+
multipartUploadHelper.failRequestsElegantly(futures, exception, uploadId, returnFuture, putObjectRequest);
266+
return;
267+
}
268+
259269
completeMpuFuture = multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, putObjectRequest,
260270
totalSize);
261271
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,15 @@ public final class MultipartUploadHelper {
4848
private static final Logger log = Logger.loggerFor(MultipartUploadHelper.class);
4949

5050
private final S3AsyncClient s3AsyncClient;
51-
private final long partSizeInBytes;
5251
private final GenericMultipartHelper<PutObjectRequest, PutObjectResponse> genericMultipartHelper;
5352

5453
private final long maxMemoryUsageInBytes;
5554
private final long multipartUploadThresholdInBytes;
5655

5756
public MultipartUploadHelper(S3AsyncClient s3AsyncClient,
58-
long partSizeInBytes,
5957
long multipartUploadThresholdInBytes,
6058
long maxMemoryUsageInBytes) {
6159
this.s3AsyncClient = s3AsyncClient;
62-
this.partSizeInBytes = partSizeInBytes;
6360
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
6461
SdkPojoConversionUtils::toAbortMultipartUploadRequest,
6562
SdkPojoConversionUtils::toPutObjectResponse);
@@ -160,4 +157,22 @@ void uploadInOneChunk(PutObjectRequest putObjectRequest,
160157
CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectResponseCompletableFuture);
161158
CompletableFutureUtils.forwardResultTo(putObjectResponseCompletableFuture, returnFuture);
162159
}
160+
161+
static SdkClientException contentLengthMissingForPart(int currentPartNum) {
162+
return SdkClientException.create("Content length is missing on the AsyncRequestBody for part number " + currentPartNum);
163+
}
164+
165+
static SdkClientException contentLengthMismatchForPart(long expected, long actual) {
166+
return SdkClientException.create(String.format("Content length must not be greater than "
167+
+ "part size. Expected: %d, Actual: %d",
168+
expected,
169+
actual));
170+
}
171+
172+
static SdkClientException partNumMismatch(int expectedNumParts, int actualNumParts) {
173+
return SdkClientException.create(String.format("The number of parts divided is "
174+
+ "not equal to the expected number of "
175+
+ "parts. Expected: %d, Actual: %d",
176+
expectedNumParts, actualNumParts));
177+
}
163178
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public UploadWithKnownContentLengthHelper(S3AsyncClient s3AsyncClient,
6262
SdkPojoConversionUtils::toPutObjectResponse);
6363
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
6464
this.multipartUploadThresholdInBytes = multipartUploadThresholdInBytes;
65-
this.multipartUploadHelper = new MultipartUploadHelper(s3AsyncClient, partSizeInBytes, multipartUploadThresholdInBytes,
65+
this.multipartUploadHelper = new MultipartUploadHelper(s3AsyncClient, multipartUploadThresholdInBytes,
6666
maxMemoryUsageInBytes);
6767
}
6868

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package software.amazon.awssdk.services.s3.internal.multipart;
1717

1818

19+
import static software.amazon.awssdk.services.s3.internal.multipart.MultipartUploadHelper.contentLengthMismatchForPart;
20+
import static software.amazon.awssdk.services.s3.internal.multipart.MultipartUploadHelper.contentLengthMissingForPart;
1921
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
2022

2123
import java.util.Collection;
@@ -71,7 +73,7 @@ public UploadWithUnknownContentLengthHelper(S3AsyncClient s3AsyncClient,
7173
SdkPojoConversionUtils::toPutObjectResponse);
7274
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
7375
this.multipartUploadThresholdInBytes = multipartUploadThresholdInBytes;
74-
this.multipartUploadHelper = new MultipartUploadHelper(s3AsyncClient, partSizeInBytes, multipartUploadThresholdInBytes,
76+
this.multipartUploadHelper = new MultipartUploadHelper(s3AsyncClient, multipartUploadThresholdInBytes,
7577
maxMemoryUsageInBytes);
7678
}
7779

@@ -167,6 +169,14 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
167169
log.trace(() -> "Received asyncRequestBody " + asyncRequestBody.contentLength());
168170
asyncRequestBodyInFlight.incrementAndGet();
169171

172+
Optional<SdkClientException> sdkClientException = validatePart(asyncRequestBody, currentPartNum);
173+
if (sdkClientException.isPresent()) {
174+
multipartUploadHelper.failRequestsElegantly(futures, sdkClientException.get(), uploadId, returnFuture,
175+
putObjectRequest);
176+
subscription.cancel();
177+
return;
178+
}
179+
170180
if (isFirstAsyncRequestBody.compareAndSet(true, false)) {
171181
log.trace(() -> "Received first async request body");
172182
// If this is the first AsyncRequestBody received, request another one because we don't know if there is more
@@ -206,24 +216,24 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
206216
}
207217
}
208218

209-
private void sendUploadPartRequest(String uploadId,
210-
AsyncRequestBody asyncRequestBody,
211-
int currentPartNum) {
219+
private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestBody, int currentPartNum) {
212220
Optional<Long> contentLength = asyncRequestBody.contentLength();
213221
if (!contentLength.isPresent()) {
214-
SdkClientException e = SdkClientException.create("Content length must be present on the AsyncRequestBody");
215-
multipartUploadHelper.failRequestsElegantly(futures, e, uploadId, returnFuture, putObjectRequest);
222+
return Optional.of(contentLengthMissingForPart(currentPartNum));
216223
}
217224

218225
Long contentLengthCurrentPart = contentLength.get();
219226
if (contentLengthCurrentPart > partSizeInBytes) {
220-
SdkClientException e = SdkClientException.create(String.format("Content length must not be greater than the "
221-
+ "part size. Expected: %d, Actual: %d",
222-
partSizeInBytes,
223-
contentLengthCurrentPart));
224-
multipartUploadHelper.failRequestsElegantly(futures, e, uploadId, returnFuture, putObjectRequest);
227+
return Optional.of(contentLengthMismatchForPart(partSizeInBytes, contentLengthCurrentPart));
228+
225229
}
230+
return Optional.empty();
231+
}
226232

233+
private void sendUploadPartRequest(String uploadId,
234+
AsyncRequestBody asyncRequestBody,
235+
int currentPartNum) {
236+
Long contentLengthCurrentPart = asyncRequestBody.contentLength().get();
227237
this.contentLength.getAndAdd(contentLengthCurrentPart);
228238

229239
multipartUploadHelper
@@ -281,11 +291,11 @@ private void completeMultipartUploadIfFinish(int requestsInFlight) {
281291
.toArray(CompletedPart[]::new);
282292

283293
long totalLength = contentLength.get();
284-
int expectedNumPart = genericMultipartHelper.determinePartCount(totalLength, partSizeInBytes);
285-
if (parts.length != expectedNumPart) {
294+
int expectedNumParts = genericMultipartHelper.determinePartCount(totalLength, partSizeInBytes);
295+
if (parts.length != expectedNumParts) {
286296
SdkClientException exception = SdkClientException.create(
287297
String.format("The number of UploadParts requests is not equal to the expected number of parts. "
288-
+ "Expected: %d, Actual: %d", expectedNumPart, parts.length));
298+
+ "Expected: %d, Actual: %d", expectedNumParts, parts.length));
289299
multipartUploadHelper.failRequestsElegantly(futures, exception, uploadId, returnFuture, putObjectRequest);
290300
return;
291301
}

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,13 @@ public void beforeEach() {
101101
@Test
102102
void validatePart_withMissingContentLength_shouldFailRequest() {
103103
subscriber.onNext(createMockAsyncRequestBodyWithEmptyContentLength());
104-
verifyFailRequestsElegantly("Content length must be present on the AsyncRequestBody");
104+
verifyFailRequestsElegantly("Content length is missing on the AsyncRequestBody");
105105
}
106106

107107
@Test
108108
void validatePart_withPartSizeExceedingLimit_shouldFailRequest() {
109109
subscriber.onNext(createMockAsyncRequestBody(PART_SIZE + 1));
110-
verifyFailRequestsElegantly("Content length must be equal to the part size");
110+
verifyFailRequestsElegantly("Content length must not be greater than part size");
111111
}
112112

113113
@Test
@@ -145,7 +145,6 @@ void validateTotalPartNum_receivedMoreParts_shouldFail() {
145145
.thenReturn(CompletableFuture.completedFuture(null));
146146
lastPartSubscriber.onNext(createMockAsyncRequestBody(expectedLastPartSize));
147147
lastPartSubscriber.onNext(createMockAsyncRequestBody(expectedLastPartSize));
148-
lastPartSubscriber.onComplete();
149148

150149
verifyFailRequestsElegantly("The number of parts divided is not equal to the expected number of parts");
151150
}
@@ -266,6 +265,7 @@ private void verifyFailRequestsElegantly(String expectedErrorMessage) {
266265
Throwable exception = exceptionCaptor.getValue();
267266
assertThat(exception).isInstanceOf(SdkClientException.class);
268267
assertThat(exception.getMessage()).contains(expectedErrorMessage);
268+
verify(subscription).cancel();
269269
}
270270

271271
private Map<Integer, CompletedPart> createExistingParts(int numExistingParts) {

0 commit comments

Comments
 (0)