Skip to content

Commit ad97cfb

Browse files
authored
Added AsyncRequestBody.forBlockingOutputStream (#3565)
Added AsyncRequestBody.forBlockingOutputStream, allowing streaming operation requests to be written to like an output stream. Example usage: ```java BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(5L); CompletableFuture<?> responseFuture = client.streamingInputOperation(StreamingInputOperationRequest.builder().build(), body); try (OutputStream stream = body.outputStream()) { // TODO: Test writing too much data and too little data stream.write("Hello".getBytes(StandardCharsets.UTF_8)); } responseFuture.join(); ``` Other changes made to support this change: 1. Added `OutputStreamPublisher` to utils. This is a `Publisher<ByteBuffer>` that can be written to like it's an output stream. Writes will block when there is no available downstream demand. 1. Added `CancellableOutputStream` to utils. This is an output stream that can be cancelled. This is useful for cancelling the upload via the output stream. The following changes appear in this PR as well as #3563: 1. Updated `SimplePublisher` to return more useful and predictable error messages when invoked after `onComplete`, `onError` or `cancel` has been called. 1. Added `CompletableFutureUtils.joinInterruptibly`, which behaves equivalent to executing `join()`, but allows the calling thread to still respond to interrupts. 1. Added `CompletableFutureUtils.joinInterruptiblyIgnoringFailures`, which behaves equivalent to executing `join()`, but allows the calling thread to still respond to interrupts, and ignores exceptions that are thrown.
1 parent c90d4a7 commit ad97cfb

File tree

13 files changed

+960
-71
lines changed

13 files changed

+960
-71
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Added AsyncRequestBody.forBlockingOutputStream, allowing streaming operation requests to be written to like an output stream."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.core.async;
1717

1818
import java.io.File;
19+
import java.io.OutputStream;
1920
import java.nio.ByteBuffer;
2021
import java.nio.charset.Charset;
2122
import java.nio.charset.StandardCharsets;
@@ -160,6 +161,40 @@ static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
160161
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
161162
}
162163

164+
/**
165+
* Creates a {@link BlockingOutputStreamAsyncRequestBody} to use for writing to the downstream service as if it's an output
166+
* stream. Retries are not supported for this request body.
167+
*
168+
* <p>The caller is responsible for calling {@link OutputStream#close()} on the {@link #outputStream()} when writing is
169+
* complete.
170+
*
171+
* <p><b>Example Usage</b>
172+
* <p>
173+
* {@snippet :
174+
* S3AsyncClient s3 = S3AsyncClient.create(); // Use one client for your whole application!
175+
*
176+
* byte[] dataToSend = "Hello".getBytes(StandardCharsets.UTF_8);
177+
* long lengthOfDataToSend = dataToSend.length();
178+
*
179+
* // Start the operation
180+
* BlockingInputStreamAsyncRequestBody body =
181+
* AsyncRequestBody.forBlockingOutputStream(lengthOfDataToSend);
182+
* CompletableFuture<PutObjectResponse> responseFuture =
183+
* s3.putObject(r -> r.bucket("bucketName").key("key"), body);
184+
*
185+
* // Write the input stream to the running operation
186+
* try (CancellableOutputStream outputStream = body.outputStream()) {
187+
* outputStream.write(dataToSend);
188+
* }
189+
*
190+
* // Wait for the service to respond.
191+
* PutObjectResponse response = responseFuture.join();
192+
* }
193+
*/
194+
static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long contentLength) {
195+
return new BlockingOutputStreamAsyncRequestBody(contentLength);
196+
}
197+
163198
/**
164199
* Creates a {@link AsyncRequestBody} with no content.
165200
*
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.io.OutputStream;
19+
import java.nio.ByteBuffer;
20+
import java.time.Duration;
21+
import java.util.Optional;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import org.reactivestreams.Subscriber;
26+
import software.amazon.awssdk.annotations.SdkPublicApi;
27+
import software.amazon.awssdk.core.exception.NonRetryableException;
28+
import software.amazon.awssdk.core.internal.util.NoopSubscription;
29+
import software.amazon.awssdk.utils.CancellableOutputStream;
30+
import software.amazon.awssdk.utils.async.OutputStreamPublisher;
31+
32+
/**
33+
* An implementation of {@link AsyncRequestBody} that allows performing a blocking write of an output stream to a downstream
34+
* service.
35+
*
36+
* <p>The caller is responsible for calling {@link OutputStream#close()} on the {@link #outputStream()} when writing is
37+
* complete.
38+
*
39+
* @see AsyncRequestBody#forBlockingOutputStream(Long)
40+
*/
41+
@SdkPublicApi
42+
public final class BlockingOutputStreamAsyncRequestBody implements AsyncRequestBody {
43+
private final OutputStreamPublisher delegate = new OutputStreamPublisher();
44+
private final CountDownLatch subscribedLatch = new CountDownLatch(1);
45+
private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
46+
private final Long contentLength;
47+
private final Duration subscribeTimeout;
48+
49+
BlockingOutputStreamAsyncRequestBody(Long contentLength) {
50+
this(contentLength, Duration.ofSeconds(10));
51+
}
52+
53+
BlockingOutputStreamAsyncRequestBody(Long contentLength, Duration subscribeTimeout) {
54+
this.contentLength = contentLength;
55+
this.subscribeTimeout = subscribeTimeout;
56+
}
57+
58+
/**
59+
* Return an output stream to which blocking writes can be made to the downstream service.
60+
*
61+
* <p>This method will block the calling thread until the SDK is connected to the service. This means that this request body
62+
* should usually be passed to the SDK before this method is called.
63+
*
64+
* <p>You can invoke {@link CancellableOutputStream#cancel()} to cancel any blocked write calls to the downstream service
65+
* (and mark the stream as failed).
66+
*/
67+
public CancellableOutputStream outputStream() {
68+
waitForSubscriptionIfNeeded();
69+
return delegate;
70+
}
71+
72+
@Override
73+
public Optional<Long> contentLength() {
74+
return Optional.ofNullable(contentLength);
75+
}
76+
77+
@Override
78+
public void subscribe(Subscriber<? super ByteBuffer> s) {
79+
if (subscribeCalled.compareAndSet(false, true)) {
80+
delegate.subscribe(s);
81+
subscribedLatch.countDown();
82+
} else {
83+
s.onSubscribe(new NoopSubscription(s));
84+
s.onError(NonRetryableException.create("A retry was attempted, but AsyncRequestBody.forBlockingOutputStream does not "
85+
+ "support retries."));
86+
}
87+
}
88+
89+
private void waitForSubscriptionIfNeeded() {
90+
try {
91+
long timeoutSeconds = subscribeTimeout.getSeconds();
92+
if (!subscribedLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
93+
throw new IllegalStateException("The service request was not made within " + timeoutSeconds + " seconds of "
94+
+ "outputStream being invoked. Make sure to invoke the service request "
95+
+ "BEFORE invoking outputStream if your caller is single-threaded.");
96+
}
97+
} catch (InterruptedException e) {
98+
Thread.currentThread().interrupt();
99+
throw new RuntimeException("Interrupted while waiting for subscription.", e);
100+
}
101+
}
102+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
import static software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult.END_OF_STREAM;
22+
23+
import java.io.IOException;
24+
import java.io.OutputStream;
25+
import java.nio.ByteBuffer;
26+
import java.time.Duration;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.ScheduledExecutorService;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.Timeout;
31+
import software.amazon.awssdk.utils.CancellableOutputStream;
32+
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
33+
import software.amazon.awssdk.utils.async.StoringSubscriber;
34+
35+
class BlockingOutputStreamAsyncRequestBodyTest {
36+
@Test
37+
public void outputStream_waitsForSubscription() throws IOException {
38+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
39+
try {
40+
BlockingOutputStreamAsyncRequestBody requestBody =
41+
AsyncRequestBody.forBlockingOutputStream(0L);
42+
executor.schedule(() -> requestBody.subscribe(new StoringSubscriber<>(1)), 100, MILLISECONDS);
43+
try (OutputStream stream = requestBody.outputStream()) {
44+
stream.write('h');
45+
}
46+
} finally {
47+
executor.shutdownNow();
48+
}
49+
}
50+
51+
@Test
52+
@Timeout(10)
53+
public void outputStream_failsIfSubscriptionNeverComes() {
54+
BlockingOutputStreamAsyncRequestBody requestBody =
55+
new BlockingOutputStreamAsyncRequestBody(0L, Duration.ofSeconds(1));
56+
assertThatThrownBy(requestBody::outputStream).hasMessageContaining("The service request was not made");
57+
}
58+
59+
@Test
60+
public void outputStream_writesToSubscriber() throws IOException {
61+
BlockingOutputStreamAsyncRequestBody requestBody =
62+
AsyncRequestBody.forBlockingOutputStream(0L);
63+
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(4);
64+
requestBody.subscribe(subscriber);
65+
66+
CancellableOutputStream outputStream = requestBody.outputStream();
67+
outputStream.write(0);
68+
outputStream.write(1);
69+
outputStream.close();
70+
71+
ByteBuffer out = ByteBuffer.allocate(4);
72+
assertThat(subscriber.transferTo(out)).isEqualTo(END_OF_STREAM);
73+
out.flip();
74+
75+
assertThat(out.remaining()).isEqualTo(2);
76+
assertThat(out.get()).isEqualTo((byte) 0);
77+
assertThat(out.get()).isEqualTo((byte) 1);
78+
}
79+
80+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.io.IOException;
21+
import java.io.OutputStream;
22+
import java.util.concurrent.CompletableFuture;
23+
import org.junit.AfterClass;
24+
import org.junit.BeforeClass;
25+
import org.junit.Test;
26+
import software.amazon.awssdk.core.ResponseInputStream;
27+
import software.amazon.awssdk.core.async.AsyncRequestBody;
28+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
29+
import software.amazon.awssdk.core.async.BlockingOutputStreamAsyncRequestBody;
30+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
31+
import software.amazon.awssdk.testutils.service.S3BucketUtils;
32+
33+
/**
34+
* Test InputStream and OutputStream operations on AsyncRequestBody and AsyncResponseTransformer against S3.
35+
*/
36+
public class AsyncInOutStreamIntegrationTest extends S3IntegrationTestBase {
37+
38+
private static final String BUCKET = S3BucketUtils.temporaryBucketName("async-in-out-stream");
39+
40+
/**
41+
* Creates all the test resources for the tests.
42+
*/
43+
@BeforeClass
44+
public static void createResources() {
45+
createBucket(BUCKET);
46+
}
47+
48+
/**
49+
* Releases all resources created in this test.
50+
*/
51+
@AfterClass
52+
public static void tearDown() {
53+
deleteBucketAndAllContents(BUCKET);
54+
}
55+
56+
@Test
57+
public void largeFilePutGet() throws IOException {
58+
long length = 4 * 1024 * 1024; // 4 MB
59+
BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(length);
60+
CompletableFuture<?> response = s3Async.putObject(r -> r.bucket(BUCKET).key("foo"), body);
61+
62+
try (OutputStream os = body.outputStream()) {
63+
for (int i = 0; i < length; i++) {
64+
os.write(i % 255);
65+
}
66+
}
67+
68+
response.join();
69+
70+
try (ResponseInputStream<GetObjectResponse> is =
71+
s3Async.getObject(r -> r.bucket(BUCKET).key("foo"),
72+
AsyncResponseTransformer.toBlockingInputStream())
73+
.join()) {
74+
75+
assertThat(is.response().contentLength()).isEqualTo(length);
76+
77+
for (int i = 0; i < length; i++) {
78+
assertThat(is.read()).isEqualTo(i % 255);
79+
}
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)