From ed1e4f1e5e58d9e35e144e9c2e1d47a53cf66f99 Mon Sep 17 00:00:00 2001 From: yibole Date: Mon, 18 Aug 2025 09:58:31 -0700 Subject: [PATCH 1/2] added part count and content range validation for download --- .../MultipartDownloaderSubscriber.java | 63 +++++++++++++++++++ .../multipart/MultipartDownloadTestUtil.java | 27 ++++++++ ...ipartDownloaderSubscriberWiremockTest.java | 26 ++++++++ 3 files changed, 116 insertions(+) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 1ccae234631d..edf50ff92ca2 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -23,6 +23,7 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -76,6 +77,16 @@ public class MultipartDownloaderSubscriber implements Subscriber totalParts) { + validatePartsCount(completedParts.get()); log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); subscription.cancel(); return; @@ -162,10 +174,20 @@ private void requestMoreIfNeeded(GetObjectResponse response) { totalParts = partCount; } + String actualContentRange = response.contentRange(); + if (actualContentRange != null && partSize == null) { + getRangeInfo(actualContentRange); + log.debug(() -> String.format("Part size of the object to download: " + partSize)); + log.debug(() -> String.format("Total Content Length of the object to download: " + totalContentLength)); + } + + validateContentRange(totalComplete, actualContentRange); + synchronized (lock) { if (totalParts != null && totalParts > 1 && totalComplete < totalParts) { subscription.request(1); } else { + validatePartsCount(completedParts.get()); log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); subscription.cancel(); } @@ -198,4 +220,45 @@ private GetObjectRequest nextRequest(int nextPartToGet) { } }); } + + private void validatePartsCount(int currentGetCount) { + if (totalParts != null && currentGetCount != totalParts) { + String errorMessage = "PartsCount validation failed. Expected " + totalParts + ", downloaded" + + " " + currentGetCount + " parts."; + log.error(() -> errorMessage); + subscription.cancel(); + SdkClientException exception = SdkClientException.create(errorMessage); + onError(exception); + } + } + + private void validateContentRange(int partNumber, String contentRange) { + if (contentRange == null) { + return; + } + + long expectedStart = (partNumber - 1) * partSize; + long expectedEnd = partNumber == totalParts ? totalContentLength - 1 : expectedStart + partSize - 1; + + String expectedContentRange = String.format("bytes %d-%d/%d", expectedStart, expectedEnd, totalContentLength); + + if (!expectedContentRange.equals(contentRange)) { + String errorMessage = String.format( + "Content-Range validation failed for part %d. Expected: %s, Actual: %s", + partNumber, expectedContentRange, contentRange); + log.error(() -> errorMessage); + onError(SdkClientException.create(errorMessage)); + } + } + + private void getRangeInfo(String contentRange) { + String rangeInfo = contentRange.substring(6); + String[] parts = rangeInfo.split("/"); + + this.totalContentLength = Long.parseLong(parts[1]); + String[] rangeParts = parts[0].split("-"); + long startByte = Long.parseLong(rangeParts[0]); + long endByte = Long.parseLong(rangeParts[1]); + this.partSize = endByte - startByte + 1; + } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java index 708972b6b0d7..3df11de206b5 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java @@ -70,6 +70,21 @@ public byte[] stubForPart(String testBucket, String testKey,int part, int totalP aResponse() .withHeader("x-amz-mp-parts-count", totalPart + "") .withHeader("ETag", eTag) + .withHeader("Content-Length", String.valueOf(body.length)) + .withHeader("Content-Range", contentRange(part, totalPart, partSize)) + .withBody(body))); + return body; + } + + public byte[] stubForPartwithWrongContentRange(String testBucket, String testKey,int part, int totalPart, int partSize) { + byte[] body = new byte[partSize]; + random.nextBytes(body); + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=%d", testBucket, testKey, part))).willReturn( + aResponse() + .withHeader("x-amz-mp-parts-count", totalPart + "") + .withHeader("ETag", eTag) + .withHeader("Content-Length", String.valueOf(body.length)) + .withHeader("Content-Range", contentRange(part, totalPart, partSize + 1)) .withBody(body))); return body; } @@ -95,4 +110,16 @@ public byte[] stubForPartSuccess(int part, int totalPart, int partSize) { .withBody(body))); return body; } + + private String contentRange(int part, int totalPart, int partSize) { + long totalObjectSize = (long) totalPart * partSize; + long startByte = (long) (part - 1) * partSize; + long endByte = startByte + partSize - 1; + + if (part == totalPart) { + endByte = totalObjectSize - 1; + } + + return String.format("bytes %d-%d/%d", startByte, endByte, totalObjectSize); + } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java index 1c6eb666a9c2..6dcf24550756 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java @@ -160,6 +160,32 @@ void errorOnThirdRequest_shouldCompleteExceptionallyOnlyPartsGreaterThanTwo( } } + @ParameterizedTest + @MethodSource("argumentsProvider") + void wrongContentRangeOnSecondRequest_should(AsyncResponseTransformerTestSupplier supplier, + int amountOfPartToTest, + int partSize) { + util.stubForPart(testBucket, testKey, 1, 3, partSize); + util.stubForPartwithWrongContentRange(testBucket, testKey, 2, 3, partSize); + util.stubForPart(testBucket, testKey, 3, 3, partSize); + //byte[] expectedBody = util.stubAllParts(testBucket, testKey, amountOfPartToTest, partSize); + AsyncResponseTransformer transformer = supplier.transformer(); + AsyncResponseTransformer.SplitResult split = transformer.split( + SplittingTransformerConfiguration.builder() + .bufferSizeInBytes(1024 * 32L) + .build()); + Subscriber> subscriber = new MultipartDownloaderSubscriber( + s3AsyncClient, + GetObjectRequest.builder() + .bucket(testBucket) + .key(testKey) + .build()); + + split.publisher().subscribe(subscriber); + T response = split.resultFuture().join(); + + } + private static Stream argumentsProvider() { // amount of part, individual part size List> partSizes = Arrays.asList( From 391a506ca2a3e5226cbfa5795c0065fcd3bd6033 Mon Sep 17 00:00:00 2001 From: yibole Date: Mon, 18 Aug 2025 13:39:15 -0700 Subject: [PATCH 2/2] changelog added --- .changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json | 6 ++++++ .../archtests/CodingConventionWithSuppressionTest.java | 2 ++ 2 files changed, 8 insertions(+) create mode 100644 .changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json b/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json new file mode 100644 index 000000000000..c6ce2c184d71 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Added partCount and ContentRange validation for s3 transfer manager download request " +} diff --git a/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java b/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java index d2edcaac742d..3795b30b5d3a 100644 --- a/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java +++ b/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java @@ -36,6 +36,7 @@ import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.metrics.publishers.emf.EmfMetricLoggingPublisher; import software.amazon.awssdk.metrics.publishers.emf.internal.MetricEmfConverter; +import software.amazon.awssdk.services.s3.internal.multipart.MultipartDownloaderSubscriber; import software.amazon.awssdk.utils.Logger; /** @@ -57,6 +58,7 @@ public class CodingConventionWithSuppressionTest { private static final Set ALLOWED_ERROR_LOG_SUPPRESSION = new HashSet<>( Arrays.asList( ArchUtils.classNameToPattern(EmfMetricLoggingPublisher.class), + ArchUtils.classNameToPattern(MultipartDownloaderSubscriber.class), ArchUtils.classWithInnerClassesToPattern(ResponseTransformer.class))); @Test