Skip to content

Commit c24fe82

Browse files
committed
Add timeout to ResponseInputStream and ResponsePublisher
1 parent 4dab0c3 commit c24fe82

File tree

10 files changed

+668
-20
lines changed

10 files changed

+668
-20
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Adds timeouts to ResponsePublisher and ResponseInputStream to close connection if response not consumed"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,32 @@
1515

1616
package software.amazon.awssdk.core;
1717

18+
import java.io.IOException;
1819
import java.io.InputStream;
20+
import java.time.Duration;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.ScheduledFuture;
24+
import java.util.concurrent.TimeUnit;
1925
import software.amazon.awssdk.annotations.SdkPublicApi;
26+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2027
import software.amazon.awssdk.core.io.SdkFilterInputStream;
2128
import software.amazon.awssdk.http.Abortable;
2229
import software.amazon.awssdk.http.AbortableInputStream;
2330
import software.amazon.awssdk.utils.IoUtils;
31+
import software.amazon.awssdk.utils.Logger;
2432
import software.amazon.awssdk.utils.Validate;
2533

2634
/**
2735
* Input stream that provides access to the unmarshalled POJO response returned by the service in addition to the streamed
2836
* contents. This input stream should be closed after all data has been read from the stream.
37+
*
38+
* <p>
39+
* <b>NOTE:</b> You must read this stream promptly to avoid automatic cancellation. The default timeout for reading is 60
40+
* seconds. If {@link #read()} is not invoked before the timeout, the stream will automatically abort to prevent resource leakage.
41+
* <p>
42+
* The timeout can be customized by passing a {@link Duration} to the constructor, or disabled entirely by
43+
* passing {@link Duration#ZERO}.
2944
* <p>
3045
* Note about the Apache http client: This input stream can be used to leverage a feature of the Apache http client where
3146
* connections are released back to the connection pool to be reused. As such, calling {@link ResponseInputStream#close() close}
@@ -43,19 +58,31 @@
4358
@SdkPublicApi
4459
public final class ResponseInputStream<ResponseT> extends SdkFilterInputStream implements Abortable {
4560

61+
private static final Logger log = Logger.loggerFor(ResponseInputStream.class);
62+
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
4663
private final ResponseT response;
4764
private final Abortable abortable;
65+
private ScheduledFuture<?> timeoutTask;
66+
private volatile boolean hasRead = false;
4867

4968
public ResponseInputStream(ResponseT resp, AbortableInputStream in) {
69+
this(resp, in, null);
70+
}
71+
72+
public ResponseInputStream(ResponseT resp, AbortableInputStream in, Duration timeout) {
5073
super(in);
5174
this.response = Validate.paramNotNull(resp, "response");
5275
this.abortable = Validate.paramNotNull(in, "abortableInputStream");
76+
77+
Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
78+
scheduleTimeoutTask(resolvedTimeout);
5379
}
5480

5581
public ResponseInputStream(ResponseT resp, InputStream in) {
5682
super(in);
5783
this.response = Validate.paramNotNull(resp, "response");
5884
this.abortable = in instanceof Abortable ? (Abortable) in : null;
85+
scheduleTimeoutTask(DEFAULT_TIMEOUT);
5986
}
6087

6188
/**
@@ -65,15 +92,77 @@ public ResponseT response() {
6592
return response;
6693
}
6794

95+
@Override
96+
public int read() throws IOException {
97+
cancelTimeoutTask();
98+
return super.read();
99+
}
100+
101+
@Override
102+
public int read(byte[] b) throws IOException {
103+
cancelTimeoutTask();
104+
return super.read(b);
105+
}
106+
107+
@Override
108+
public int read(byte[] b, int off, int len) throws IOException {
109+
cancelTimeoutTask();
110+
return super.read(b, off, len);
111+
}
112+
113+
private void cancelTimeoutTask() {
114+
hasRead = true;
115+
if (timeoutTask != null) {
116+
timeoutTask.cancel(false);
117+
}
118+
}
119+
120+
private void scheduleTimeoutTask(Duration timeout) {
121+
if (timeout.equals(Duration.ZERO)) {
122+
return;
123+
}
124+
125+
long timeoutInMillis = timeout.toMillis();
126+
timeoutTask = TimeoutScheduler.INSTANCE.schedule(() -> {
127+
if (!hasRead) {
128+
log.debug(() -> String.format("InputStream was not read before timeout of [%d] milliseconds, aborting "
129+
+ "stream and closing connection.", timeoutInMillis));
130+
abort();
131+
}
132+
}, timeoutInMillis, TimeUnit.MILLISECONDS);
133+
}
134+
135+
private static final class TimeoutScheduler {
136+
static final ScheduledExecutorService INSTANCE =
137+
Executors.newScheduledThreadPool(1, r -> {
138+
Thread t = new Thread(r, "response-input-stream-timeout-scheduler");
139+
t.setDaemon(true);
140+
return t;
141+
});
142+
}
143+
68144
/**
69145
* Close the underlying connection, dropping all remaining data in the stream, and not leaving the
70146
* connection open to be used for future requests.
71147
*/
72148
@Override
73149
public void abort() {
150+
if (timeoutTask != null) {
151+
timeoutTask.cancel(false);
152+
}
74153
if (abortable != null) {
75154
abortable.abort();
76155
}
77-
IoUtils.closeQuietly(in, null);
156+
IoUtils.closeQuietlyV2(in, log);
157+
}
158+
159+
@SdkTestInternalApi
160+
public boolean hasTimeoutTask() {
161+
return timeoutTask != null;
162+
}
163+
164+
@SdkTestInternalApi
165+
public boolean timeoutTaskDoneOrCancelled() {
166+
return timeoutTask != null && timeoutTask.isDone();
78167
}
79168
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.InputStream;
2020
import java.nio.ByteBuffer;
2121
import java.nio.file.Path;
22+
import java.time.Duration;
2223
import java.util.Map;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.function.Consumer;
@@ -271,6 +272,9 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
271272
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
272273
* completed after the entire response body has finished streaming.
273274
* <p>
275+
* The publisher has a default timeout of 60 seconds. If no subscriber is registered within this time, the subscription
276+
* will be automatically cancelled. Use {@link #toPublisher(Duration)} to specify a custom timeout.
277+
* <p>
274278
* You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
275279
* transformer is only recommended for advanced use cases.
276280
* <p>
@@ -293,6 +297,32 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo
293297
return new PublisherAsyncResponseTransformer<>();
294298
}
295299

300+
/**
301+
* Creates an {@link AsyncResponseTransformer} with a custom timeout that publishes the response body content through a
302+
* {@link ResponsePublisher}, which is an {@link SdkPublisher} that also contains a reference to the {@link SdkResponse}
303+
* returned by the service.
304+
* <p>
305+
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed
306+
* once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This behavior differs from some
307+
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
308+
* completed after the entire response body has finished streaming.
309+
* <p>
310+
* If no subscriber is registered within the specified timeout, the subscription will be automatically cancelled. To
311+
* disable the timeout, pass {@link Duration#ZERO}.
312+
* <p>
313+
* You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
314+
* transformer is only recommended for advanced use cases.
315+
*
316+
* @param timeout Maximum time to wait for subscription before cancelling. Use {@link Duration#ZERO} to disable timeout.
317+
* @param <ResponseT> Pojo response type.
318+
* @return AsyncResponseTransformer instance.
319+
* @see #toPublisher()
320+
*/
321+
static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT,
322+
ResponsePublisher<ResponseT>> toPublisher(Duration timeout) {
323+
return new PublisherAsyncResponseTransformer<>(timeout);
324+
}
325+
296326
/**
297327
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an {@link InputStream}.
298328
* <p>

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,55 @@
1616
package software.amazon.awssdk.core.async;
1717

1818
import java.nio.ByteBuffer;
19+
import java.time.Duration;
1920
import java.util.Objects;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.ScheduledFuture;
24+
import java.util.concurrent.TimeUnit;
2025
import org.reactivestreams.Subscriber;
26+
import org.reactivestreams.Subscription;
2127
import software.amazon.awssdk.annotations.SdkPublicApi;
28+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2229
import software.amazon.awssdk.core.SdkResponse;
30+
import software.amazon.awssdk.utils.Logger;
2331
import software.amazon.awssdk.utils.ToString;
2432
import software.amazon.awssdk.utils.Validate;
2533

2634
/**
2735
* An {@link SdkPublisher} that publishes response body content and also contains a reference to the {@link SdkResponse} returned
2836
* by the service.
37+
* <p>
38+
* <b>NOTE:</b> You must subscribe to this publisher promptly to avoid automatic cancellation. The default timeout
39+
* for subscribing is 60 seconds. If {@link #subscribe(Subscriber)} is not invoked before the timeout, the publisher
40+
* will automatically cancel the underlying subscription to prevent resource leakage.
41+
* <p>
42+
* The timeout can be customized by passing a {@link Duration} to the constructor, or disabled entirely by
43+
* passing {@link Duration#ZERO}.
2944
*
3045
* @param <ResponseT> Pojo response type.
3146
* @see AsyncResponseTransformer#toPublisher()
3247
*/
3348
@SdkPublicApi
3449
public final class ResponsePublisher<ResponseT extends SdkResponse> implements SdkPublisher<ByteBuffer> {
3550

51+
private static final Logger log = Logger.loggerFor(ResponsePublisher.class);
52+
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
3653
private final ResponseT response;
3754
private final SdkPublisher<ByteBuffer> publisher;
55+
private ScheduledFuture<?> timeoutTask;
56+
private volatile boolean subscribed = false;
3857

3958
public ResponsePublisher(ResponseT response, SdkPublisher<ByteBuffer> publisher) {
59+
this(response, publisher, null);
60+
}
61+
62+
public ResponsePublisher(ResponseT response, SdkPublisher<ByteBuffer> publisher, Duration timeout) {
4063
this.response = Validate.paramNotNull(response, "response");
4164
this.publisher = Validate.paramNotNull(publisher, "publisher");
65+
66+
Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
67+
scheduleTimeoutTask(resolvedTimeout);
4268
}
4369

4470
/**
@@ -50,9 +76,59 @@ public ResponseT response() {
5076

5177
@Override
5278
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
79+
subscribed = true;
80+
if (timeoutTask != null) {
81+
timeoutTask.cancel(false);
82+
}
83+
5384
publisher.subscribe(subscriber);
5485
}
5586

87+
private void scheduleTimeoutTask(Duration timeout) {
88+
if (timeout.equals(Duration.ZERO)) {
89+
return;
90+
}
91+
92+
long timeoutInMillis = timeout.toMillis();
93+
timeoutTask = TimeoutScheduler.INSTANCE.schedule(() -> {
94+
if (!subscribed) {
95+
log.debug(() -> String.format("Publisher was not consumed before timeout of [%d] milliseconds, cancelling "
96+
+ "subscription and closing connection.", timeoutInMillis));
97+
98+
publisher.subscribe(new CancellingSubscriber());
99+
}
100+
}, timeoutInMillis, TimeUnit.MILLISECONDS);
101+
}
102+
103+
private static final class TimeoutScheduler {
104+
static final ScheduledExecutorService INSTANCE =
105+
Executors.newScheduledThreadPool(1, r -> {
106+
Thread t = new Thread(r, "response-publisher-timeout-scheduler");
107+
t.setDaemon(true);
108+
return t;
109+
});
110+
}
111+
112+
private static class CancellingSubscriber implements Subscriber<ByteBuffer> {
113+
114+
@Override
115+
public void onSubscribe(Subscription s) {
116+
s.cancel();
117+
}
118+
119+
@Override
120+
public void onNext(ByteBuffer b) {
121+
}
122+
123+
@Override
124+
public void onError(Throwable t) {
125+
}
126+
127+
@Override
128+
public void onComplete() {
129+
}
130+
}
131+
56132
@Override
57133
public String toString() {
58134
return ToString.builder("ResponsePublisher")
@@ -84,4 +160,14 @@ public int hashCode() {
84160
result = 31 * result + (publisher != null ? publisher.hashCode() : 0);
85161
return result;
86162
}
163+
164+
@SdkTestInternalApi
165+
public boolean hasTimeoutTask() {
166+
return timeoutTask != null;
167+
}
168+
169+
@SdkTestInternalApi
170+
public boolean timeoutTaskDoneOrCancelled() {
171+
return timeoutTask != null && timeoutTask.isDone();
172+
}
87173
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.core.internal.async;
1717

1818
import java.nio.ByteBuffer;
19+
import java.time.Duration;
1920
import java.util.concurrent.CompletableFuture;
2021
import software.amazon.awssdk.annotations.SdkInternalApi;
2122
import software.amazon.awssdk.core.SdkResponse;
@@ -35,6 +36,14 @@ public final class PublisherAsyncResponseTransformer<ResponseT extends SdkRespon
3536

3637
private volatile CompletableFuture<ResponsePublisher<ResponseT>> future;
3738
private volatile ResponseT response;
39+
private Duration timeout;
40+
41+
public PublisherAsyncResponseTransformer() {
42+
}
43+
44+
public PublisherAsyncResponseTransformer(Duration timeout) {
45+
this.timeout = timeout;
46+
}
3847

3948
@Override
4049
public CompletableFuture<ResponsePublisher<ResponseT>> prepare() {
@@ -50,7 +59,7 @@ public void onResponse(ResponseT response) {
5059

5160
@Override
5261
public void onStream(SdkPublisher<ByteBuffer> publisher) {
53-
future.complete(new ResponsePublisher<>(response, publisher));
62+
future.complete(new ResponsePublisher<>(response, publisher, timeout));
5463
}
5564

5665
@Override

0 commit comments

Comments
 (0)