-
Notifications
You must be signed in to change notification settings - Fork 926
Fix bug in MultipartS3AsyncClient GetObject Retryable Errors #6309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
.../amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
...ava/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java
Outdated
Show resolved
Hide resolved
...are/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java
Outdated
Show resolved
Hide resolved
...are/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes a bug in the MultipartS3AsyncClient where retryable errors during GetObject operations were failing immediately or causing the initial error to be propagated even after successful retries. The fix improves error handling logic in the SplittingTransformer to properly handle retries for the first part of multipart downloads.
- Fixed error propagation logic to only retry errors for the first part of multipart downloads
- Added proper future forwarding to ensure upstream errors are handled correctly
- Enhanced test coverage with comprehensive WireMock tests for retry scenarios
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
SplittingTransformer.java | Core fix - updated error handling logic to properly distinguish first part from subsequent parts and handle retries correctly |
MultipartDownloaderSubscriber.java | Added future tracking and cleanup for proper error handling |
DownloadObjectHelper.java | Added future forwarding to ensure exceptions are properly propagated |
S3MultipartClientGetObjectWiremockTest.java | Comprehensive new test suite covering retry scenarios and error handling |
MultipartDownloaderSubscriberWiremockTest.java | Removed obsolete test that was failing on first request errors |
MultipartS3AsyncClient.java | Updated javadoc to reflect GET support |
pom.xml | Added test dependency for retries module |
IndividualPartSubscriberTckTest.java | Updated constructor call to match new signature |
bugfix changelog | Documentation of the fix |
...are/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java
Outdated
Show resolved
Hide resolved
Please add more context and details in the PR description. |
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Show resolved
Hide resolved
|
...ava/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java
Outdated
Show resolved
Hide resolved
...are/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java
Show resolved
Hide resolved
...are/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java
Outdated
Show resolved
Hide resolved
...are/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java
Outdated
Show resolved
Hide resolved
...are/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
@@ -314,14 +317,26 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) { | |||
); | |||
} | |||
} | |||
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response)); | |||
|
|||
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we forward result here again and do it for every part?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we just need to do it for the first part. When this is removed, an error on a subsequent part won't get propagated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Specifically, in the new test I added, if we don't call this at all, instead of S3Exception being thrown, we get:
SdkClientException: Unable to execute HTTP request: onError() was already invoked. (SDK Attempt Count: 2)
Line 317 in 14cccda
assertThatThrownBy(() -> multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, we should probably figure out why onError was invoked multiple times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the flow of events:
- Error response is returned
IndividualTransformer.exceptionOccurred()
is invoked, which callspublisherToUpstream.error(error)
- In
SimplePublisher.doProcessQueue()
, in theON_ERROR
casefailureMessage
is set IndividualPartSubscriber.onNext()
is invoked, because there is still outstanding demand. Here,publisherToUpstream.send(byteBuffer)
is called- In
SimplePublisher.doProcessQueue()
, the entry isonNext
, but because thefailureMessage
is set,entry.resultFuture.completeExceptionally(failureMessage.get())
is invoked
Not sure how we can prevent 4) from happening, is it necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could always prevent sending bytes to the publisherToUpstream
after IndividualTransformer
received an error, would that be a solution?
/** | ||
* Tracks the part number. Errors will only be retried for the first part. | ||
*/ | ||
private final AtomicInteger partNumber = new AtomicInteger(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT (non-blocking): I think we should avoid referring to 'parts' in the SplittingTranformer
, that's a s3 concept, when this class is more abstract, in a core package. Maybe rename to something like 'onNextSignalsSent' or onNextNumber
which would track the total amount of 'onNext' signals sent to the downstreamSubscriber
@@ -259,28 +262,27 @@ private void handleFutureCancel(Throwable e) { | |||
* body publisher. | |||
*/ | |||
private class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> { | |||
private final int partNumber; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: same here, avoid referring to s3 related terms
@@ -314,14 +317,26 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) { | |||
); | |||
} | |||
} | |||
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response)); | |||
|
|||
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could always prevent sending bytes to the publisherToUpstream
after IndividualTransformer
received an error, would that be a solution?
Motivation and Context
This PR fixes a bug in the Java-based multipart S3 client (
MultipartS3AsyncClient
) that causes incorrect retry behavior and duplicate request logging during GetObject operations.Currently, when a retryable error, e.g.,
503 Slowdown
, is returned by S3, the SDK may do one of two things:200
response returned by the sever will be ignored. Instead, the SDK will log the initial error, and continue to retry until retry attempts are exhausted. This may happen instead of scenario 1 (failing right away), if many concurrent requests are in progress and a race condition occurs.Modifications
SplittingTransformer
IndividualTransformer
publisherToUpstream.error()
inIndividualTransformer.exceptionOccurred()
for part numbers greater than 1individualFuture.completeExceptionally()
inIndividualTransformer.prepare()
when theresultFuture
completesDownloadObjectHelper
MultipartDownloaderSubscriber
onError()
is invokedTesting
Added mock tests
Integ tests passed
Screenshots (if appropriate)
Types of changes