Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-b003027.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon S3",
"contributor": "",
"description": "Fix OutOfMemory issues when using S3CrtRequestBodyStreamAdapter on streams that produce data faster than they can be consumed."
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@
@SdkInternalApi
public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStream {
private static final long MINIMUM_BYTES_BUFFERED = 1024 * 1024L;
// for 16 kb chunks, this limits to about 16 MB (2x the standard crt provided buffer size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the 16KB come from? Is it related to #6542?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was removed based on the sizeHint approach, but I've updated the MINIMUM_BYTES_BUFFERED to what it was prior to #3800

private static final int MAXIMUM_OUTSTANDING_DEMAND = 1024;
private final SdkHttpContentPublisher bodyPublisher;
private final ByteBufferStoringSubscriber requestBodySubscriber;

private final AtomicBoolean subscribed = new AtomicBoolean(false);

public S3CrtRequestBodyStreamAdapter(SdkHttpContentPublisher bodyPublisher) {
this.bodyPublisher = bodyPublisher;
this.requestBodySubscriber = new ByteBufferStoringSubscriber(MINIMUM_BYTES_BUFFERED);
this.requestBodySubscriber = new ByteBufferStoringSubscriber(MINIMUM_BYTES_BUFFERED, MAXIMUM_OUTSTANDING_DEMAND);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;

class S3CrtRequestBodyStreamAdapterTest {

Expand Down Expand Up @@ -56,6 +58,36 @@ void getRequestData_fillsInputBuffer_publisherBuffersAreSmaller() {
assertThat(inputBuffer.remaining()).isEqualTo(0);
}

@Test
void getRequestData_fillsInputBuffer_limitsOutstandingDemand() {
int inputBufferSize = 2000;
int maximumOutstandingDemand = 1024;

RequestTrackingPublisher requestTrackingPublisher = new RequestTrackingPublisher();
SdkHttpContentPublisher requestBody = requestBody(requestTrackingPublisher, 42L);

S3CrtRequestBodyStreamAdapter adapter = new S3CrtRequestBodyStreamAdapter(requestBody);

ByteBuffer inputBuffer = ByteBuffer.allocate(inputBufferSize);
for (int i = 0; i < maximumOutstandingDemand; i++) {
// we are under the minimum buffer size, so each request here increases outstanding demand by 1
adapter.sendRequestBody(inputBuffer);
// release 1 byte of data, calling onNext (satisfies one request, but then requests 1 more)
requestTrackingPublisher.release(1);
}
// we should have 2x requests
assertThat(requestTrackingPublisher.requests()).isEqualTo(maximumOutstandingDemand * 2);
// but the total released bytes is only maximumOutstandingDemand
assertThat(inputBuffer.remaining()).isEqualTo(inputBufferSize - maximumOutstandingDemand + 1);

// now that we have reached maximum outstanding demand, new requests won't be sent
adapter.sendRequestBody(inputBuffer);
assertThat(requestTrackingPublisher.requests()).isEqualTo(maximumOutstandingDemand * 2);



}

private static SdkHttpContentPublisher requestBody(Publisher<ByteBuffer> delegate, long size) {
return new SdkHttpContentPublisher() {
@Override
Expand Down Expand Up @@ -114,4 +146,44 @@ public void getRequestData_publisherThrows_wrapsExceptionIfNotRuntimeException()
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(IOException.class);
}

private static class RequestTrackingPublisher implements Publisher<ByteBuffer> {
ByteBufferStoringSubscriber subscriber;
RequestTrackingSubscription subscription = new RequestTrackingSubscription();

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
assertThat(subscriber).isInstanceOf(ByteBufferStoringSubscriber.class);
this.subscriber = (ByteBufferStoringSubscriber) subscriber;
this.subscriber.onSubscribe(subscription);
}

// publish up to n requests
public void release(int n) {
for (int i = 0; i < n; i++) {
ByteBuffer buffer = ByteBuffer.allocate(1);
subscriber.onNext(buffer);
}
}

public long requests() {
return subscription.requests;
}
}

private static class RequestTrackingSubscription implements Subscription {

long requests = 0;

@Override
public void request(long n) {
requests += n;
}

@Override
public void cancel() {

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -56,6 +57,10 @@ public class ByteBufferStoringSubscriber implements Subscriber<ByteBuffer> {

private final Phaser phaser = new Phaser(1);

private final AtomicInteger outstandingDemand = new AtomicInteger(0);

private final Optional<Integer> maximumOutstandingDemand;

/**
* The active subscription. Set when {@link #onSubscribe(Subscription)} is invoked.
*/
Expand All @@ -67,6 +72,19 @@ public class ByteBufferStoringSubscriber implements Subscriber<ByteBuffer> {
public ByteBufferStoringSubscriber(long minimumBytesBuffered) {
this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive");
this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE);
this.maximumOutstandingDemand = Optional.empty();
}

/**
* Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval and which limits the
* maximum outstanding demand (requests) to its subscription.
*/
public ByteBufferStoringSubscriber(long minimumBytesBuffered, int maximumOutstandingDemand) {
this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive");
this.maximumOutstandingDemand = Optional.of(Validate.isPositive(maximumOutstandingDemand,
"maximumOutstandingDemand must be positive"));

this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -175,12 +193,14 @@ public void onSubscribe(Subscription s) {
storingSubscriber.onSubscribe(new DemandIgnoringSubscription(s));
subscription = s;
subscription.request(1);
outstandingDemand.incrementAndGet();
subscriptionLatch.countDown();
}

@Override
public void onNext(ByteBuffer byteBuffer) {
int remaining = byteBuffer.remaining();
outstandingDemand.decrementAndGet();
storingSubscriber.onNext(byteBuffer.duplicate());
addBufferedDataAmount(remaining);
phaser.arrive();
Expand All @@ -204,7 +224,13 @@ private void addBufferedDataAmount(long amountToAdd) {
}

private void maybeRequestMore(long currentDataBuffered) {
// if we have too many outstanding requests, no need to make more requests
if (maximumOutstandingDemand.isPresent() && outstandingDemand.get() >= maximumOutstandingDemand.get()) {
return;
}

if (currentDataBuffered < minimumBytesBuffered) {
Copy link
Contributor

@zoewangg zoewangg Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of enforcing maximumOutstandingDemand, can we use byteBuffer.remaining() as a hint and check outstandingDemand.get() * buffer hints + currentDataBuffered < minimumBytesBuffered?

Because this class is used else where like https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/async/InputStreamSubscriber.java#L44 and it'd address the issue there as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - thats a great idea - I've updated the approach to use a sizeHint

outstandingDemand.incrementAndGet();
subscription.request(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ public class ByteBufferStoringSubscriberTest {
public void constructorCalled_withNonPositiveSize_throwsException() {
assertThatCode(() -> new ByteBufferStoringSubscriber(1)).doesNotThrowAnyException();
assertThatCode(() -> new ByteBufferStoringSubscriber(Integer.MAX_VALUE)).doesNotThrowAnyException();
assertThatCode(() -> new ByteBufferStoringSubscriber(1, 1)).doesNotThrowAnyException();

assertThatThrownBy(() -> new ByteBufferStoringSubscriber(0)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(-1)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(Integer.MIN_VALUE)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(1, 0)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(1, -1)).isInstanceOf(IllegalArgumentException.class);
}

@Test
Expand All @@ -76,6 +79,20 @@ public void doesNotRequestMoreThanMaxBytes() {
verifyNoMoreInteractions(subscription);
}

@Test
public void doesNotRequestMoreThanMaxDemand() {
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(5, 2);

subscriber.onSubscribe(subscription); // request 1, demand = 1
subscriber.onNext(fullByteBufferOfSize(3)); // demand = 0
subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 1
subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 2
verify(subscription, times(3)).request(1);

subscriber.transferTo(emptyByteBufferOfSize(1)); // demand already maximum, no request
verifyNoMoreInteractions(subscription);
}

@Test
public void canStoreMoreThanMaxBytesButWontAskForMoreUntilBelowMax() {
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(3);
Expand Down
Loading