Skip to content

Commit 400f19f

Browse files
authored
Added AsyncResponseBody.fromInputStream and AsyncRequestBody.forBlockingInputStream. (#3563)
Added AsyncResponseBody.fromInputStream and AsyncRequestBody.forBlockingInputStream, allowing streaming operation requests to be written to like an input stream. Example usage with retry support, without blocking calling thread: ```java String data = "Hello, world!"; ExecutorService executor = Executors.newCachedThreadPool(); InputStream data = new StringInputStream("Hello, world!"); s3.putObject(r -> r.bucket("foo").key("bar"), AsyncRequestBody.fromInputStream(data, data.length(), executor)) .join(); ``` Example usage without retry support, with blocking calling thread: ```java String data = "Hello, world!"; BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(data.length()); CompletableFuture<?> responseFuture = s3.putObject(r -> r.bucket("foo").key("bar"), body); body.writeInputStream(new StringInputStream(data)); responseFuture.join(); ``` Other changes made to support this change: 1. Added `InputStreamConsumingPublisher`, a `Publisher<ByteBuffer>` that can read from an `InputStream`. These supporting changes were also made, and are also being reviewed in #3565: 1. Updated `SimplePublisher` to return more useful and predictable error messages when invoked after `onComplete`, `onError` or `cancel` has been called. 1. Added `CompletableFutureUtils.joinInterruptibly`, which behaves equivalent to executing `join()`, but allows the calling thread to still respond to interrupts. 1. Added `CompletableFutureUtils.joinInterruptiblyIgnoringFailures`, which behaves equivalent to executing `join()`, but allows the calling thread to still respond to interrupts, and ignores exceptions that are thrown.
1 parent 99b9d3f commit 400f19f

File tree

14 files changed

+1041
-2
lines changed

14 files changed

+1041
-2
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Added AsyncResponseBody.fromInputStream and AsyncRequestBody.forBlockingInputStream, allowing streaming operation requests to be written to like an input stream."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,20 @@
1616
package software.amazon.awssdk.core.async;
1717

1818
import java.io.File;
19+
import java.io.InputStream;
1920
import java.io.OutputStream;
2021
import java.nio.ByteBuffer;
2122
import java.nio.charset.Charset;
2223
import java.nio.charset.StandardCharsets;
2324
import java.nio.file.Path;
2425
import java.util.Optional;
26+
import java.util.concurrent.ExecutorService;
2527
import org.reactivestreams.Publisher;
2628
import org.reactivestreams.Subscriber;
2729
import software.amazon.awssdk.annotations.SdkPublicApi;
2830
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
2931
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
32+
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
3033
import software.amazon.awssdk.core.internal.util.Mimetype;
3134
import software.amazon.awssdk.utils.BinaryUtils;
3235

@@ -161,6 +164,44 @@ static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
161164
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
162165
}
163166

167+
/**
168+
* Creates a {@link AsyncRequestBody} from a {@link InputStream}.
169+
*
170+
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
171+
* non-blocking event loop threads owned by the SDK.
172+
*/
173+
static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor) {
174+
return new InputStreamWithExecutorAsyncRequestBody(inputStream, contentLength, executor);
175+
}
176+
177+
/**
178+
* Creates a {@link BlockingInputStreamAsyncRequestBody} to use for writing an input stream to the downstream service.
179+
*
180+
* <p><b>Example Usage</b>
181+
*
182+
* <pre>
183+
* S3Client s3 = ...;
184+
*
185+
* InputStream streamToWrite = ...;
186+
* long streamToWriteLength = ...;
187+
*
188+
* // Start the operation
189+
* BlockingInputStreamAsyncRequestBody body =
190+
* AsyncRequestBody.forBlockingInputStream(streamToWriteLength);
191+
* CompletableFuture<PutObjectResponse> responseFuture =
192+
* s3.putObject(r -> r.bucket("bucketName").key("key"), body);
193+
*
194+
* // Write the input stream to the running operation
195+
* body.writeInputStream(streamToWrite);
196+
*
197+
* // Wait for the service to respond.
198+
* PutObjectResponse response = responseFuture.join();
199+
* </pre>
200+
*/
201+
static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLength) {
202+
return new BlockingInputStreamAsyncRequestBody(contentLength);
203+
}
204+
164205
/**
165206
* Creates a {@link BlockingOutputStreamAsyncRequestBody} to use for writing to the downstream service as if it's an output
166207
* stream. Retries are not supported for this request body.
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.io.InputStream;
19+
import java.nio.ByteBuffer;
20+
import java.time.Duration;
21+
import java.util.Optional;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import org.reactivestreams.Subscriber;
26+
import software.amazon.awssdk.annotations.SdkPublicApi;
27+
import software.amazon.awssdk.core.exception.NonRetryableException;
28+
import software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream;
29+
import software.amazon.awssdk.core.internal.util.NoopSubscription;
30+
import software.amazon.awssdk.utils.async.InputStreamConsumingPublisher;
31+
32+
/**
33+
* An implementation of {@link AsyncRequestBody} that allows performing a blocking write of an input stream to a downstream
34+
* service.
35+
*
36+
* <p>See {@link AsyncRequestBody#forBlockingInputStream(Long)}.
37+
*/
38+
@SdkPublicApi
39+
public final class BlockingInputStreamAsyncRequestBody implements AsyncRequestBody {
40+
private final InputStreamConsumingPublisher delegate = new InputStreamConsumingPublisher();
41+
private final CountDownLatch subscribedLatch = new CountDownLatch(1);
42+
private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
43+
private final Long contentLength;
44+
private final Duration subscribeTimeout;
45+
46+
BlockingInputStreamAsyncRequestBody(Long contentLength) {
47+
this(contentLength, Duration.ofSeconds(10));
48+
}
49+
50+
BlockingInputStreamAsyncRequestBody(Long contentLength, Duration subscribeTimeout) {
51+
this.contentLength = contentLength;
52+
this.subscribeTimeout = subscribeTimeout;
53+
}
54+
55+
@Override
56+
public Optional<Long> contentLength() {
57+
return Optional.ofNullable(contentLength);
58+
}
59+
60+
/**
61+
* Block the calling thread and write the provided input stream to the downstream service.
62+
*
63+
* <p>This method will block the calling thread immediately. This means that this request body should usually be passed to
64+
* the SDK before this method is called.
65+
*
66+
* <p>This method will return the amount of data written when the entire input stream has been written. This will throw an
67+
* exception if writing the input stream has failed.
68+
*
69+
* <p>You can invoke {@link #cancel()} to cancel any blocked write calls to the downstream service (and mark the stream as
70+
* failed).
71+
*/
72+
public long writeInputStream(InputStream inputStream) {
73+
try {
74+
waitForSubscriptionIfNeeded();
75+
if (contentLength != null) {
76+
return delegate.doBlockingWrite(new SdkLengthAwareInputStream(inputStream, contentLength));
77+
}
78+
79+
return delegate.doBlockingWrite(inputStream);
80+
} catch (InterruptedException e) {
81+
Thread.currentThread().interrupt();
82+
delegate.cancel();
83+
throw new RuntimeException(e);
84+
}
85+
}
86+
87+
/**
88+
* Cancel any running write (and mark the stream as failed).
89+
*/
90+
public void cancel() {
91+
delegate.cancel();
92+
}
93+
94+
@Override
95+
public void subscribe(Subscriber<? super ByteBuffer> s) {
96+
if (subscribeCalled.compareAndSet(false, true)) {
97+
delegate.subscribe(s);
98+
subscribedLatch.countDown();
99+
} else {
100+
s.onSubscribe(new NoopSubscription(s));
101+
s.onError(NonRetryableException.create("A retry was attempted, but AsyncRequestBody.forBlockingInputStream does not "
102+
+ "support retries. Consider using AsyncRequestBody.fromInputStream with an "
103+
+ "input stream that supports mark/reset to get retry support."));
104+
}
105+
}
106+
107+
private void waitForSubscriptionIfNeeded() throws InterruptedException {
108+
long timeoutSeconds = subscribeTimeout.getSeconds();
109+
if (!subscribedLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
110+
throw new IllegalStateException("The service request was not made within " + timeoutSeconds + " seconds of "
111+
+ "doBlockingWrite being invoked. Make sure to invoke the service request "
112+
+ "BEFORE invoking doBlockingWrite if your caller is single-threaded.");
113+
}
114+
}
115+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/exception/NonRetryableException.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@ public static Builder builder() {
4545
return new BuilderImpl();
4646
}
4747

48+
public static NonRetryableException create(String message) {
49+
return builder().message(message).build();
50+
}
51+
52+
public static NonRetryableException create(String message, Throwable cause) {
53+
return builder().message(message).cause(cause).build();
54+
}
55+
4856
public interface Builder extends SdkClientException.Builder {
4957
@Override
5058
Builder message(String message);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.nio.ByteBuffer;
21+
import java.util.Optional;
22+
import java.util.concurrent.CancellationException;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Future;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.TimeoutException;
28+
import org.reactivestreams.Subscriber;
29+
import software.amazon.awssdk.annotations.SdkInternalApi;
30+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
31+
import software.amazon.awssdk.core.async.AsyncRequestBody;
32+
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
33+
import software.amazon.awssdk.core.exception.NonRetryableException;
34+
import software.amazon.awssdk.core.internal.util.NoopSubscription;
35+
import software.amazon.awssdk.utils.IoUtils;
36+
import software.amazon.awssdk.utils.Logger;
37+
38+
/**
39+
* A {@link AsyncRequestBody} that allows reading data off of an {@link InputStream} using a background
40+
* {@link ExecutorService}.
41+
* <p>
42+
* Created via {@link AsyncRequestBody#fromInputStream(InputStream, Long, ExecutorService)}.
43+
*/
44+
@SdkInternalApi
45+
public class InputStreamWithExecutorAsyncRequestBody implements AsyncRequestBody {
46+
private static final Logger log = Logger.loggerFor(InputStreamWithExecutorAsyncRequestBody.class);
47+
48+
private final Object subscribeLock = new Object();
49+
private final InputStream inputStream;
50+
private final Long contentLength;
51+
private final ExecutorService executor;
52+
53+
private Future<?> writeFuture;
54+
55+
public InputStreamWithExecutorAsyncRequestBody(InputStream inputStream,
56+
Long contentLength,
57+
ExecutorService executor) {
58+
this.inputStream = inputStream;
59+
this.contentLength = contentLength;
60+
this.executor = executor;
61+
IoUtils.markStreamWithMaxReadLimit(inputStream);
62+
}
63+
64+
@Override
65+
public Optional<Long> contentLength() {
66+
return Optional.ofNullable(contentLength);
67+
}
68+
69+
@Override
70+
public void subscribe(Subscriber<? super ByteBuffer> s) {
71+
// Each subscribe cancels the previous subscribe.
72+
synchronized (subscribeLock) {
73+
try {
74+
if (writeFuture != null) {
75+
writeFuture.cancel(true);
76+
waitForCancellation(writeFuture); // Wait for the cancellation
77+
tryReset(inputStream);
78+
}
79+
80+
BlockingInputStreamAsyncRequestBody delegate = AsyncRequestBody.forBlockingInputStream(contentLength);
81+
writeFuture = executor.submit(() -> doBlockingWrite(delegate));
82+
delegate.subscribe(s);
83+
} catch (Throwable t) {
84+
s.onSubscribe(new NoopSubscription(s));
85+
s.onError(t);
86+
}
87+
}
88+
}
89+
90+
private void tryReset(InputStream inputStream) {
91+
try {
92+
inputStream.reset();
93+
} catch (IOException e) {
94+
String message = "Request cannot be retried, because the request stream could not be reset.";
95+
throw NonRetryableException.create(message, e);
96+
}
97+
}
98+
99+
@SdkTestInternalApi
100+
public Future<?> activeWriteFuture() {
101+
synchronized (subscribeLock) {
102+
return writeFuture;
103+
}
104+
}
105+
106+
private void doBlockingWrite(BlockingInputStreamAsyncRequestBody asyncRequestBody) {
107+
try {
108+
asyncRequestBody.writeInputStream(inputStream);
109+
} catch (Throwable t) {
110+
log.debug(() -> "Encountered error while writing input stream to service.", t);
111+
throw t;
112+
}
113+
}
114+
115+
private void waitForCancellation(Future<?> writeFuture) {
116+
try {
117+
writeFuture.get(10, TimeUnit.SECONDS);
118+
} catch (ExecutionException | CancellationException e) {
119+
// Expected - we cancelled.
120+
} catch (InterruptedException e) {
121+
Thread.currentThread().interrupt();
122+
throw new RuntimeException(e);
123+
} catch (TimeoutException e) {
124+
throw new IllegalStateException("Timed out waiting to reset the input stream.", e);
125+
}
126+
}
127+
}

0 commit comments

Comments
 (0)