Skip to content

Add Range-based Multipart download Subscriber for Pre-signed URLs #6331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: feature/master/pre-signed-url-getobject
Choose a base branch
from

Conversation

jencymaryjoseph
Copy link
Contributor

Motivation and Context

This PR introduces support for multipart downloads using presigned URLs. Unlike the existing MultipartDownloaderSubscriber which relies on partNumber, this implementation uses HTTP range headers to achieve multipart functionality with pre-signed URLs.

Modifications

PresignedUrlMultipartDownloaderSubscriber: New reactive subscriber that handles size discovery through initial range requests, manages multipart state with thread-safe operations, and validates response consistency using ETag comparison.

Testing

PresignedUrlMultipartDownloaderSubscriberTckTest: Reactive Streams specification compliance test using SubscriberWhiteboxVerification with mocked S3AsyncClient for isolated testing.
PresignedUrlMultipartDownloaderSubscriberWiremockTest: WireMock tests covering multipart downloads with various part sizes, single-part downloads, and error scenarios.
PresignedUrlMultipartDownloadTestUtil: WireMock stubbing utilities for range request simulation and verification.

Screenshots (if appropriate)

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)

Checklist

  • I have read the CONTRIBUTING document
  • Local run of mvn install succeeds
  • My code follows the code style of this project
  • My change requires a change to the Javadoc documentation
  • I have updated the Javadoc documentation accordingly
  • I have added tests to cover my changes
  • All new and existing tests passed
  • I have added a changelog entry. Adding a new entry must be accomplished by running the scripts/new-change script and following the instructions. Commit the new file created by the script in .changes/next-release with your changes.
  • My change is to implement 1.11 parity feature and I have updated LaunchChangelog

License

  • I confirm that this pull request can be released under the Apache 2 license

@jencymaryjoseph jencymaryjoseph requested a review from a team as a code owner August 11, 2025 22:33

@Override
public void onSubscribe(Subscription s) {
synchronized (lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need synchronized here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right, this was defensive programming, we don't need synchronization here since Reactive Streams spec guarantees onSubscribe is called at most once per Subscriber.

private long parseContentRangeForTotalSize(String contentRange) {
Matcher matcher = CONTENT_RANGE_PATTERN.matcher(contentRange);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid Content-Range header: " + contentRange);
Copy link
Contributor

@zoewangg zoewangg Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Did we handle the exception elsewhere? because this may cause the request to get stuck, we probably wanted to make sure we handle it elegantly, cancelling subscription, failing the future exceptionally etc

}
}

private long parseContentRangeForTotalSize(String contentRange) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this class to a util or something and add some unit tests?

future.complete(null);
}

public CompletableFuture<Void> future() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future() method is used for testing to verify subscriber completion/error states

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, can we add SdkTestInternalApi annotation?

import software.amazon.awssdk.services.s3.utils.AsyncResponseTransformerTestSupplier;

@WireMockTest
class PresignedUrlMultipartDownloaderSubscriberWiremockTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of testing PresignedUrlMultipartDownloaderSubscriber, can we add some tests using S3AsyncClient directly? Example: https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java

this.totalContentLength = parseContentRangeForTotalSize(response.contentRange());
this.totalParts = calculateTotalParts(totalContentLength, configuredPartSizeInBytes);
log.debug(() -> String.format("Total content length: %d, Total parts: %d", totalContentLength, totalParts));
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to use Optional<SdkClientException> validation pattern like KnownContentLengthAsyncRequestBodySubscriber instead of try-catch for control flow.

return;
}
}
if (totalParts != null && totalParts > 1 && totalComplete < totalParts) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggesting creating a method to make it more clear, maybe hasMoreParts()?

}
}

private void validateResponse(GetObjectResponse response) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add all validations specified in the transfer manager SEP?

Copy link

if (eTag == null) {
this.eTag = response.eTag();
log.debug(() -> String.format("Multipart object ETag: %s", this.eTag));
} else if (response.eTag() != null && !eTag.equals(response.eTag())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test case for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed ETag validation, now we capture the ETag from the first part and use it for If-Match header validation on subsequent parts which aligns with the SEP requirements.

return (int) Math.ceil((double) contentLength / partSize);
}

private PresignedUrlDownloadRequest createPartRequest(int partIndex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: suggesting createRangedGetRequest since this it not part request

.endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort()))
.serviceConfiguration(S3Configuration.builder()
.pathStyleAccessEnabled(true)
.checksumValidationEnabled(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we disabling checksum validation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Range requests don't have checksums to validate, so this setting has no effect but false would be more explicit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a bit odd though since it's enabled by default and we are testing with it off explicitly. Can we enable checksum mode by default (I think we should if we can)

int totalComplete = completedParts.get();
log.debug(() -> String.format("Completed part %d", totalComplete));

synchronized (lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to lock the entire block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can reduce the sync lock scope and make totalContentLength, totalParts, and eTag volatile.

@zoewangg zoewangg requested a review from Copilot August 13, 2025 00:37
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a new implementation for multipart downloads using presigned URLs with HTTP range headers instead of S3's native partNumber-based multipart API. The implementation uses reactive streams to handle concurrent part downloads while maintaining thread safety and proper response validation.

  • Adds PresignedUrlMultipartDownloaderSubscriber class for range-based multipart downloads with presigned URLs
  • Implements size discovery through initial range requests and ETag validation for response consistency
  • Provides comprehensive test coverage including TCK compliance tests and WireMock integration tests

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

File Description
PresignedUrlMultipartDownloaderSubscriber.java Core subscriber implementation for presigned URL multipart downloads using HTTP range requests
PresignedUrlMultipartDownloaderSubscriberTckTest.java Reactive Streams TCK compliance test with mocked S3AsyncClient
PresignedUrlMultipartDownloaderSubscriberWiremockTest.java Integration tests using WireMock for various download scenarios
PresignedUrlMultipartDownloadTestUtil.java Test utility class for WireMock stubbing and verification
Comments suppressed due to low confidence (2)

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloadTestUtil.java:138

  • The XML tag '' appears to be incomplete or truncated. It should likely be '' to match the pattern used in line 125.
                .withBody("<e><Code>400</Code><Message>test error message</Message></e>")));

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
You can also share your feedback on Copilot code review for a chance to win a $100 gift card. Take the survey.

@jencymaryjoseph jencymaryjoseph force-pushed the feature/multipartSubscriber branch from e9d282d to c12ea47 Compare August 14, 2025 15:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants