Skip to content

Commit e5708e1

Browse files
committed
Add BufferingAsyncRequestBody that buffers the entire content and supports multiple concurrent subscribers
1 parent eba1843 commit e5708e1

File tree

3 files changed

+756
-0
lines changed

3 files changed

+756
-0
lines changed
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.Set;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.concurrent.atomic.AtomicLong;
28+
import org.reactivestreams.Subscriber;
29+
import org.reactivestreams.Subscription;
30+
import software.amazon.awssdk.annotations.SdkInternalApi;
31+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
32+
import software.amazon.awssdk.annotations.ThreadSafe;
33+
import software.amazon.awssdk.core.async.AsyncRequestBody;
34+
import software.amazon.awssdk.core.exception.NonRetryableException;
35+
import software.amazon.awssdk.core.internal.util.NoopSubscription;
36+
import software.amazon.awssdk.utils.Logger;
37+
import software.amazon.awssdk.utils.SdkAutoCloseable;
38+
import software.amazon.awssdk.utils.Validate;
39+
40+
/**
41+
* An implementation of {@link AsyncRequestBody} that buffers the entire content and supports multiple concurrent subscribers.
42+
*
43+
* <p>This class allows data to be sent incrementally via the {@link #send(ByteBuffer)} method, buffered internally,
44+
* and then replayed to multiple subscribers independently. Each subscriber receives a complete copy of all buffered data
45+
* when they subscribe and request it.
46+
*
47+
* <p>Usage Pattern:
48+
* {@snippet :
49+
* BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(contentLength);
50+
*
51+
* // Send data incrementally
52+
* body.send(ByteBuffer.wrap("Hello ".getBytes()));
53+
* body.send(ByteBuffer.wrap("World".getBytes()));
54+
*
55+
* // Mark data as complete and ready for subscription
56+
* body.complete();
57+
*
58+
* // Multiple subscribers can now consume the buffered data
59+
* body.subscribe(subscriber1);
60+
* body.subscribe(subscriber2);
61+
* }
62+
*
63+
* <h3>Thread Safety:</h3>
64+
* This class is thread-safe and supports concurrent operations:
65+
* <ul>
66+
* <li>Multiple threads can call {@link #send(ByteBuffer)} concurrently</li>
67+
* <li>Multiple subscribers can be added concurrently</li>
68+
* <li>Each subscriber operates independently with its own state</li>
69+
* </ul>
70+
*
71+
* <h3>Subscription Behavior:</h3>
72+
* <ul>
73+
* <li>Subscribers can only subscribe after {@link #complete()} has been called</li>
74+
* <li>Each subscriber receives a read-only view of the buffered data</li>
75+
* <li>Subscribers receive data independently based on their own demand signaling</li>
76+
* <li>If the body is closed, new subscribers will receive an error immediately</li>
77+
* </ul>
78+
*
79+
* <h3>Resource Management:</h3>
80+
* The body should be closed when no longer needed to free buffered data and notify active subscribers.
81+
* Closing the body will:
82+
* <ul>
83+
* <li>Clear all buffered data</li>
84+
* <li>Send error notifications to all active subscribers</li>
85+
* <li>Prevent new subscriptions</li>
86+
* </ul>
87+
*
88+
*/
89+
@ThreadSafe
90+
@SdkInternalApi
91+
public final class BufferingAsyncRequestBody implements AsyncRequestBody, SdkAutoCloseable {
92+
private static final Logger log = Logger.loggerFor(BufferingAsyncRequestBody.class);
93+
94+
private final Long length;
95+
private final List<ByteBuffer> bufferedData = new ArrayList<>();
96+
private boolean dataReady;
97+
private boolean closed;
98+
private final Set<ReplayableByteBufferSubscription> subscriptions;
99+
private final Object lock = new Object();
100+
101+
/**
102+
* Creates a new BufferingAsyncRequestBody with the specified content length.
103+
*
104+
* @param length the total content length in bytes, or null if unknown
105+
*/
106+
BufferingAsyncRequestBody(Long length) {
107+
this.length = length;
108+
this.subscriptions = ConcurrentHashMap.newKeySet();
109+
}
110+
111+
/**
112+
* Sends a chunk of data to be buffered for later consumption by subscribers.
113+
*
114+
* @param data the data to buffer, must not be null
115+
* @throws NullPointerException if data is null
116+
*/
117+
public void send(ByteBuffer data) {
118+
Validate.paramNotNull(data, "data");
119+
synchronized (lock) {
120+
if (closed) {
121+
throw new IllegalStateException("Cannot send data to closed body");
122+
}
123+
if (dataReady) {
124+
throw new IllegalStateException("Request body has already been completed");
125+
}
126+
bufferedData.add(data);
127+
}
128+
}
129+
130+
/**
131+
* Marks the request body as complete and ready for subscription.
132+
*
133+
* <p>This method must be called before any subscribers can successfully subscribe
134+
* to this request body. After calling this method, no more data should be sent
135+
* via {@link #send(ByteBuffer)}.
136+
*
137+
* <p>Once complete, multiple subscribers can subscribe and will each receive
138+
* the complete buffered content independently.
139+
*/
140+
public void complete() {
141+
synchronized (lock) {
142+
if (dataReady) {
143+
return;
144+
}
145+
if (closed) {
146+
throw new IllegalStateException("The AsyncRequestBody has been closed");
147+
}
148+
dataReady = true;
149+
}
150+
}
151+
152+
@Override
153+
public Optional<Long> contentLength() {
154+
return Optional.ofNullable(length);
155+
}
156+
157+
@Override
158+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
159+
Validate.paramNotNull(subscriber, "subscriber");
160+
161+
synchronized (lock) {
162+
if (!dataReady) {
163+
subscriber.onSubscribe(new NoopSubscription(subscriber));
164+
subscriber.onError(NonRetryableException.create(
165+
"Unexpected error occurred. Data is not ready to be subscribed"));
166+
return;
167+
}
168+
169+
if (closed) {
170+
subscriber.onSubscribe(new NoopSubscription(subscriber));
171+
subscriber.onError(NonRetryableException.create(
172+
"AsyncRequestBody has been closed"));
173+
return;
174+
}
175+
}
176+
177+
ReplayableByteBufferSubscription replayableByteBufferSubscription =
178+
new ReplayableByteBufferSubscription(subscriber, bufferedData);
179+
subscriber.onSubscribe(replayableByteBufferSubscription);
180+
subscriptions.add(replayableByteBufferSubscription);
181+
}
182+
183+
@Override
184+
public String body() {
185+
return BodyType.BYTES.getName();
186+
}
187+
188+
/**
189+
* <p>Closes this request body and releases all resources. This will:
190+
* <ul>
191+
* <li>Clear all buffered data to free memory</li>
192+
* <li>Notify all active subscribers with an error</li>
193+
* <li>Prevent new subscriptions from succeeding</li>
194+
* </ul>
195+
*
196+
* <p>This method is idempotent - calling it multiple times has no additional effect.
197+
* It is safe to call this method concurrently from multiple threads.
198+
*/
199+
@Override
200+
public void close() {
201+
synchronized (lock) {
202+
if (closed) {
203+
return;
204+
}
205+
206+
closed = true;
207+
bufferedData.clear();
208+
subscriptions.forEach(s -> s.notifyError(new IllegalStateException("The publisher has been closed")));
209+
subscriptions.clear();
210+
}
211+
212+
}
213+
214+
@SdkTestInternalApi
215+
List<ByteBuffer> bufferedData() {
216+
return Collections.unmodifiableList(bufferedData);
217+
}
218+
219+
private class ReplayableByteBufferSubscription implements Subscription {
220+
private final AtomicInteger index = new AtomicInteger(0);
221+
private final AtomicBoolean done = new AtomicBoolean(false);
222+
private final List<ByteBuffer> buffers;
223+
private final AtomicBoolean processingRequest = new AtomicBoolean(false);
224+
private Subscriber<? super ByteBuffer> currentSubscriber;
225+
private final AtomicLong outstandingDemand = new AtomicLong();
226+
227+
private ReplayableByteBufferSubscription(Subscriber<? super ByteBuffer> subscriber, List<ByteBuffer> buffers) {
228+
this.buffers = buffers;
229+
this.currentSubscriber = subscriber;
230+
}
231+
232+
@Override
233+
public void request(long n) {
234+
if (n <= 0) {
235+
currentSubscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
236+
currentSubscriber = null;
237+
return;
238+
}
239+
240+
if (done.get()) {
241+
return;
242+
}
243+
244+
outstandingDemand.updateAndGet(current -> {
245+
if (Long.MAX_VALUE - current < n) {
246+
return Long.MAX_VALUE;
247+
}
248+
249+
return current + n;
250+
});
251+
processRequest();
252+
}
253+
254+
private void processRequest() {
255+
do {
256+
if (!processingRequest.compareAndSet(false, true)) {
257+
// Some other thread is processing the queue, so we don't need to.
258+
return;
259+
}
260+
261+
try {
262+
doProcessRequest();
263+
} catch (Throwable e) {
264+
notifyError(new IllegalStateException("Encountered fatal error in publisher", e));
265+
subscriptions.remove(this);
266+
break;
267+
} finally {
268+
processingRequest.set(false);
269+
}
270+
271+
} while (shouldProcessRequest());
272+
}
273+
274+
private boolean shouldProcessRequest() {
275+
return !done.get() && outstandingDemand.get() > 0 && index.get() < buffers.size();
276+
}
277+
278+
private void doProcessRequest() {
279+
while (true) {
280+
if (!shouldProcessRequest()) {
281+
return;
282+
}
283+
284+
int currentIndex = this.index.getAndIncrement();
285+
286+
if (currentIndex >= buffers.size()) {
287+
// This should never happen, but defensive programming
288+
notifyError(new IllegalStateException("Index out of bounds"));
289+
subscriptions.remove(this);
290+
return;
291+
}
292+
293+
ByteBuffer buffer = buffers.get(currentIndex);
294+
currentSubscriber.onNext(buffer.asReadOnlyBuffer());
295+
outstandingDemand.decrementAndGet();
296+
297+
if (currentIndex == buffers.size() - 1) {
298+
done.set(true);
299+
currentSubscriber.onComplete();
300+
subscriptions.remove(this);
301+
break;
302+
}
303+
}
304+
}
305+
306+
@Override
307+
public void cancel() {
308+
done.set(true);
309+
subscriptions.remove(this);
310+
}
311+
312+
public void notifyError(Exception exception) {
313+
if (currentSubscriber != null) {
314+
done.set(true);
315+
currentSubscriber.onError(exception);
316+
currentSubscriber = null;
317+
}
318+
}
319+
}
320+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
19+
import java.nio.ByteBuffer;
20+
import org.apache.commons.lang3.RandomStringUtils;
21+
import org.reactivestreams.Publisher;
22+
import org.reactivestreams.tck.TestEnvironment;
23+
import software.amazon.awssdk.core.SdkBytes;
24+
25+
public class BufferingAsyncRequestBodyTckTest extends org.reactivestreams.tck.PublisherVerification<ByteBuffer> {
26+
public BufferingAsyncRequestBodyTckTest() {
27+
super(new TestEnvironment(true));
28+
}
29+
30+
@Override
31+
public Publisher<ByteBuffer> createPublisher(long elements) {
32+
BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024 * elements);
33+
for (int i = 0; i < elements; i++) {
34+
bufferingAsyncRequestBody.send(SdkBytes.fromUtf8String(RandomStringUtils.randomAscii(1024)).asByteBuffer());
35+
}
36+
37+
bufferingAsyncRequestBody.complete();
38+
return bufferingAsyncRequestBody;
39+
}
40+
41+
@Override
42+
public Publisher<ByteBuffer> createFailedPublisher() {
43+
BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024L);
44+
bufferingAsyncRequestBody.close();
45+
return null;
46+
}
47+
48+
public long maxElementsFromPublisher() {
49+
return 100;
50+
}
51+
52+
}

0 commit comments

Comments
 (0)