-
Notifications
You must be signed in to change notification settings - Fork 926
Stream retry support part 1: Add BufferingAsyncRequestBody that buffers the entire content and sup… #6313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/master/mpu-stream-retry
Are you sure you want to change the base?
Stream retry support part 1: Add BufferingAsyncRequestBody that buffers the entire content and sup… #6313
Conversation
…ports multiple concurrent subscribers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces BufferingAsyncRequestBody
, a new implementation of AsyncRequestBody
that buffers content in memory and supports multiple concurrent subscribers for stream retry functionality in the AWS SDK.
- Adds a thread-safe, multicast publisher that can buffer data incrementally and replay it to multiple subscribers
- Implements comprehensive test coverage including concurrency tests and reactive streams TCK compliance
- Provides resource management through proper cleanup of buffered data and subscriber notifications
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
BufferingAsyncRequestBody.java | Core implementation of the buffering async request body with multicast publisher capabilities |
BufferingAsyncRequestBodyTest.java | Comprehensive unit tests covering functionality, edge cases, and thread safety |
BufferingAsyncRequestBodyTckTest.java | Reactive Streams TCK compliance test suite |
Comments suppressed due to low confidence (1)
core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java:208
- [nitpick] The error message should be consistent with the one used in the subscribe method. Consider using 'AsyncRequestBody has been closed' to match the error message on line 172.
subscriptions.forEach(s -> s.notifyError(new IllegalStateException("The publisher has been closed")));
...c/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
Outdated
Show resolved
Hide resolved
...core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Copilot <[email protected]>
|
...core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
public Optional<Long> contentLength() { | ||
return Optional.ofNullable(length); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's kind of weird to me that contentLength
could return an empty optional, but we require all the data to be buffered first before we start publishing. Should we just require all of the data to be ready at construction time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, good point! It'll also reduce the complexity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this new class and updated the existing ByteBuffersAsyncRequestBody to make it compliant to the reactive stream spec and implement closable interface; it already supports multicast today
…ports multiple concurrent subscribers
Motivation and Context
#6198
This is the first PR to support retry in stream based transfer in s3 multipart client.
This PR addsBufferingAsyncRequestBody
that is essentially a multicast publisher and can be subscribed multiple times until it's been closedThis PR made the following changes to existing
ByteBuffersAsyncRequestBody
: