Skip to content

Commit 3c8d84b

Browse files
committed
Use a sizehint instead of relying on a new parameter
1 parent 326e615 commit 3c8d84b

File tree

4 files changed

+30
-52
lines changed

4 files changed

+30
-52
lines changed

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,15 @@
2727
*/
2828
@SdkInternalApi
2929
public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStream {
30-
private static final long MINIMUM_BYTES_BUFFERED = 1024 * 1024L;
31-
// for 16 kb chunks, this limits to about 16 MB (2x the standard crt provided buffer size)
32-
private static final int MAXIMUM_OUTSTANDING_DEMAND = 1024;
30+
private static final long MINIMUM_BYTES_BUFFERED = 16 * 1024 * 1024L;
3331
private final SdkHttpContentPublisher bodyPublisher;
3432
private final ByteBufferStoringSubscriber requestBodySubscriber;
3533

3634
private final AtomicBoolean subscribed = new AtomicBoolean(false);
3735

3836
public S3CrtRequestBodyStreamAdapter(SdkHttpContentPublisher bodyPublisher) {
3937
this.bodyPublisher = bodyPublisher;
40-
this.requestBodySubscriber = new ByteBufferStoringSubscriber(MINIMUM_BYTES_BUFFERED, MAXIMUM_OUTSTANDING_DEMAND);
38+
this.requestBodySubscriber = new ByteBufferStoringSubscriber(MINIMUM_BYTES_BUFFERED);
4139
}
4240

4341
@Override

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -60,32 +60,25 @@ void getRequestData_fillsInputBuffer_publisherBuffersAreSmaller() {
6060

6161
@Test
6262
void getRequestData_fillsInputBuffer_limitsOutstandingDemand() {
63-
int inputBufferSize = 2000;
64-
int maximumOutstandingDemand = 1024;
63+
int minBytesBuffered = 16 * 1024 * 1024;
64+
int inputBufferSize = 1024;
6565

6666
RequestTrackingPublisher requestTrackingPublisher = new RequestTrackingPublisher();
67-
SdkHttpContentPublisher requestBody = requestBody(requestTrackingPublisher, 42L);
67+
SdkHttpContentPublisher requestBody = requestBody(requestTrackingPublisher, minBytesBuffered);
6868

6969
S3CrtRequestBodyStreamAdapter adapter = new S3CrtRequestBodyStreamAdapter(requestBody);
7070

7171
ByteBuffer inputBuffer = ByteBuffer.allocate(inputBufferSize);
72-
for (int i = 0; i < maximumOutstandingDemand; i++) {
73-
// we are under the minimum buffer size, so each request here increases outstanding demand by 1
74-
adapter.sendRequestBody(inputBuffer);
75-
// release 1 byte of data, calling onNext (satisfies one request, but then requests 1 more)
76-
requestTrackingPublisher.release(1);
77-
}
78-
// we should have 2x requests
79-
assertThat(requestTrackingPublisher.requests()).isEqualTo(maximumOutstandingDemand * 2);
80-
// but the total released bytes is only maximumOutstandingDemand
81-
assertThat(inputBuffer.remaining()).isEqualTo(inputBufferSize - maximumOutstandingDemand + 1);
82-
83-
// now that we have reached maximum outstanding demand, new requests won't be sent
84-
adapter.sendRequestBody(inputBuffer);
85-
assertThat(requestTrackingPublisher.requests()).isEqualTo(maximumOutstandingDemand * 2);
86-
72+
adapter.sendRequestBody(inputBuffer); // initiate the subscription, but no bytes available, makes 1 request
8773

74+
// release 1 request of minBytesBuffered bytes of data, calling onNext (satisfies one request, but then requests 1 more)
75+
requestTrackingPublisher.release(1, minBytesBuffered-100);
76+
assertThat(requestTrackingPublisher.requests()).isEqualTo(2);
8877

78+
// call sendRequestBody, outstandingDemand=1, sizeHint=16*1024*1024-100 + existing data buffered is > our min
79+
// so no more requests will be made
80+
adapter.sendRequestBody(inputBuffer);
81+
assertThat(requestTrackingPublisher.requests()).isEqualTo(2);
8982
}
9083

9184
private static SdkHttpContentPublisher requestBody(Publisher<ByteBuffer> delegate, long size) {
@@ -159,9 +152,9 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
159152
}
160153

161154
// publish up to n requests
162-
public void release(int n) {
155+
public void release(int n, int size) {
163156
for (int i = 0; i < n; i++) {
164-
ByteBuffer buffer = ByteBuffer.allocate(1);
157+
ByteBuffer buffer = ByteBuffer.allocate(size);
165158
subscriber.onNext(buffer);
166159
}
167160
}

utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,21 @@ public class ByteBufferStoringSubscriber implements Subscriber<ByteBuffer> {
5959

6060
private final AtomicInteger outstandingDemand = new AtomicInteger(0);
6161

62-
private final Optional<Integer> maximumOutstandingDemand;
62+
private volatile long byteBufferSizeHint = 0L;
6363

6464
/**
6565
* The active subscription. Set when {@link #onSubscribe(Subscription)} is invoked.
6666
*/
6767
private Subscription subscription;
6868

6969
/**
70-
* Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval.
70+
* Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval. The subscriber will
71+
* only request more from the subscription when fewer bytes are buffered AND in flight requests from the subscription will
72+
* likely be under minimumBytesBuffered.
7173
*/
7274
public ByteBufferStoringSubscriber(long minimumBytesBuffered) {
7375
this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive");
7476
this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE);
75-
this.maximumOutstandingDemand = Optional.empty();
76-
}
77-
78-
/**
79-
* Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval and which limits the
80-
* maximum outstanding demand (requests) to its subscription.
81-
*/
82-
public ByteBufferStoringSubscriber(long minimumBytesBuffered, int maximumOutstandingDemand) {
83-
this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive");
84-
this.maximumOutstandingDemand = Optional.of(Validate.isPositive(maximumOutstandingDemand,
85-
"maximumOutstandingDemand must be positive"));
86-
87-
this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE);
8877
}
8978

9079
/**
@@ -201,6 +190,10 @@ public void onSubscribe(Subscription s) {
201190
public void onNext(ByteBuffer byteBuffer) {
202191
int remaining = byteBuffer.remaining();
203192
outstandingDemand.decrementAndGet();
193+
// atomic update not required here, in a race it does not matter which thread sets this value since it is not being
194+
// incremented, just set.
195+
byteBufferSizeHint = byteBuffer.remaining();
196+
204197
storingSubscriber.onNext(byteBuffer.duplicate());
205198
addBufferedDataAmount(remaining);
206199
phaser.arrive();
@@ -224,12 +217,8 @@ private void addBufferedDataAmount(long amountToAdd) {
224217
}
225218

226219
private void maybeRequestMore(long currentDataBuffered) {
227-
// if we have too many outstanding requests, no need to make more requests
228-
if (maximumOutstandingDemand.isPresent() && outstandingDemand.get() >= maximumOutstandingDemand.get()) {
229-
return;
230-
}
231-
232-
if (currentDataBuffered < minimumBytesBuffered) {
220+
long dataBufferedAndInFlight = currentDataBuffered + (byteBufferSizeHint * outstandingDemand.get());
221+
if (dataBufferedAndInFlight < minimumBytesBuffered) {
233222
outstandingDemand.incrementAndGet();
234223
subscription.request(1);
235224
}

utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,10 @@ public class ByteBufferStoringSubscriberTest {
5353
public void constructorCalled_withNonPositiveSize_throwsException() {
5454
assertThatCode(() -> new ByteBufferStoringSubscriber(1)).doesNotThrowAnyException();
5555
assertThatCode(() -> new ByteBufferStoringSubscriber(Integer.MAX_VALUE)).doesNotThrowAnyException();
56-
assertThatCode(() -> new ByteBufferStoringSubscriber(1, 1)).doesNotThrowAnyException();
5756

5857
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(0)).isInstanceOf(IllegalArgumentException.class);
5958
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(-1)).isInstanceOf(IllegalArgumentException.class);
6059
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(Integer.MIN_VALUE)).isInstanceOf(IllegalArgumentException.class);
61-
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(1, 0)).isInstanceOf(IllegalArgumentException.class);
62-
assertThatThrownBy(() -> new ByteBufferStoringSubscriber(1, -1)).isInstanceOf(IllegalArgumentException.class);
6360
}
6461

6562
@Test
@@ -80,16 +77,17 @@ public void doesNotRequestMoreThanMaxBytes() {
8077
}
8178

8279
@Test
83-
public void doesNotRequestMoreThanMaxDemand() {
84-
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(5, 2);
80+
public void doesNotRequestMoreWhenInflightMoreThanMinBytes() {
81+
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(5);
8582

8683
subscriber.onSubscribe(subscription); // request 1, demand = 1
87-
subscriber.onNext(fullByteBufferOfSize(3)); // demand = 0
84+
subscriber.onNext(fullByteBufferOfSize(3)); // demand = 0, sizeHint=3
8885
subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 1
8986
subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 2
9087
verify(subscription, times(3)).request(1);
9188

92-
subscriber.transferTo(emptyByteBufferOfSize(1)); // demand already maximum, no request
89+
//sizeHint=3, demand=2, dataBufferedAndInFlight=6. 6 > 5, so no new request
90+
subscriber.transferTo(emptyByteBufferOfSize(1));
9391
verifyNoMoreInteractions(subscription);
9492
}
9593

0 commit comments

Comments
 (0)