Skip to content

Commit 43950cf

Browse files
authored
Add timeout to ResponseInputStream and ResponsePublisher (#6367)
* Add timeout to ResponseInputStream and ResponsePublisher * Add apache test dependency * Address comments
1 parent 4fba4ba commit 43950cf

File tree

11 files changed

+705
-20
lines changed

11 files changed

+705
-20
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Adds timeouts to ResponsePublisher and ResponseInputStream to close connection if response not consumed"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,33 @@
1515

1616
package software.amazon.awssdk.core;
1717

18+
import java.io.IOException;
1819
import java.io.InputStream;
20+
import java.time.Duration;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.ScheduledFuture;
24+
import java.util.concurrent.TimeUnit;
1925
import software.amazon.awssdk.annotations.SdkPublicApi;
26+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2027
import software.amazon.awssdk.core.io.SdkFilterInputStream;
2128
import software.amazon.awssdk.http.Abortable;
2229
import software.amazon.awssdk.http.AbortableInputStream;
2330
import software.amazon.awssdk.utils.IoUtils;
31+
import software.amazon.awssdk.utils.Logger;
2432
import software.amazon.awssdk.utils.Validate;
2533

2634
/**
2735
* Input stream that provides access to the unmarshalled POJO response returned by the service in addition to the streamed
2836
* contents. This input stream should be closed after all data has been read from the stream.
37+
*
38+
* <p>
39+
* <b>NOTE:</b> You must read this stream promptly to avoid automatic cancellation. The default timeout for reading is 60
40+
* seconds, which starts when the response stream is ready. If {@link #read()} is not invoked before the timeout, the stream will
41+
* automatically abort to prevent resource leakage.
42+
* <p>
43+
* The timeout can be customized by passing a {@link Duration} to the constructor, or disabled entirely by
44+
* passing {@link Duration#ZERO} or a negative {@link Duration}.
2945
* <p>
3046
* Note about the Apache http client: This input stream can be used to leverage a feature of the Apache http client where
3147
* connections are released back to the connection pool to be reused. As such, calling {@link ResponseInputStream#close() close}
@@ -43,19 +59,37 @@
4359
@SdkPublicApi
4460
public final class ResponseInputStream<ResponseT> extends SdkFilterInputStream implements Abortable {
4561

62+
private static final Logger log = Logger.loggerFor(ResponseInputStream.class);
63+
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
4664
private final ResponseT response;
4765
private final Abortable abortable;
66+
private ScheduledFuture<?> timeoutTask;
67+
private volatile boolean hasRead = false;
4868

4969
public ResponseInputStream(ResponseT resp, AbortableInputStream in) {
70+
this(resp, in, null);
71+
}
72+
73+
public ResponseInputStream(ResponseT resp, AbortableInputStream in, Duration timeout) {
5074
super(in);
5175
this.response = Validate.paramNotNull(resp, "response");
5276
this.abortable = Validate.paramNotNull(in, "abortableInputStream");
77+
78+
Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
79+
scheduleTimeoutTask(resolvedTimeout);
5380
}
5481

5582
public ResponseInputStream(ResponseT resp, InputStream in) {
83+
this(resp, in, null);
84+
}
85+
86+
public ResponseInputStream(ResponseT resp, InputStream in, Duration timeout) {
5687
super(in);
5788
this.response = Validate.paramNotNull(resp, "response");
5889
this.abortable = in instanceof Abortable ? (Abortable) in : null;
90+
91+
Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
92+
scheduleTimeoutTask(resolvedTimeout);
5993
}
6094

6195
/**
@@ -65,15 +99,77 @@ public ResponseT response() {
6599
return response;
66100
}
67101

102+
@Override
103+
public int read() throws IOException {
104+
cancelTimeoutTask();
105+
return super.read();
106+
}
107+
108+
@Override
109+
public int read(byte[] b) throws IOException {
110+
cancelTimeoutTask();
111+
return super.read(b);
112+
}
113+
114+
@Override
115+
public int read(byte[] b, int off, int len) throws IOException {
116+
cancelTimeoutTask();
117+
return super.read(b, off, len);
118+
}
119+
120+
private void cancelTimeoutTask() {
121+
if (!hasRead && timeoutTask != null) {
122+
timeoutTask.cancel(false);
123+
}
124+
hasRead = true;
125+
}
126+
127+
private void scheduleTimeoutTask(Duration timeout) {
128+
if (timeout.equals(Duration.ZERO) || timeout.isNegative()) {
129+
return;
130+
}
131+
132+
long timeoutInMillis = timeout.toMillis();
133+
timeoutTask = TimeoutScheduler.INSTANCE.schedule(() -> {
134+
if (!hasRead) {
135+
log.debug(() -> String.format("InputStream was not read before timeout of [%d] milliseconds, aborting "
136+
+ "stream and closing connection.", timeoutInMillis));
137+
abort();
138+
}
139+
}, timeoutInMillis, TimeUnit.MILLISECONDS);
140+
}
141+
142+
private static final class TimeoutScheduler {
143+
static final ScheduledExecutorService INSTANCE =
144+
Executors.newScheduledThreadPool(1, r -> {
145+
Thread t = new Thread(r, "response-input-stream-timeout-scheduler");
146+
t.setDaemon(true);
147+
return t;
148+
});
149+
}
150+
68151
/**
69152
* Close the underlying connection, dropping all remaining data in the stream, and not leaving the
70153
* connection open to be used for future requests.
71154
*/
72155
@Override
73156
public void abort() {
157+
if (timeoutTask != null) {
158+
timeoutTask.cancel(false);
159+
}
74160
if (abortable != null) {
75161
abortable.abort();
76162
}
77-
IoUtils.closeQuietly(in, null);
163+
IoUtils.closeQuietlyV2(in, log);
164+
}
165+
166+
@SdkTestInternalApi
167+
public boolean hasTimeoutTask() {
168+
return timeoutTask != null;
169+
}
170+
171+
@SdkTestInternalApi
172+
public boolean timeoutTaskDoneOrCancelled() {
173+
return timeoutTask != null && timeoutTask.isDone();
78174
}
79175
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.InputStream;
2020
import java.nio.ByteBuffer;
2121
import java.nio.file.Path;
22+
import java.time.Duration;
2223
import java.util.Map;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.function.Consumer;
@@ -271,6 +272,10 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
271272
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
272273
* completed after the entire response body has finished streaming.
273274
* <p>
275+
* The publisher has a default timeout of 60 seconds that starts when the response body begins streaming. If no subscriber is
276+
* registered within this time, the subscription will be automatically cancelled. Use {@link #toPublisher(Duration)} to
277+
* specify a custom timeout.
278+
* <p>
274279
* You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
275280
* transformer is only recommended for advanced use cases.
276281
* <p>
@@ -293,6 +298,34 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo
293298
return new PublisherAsyncResponseTransformer<>();
294299
}
295300

301+
/**
302+
* Creates an {@link AsyncResponseTransformer} with a custom timeout that publishes the response body content through a
303+
* {@link ResponsePublisher}, which is an {@link SdkPublisher} that also contains a reference to the {@link SdkResponse}
304+
* returned by the service.
305+
* <p>
306+
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed
307+
* once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This behavior differs from some
308+
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
309+
* completed after the entire response body has finished streaming.
310+
* <p>
311+
* The timeout starts when the response body begins streaming. If no subscriber is registered within the specified timeout,
312+
* the subscription will be automatically cancelled. To disable the timeout, pass {@link Duration#ZERO} or a negative
313+
* {@link Duration}.
314+
* <p>
315+
* You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
316+
* transformer is only recommended for advanced use cases.
317+
*
318+
* @param timeout Maximum time to wait for subscription before cancelling. Use {@link Duration#ZERO} or a negative
319+
* {@link Duration} to disable timeout.
320+
* @param <ResponseT> Pojo response type.
321+
* @return AsyncResponseTransformer instance.
322+
* @see #toPublisher()
323+
*/
324+
static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT,
325+
ResponsePublisher<ResponseT>> toPublisher(Duration timeout) {
326+
return new PublisherAsyncResponseTransformer<>(timeout);
327+
}
328+
296329
/**
297330
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an {@link InputStream}.
298331
* <p>

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,55 @@
1616
package software.amazon.awssdk.core.async;
1717

1818
import java.nio.ByteBuffer;
19+
import java.time.Duration;
1920
import java.util.Objects;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.ScheduledFuture;
24+
import java.util.concurrent.TimeUnit;
2025
import org.reactivestreams.Subscriber;
26+
import org.reactivestreams.Subscription;
2127
import software.amazon.awssdk.annotations.SdkPublicApi;
28+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2229
import software.amazon.awssdk.core.SdkResponse;
30+
import software.amazon.awssdk.utils.Logger;
2331
import software.amazon.awssdk.utils.ToString;
2432
import software.amazon.awssdk.utils.Validate;
2533

2634
/**
2735
* An {@link SdkPublisher} that publishes response body content and also contains a reference to the {@link SdkResponse} returned
2836
* by the service.
37+
* <p>
38+
* <b>NOTE:</b> You must subscribe to this publisher promptly to avoid automatic cancellation. The default timeout for
39+
* subscribing is 60 seconds, which starts when the response body begins streaming. If {@link #subscribe(Subscriber)} is not
40+
* invoked before the timeout, the publisher will automatically cancel the underlying subscription to prevent resource leakage.
41+
* <p>
42+
* The timeout can be customized by passing a {@link Duration} to the constructor, or disabled entirely by
43+
* passing {@link Duration#ZERO} or a negative {@link Duration}.
2944
*
3045
* @param <ResponseT> Pojo response type.
3146
* @see AsyncResponseTransformer#toPublisher()
3247
*/
3348
@SdkPublicApi
3449
public final class ResponsePublisher<ResponseT extends SdkResponse> implements SdkPublisher<ByteBuffer> {
3550

51+
private static final Logger log = Logger.loggerFor(ResponsePublisher.class);
52+
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
3653
private final ResponseT response;
3754
private final SdkPublisher<ByteBuffer> publisher;
55+
private ScheduledFuture<?> timeoutTask;
56+
private volatile boolean subscribed = false;
3857

3958
public ResponsePublisher(ResponseT response, SdkPublisher<ByteBuffer> publisher) {
59+
this(response, publisher, null);
60+
}
61+
62+
public ResponsePublisher(ResponseT response, SdkPublisher<ByteBuffer> publisher, Duration timeout) {
4063
this.response = Validate.paramNotNull(response, "response");
4164
this.publisher = Validate.paramNotNull(publisher, "publisher");
65+
66+
Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
67+
scheduleTimeoutTask(resolvedTimeout);
4268
}
4369

4470
/**
@@ -50,9 +76,59 @@ public ResponseT response() {
5076

5177
@Override
5278
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
79+
subscribed = true;
80+
if (timeoutTask != null) {
81+
timeoutTask.cancel(false);
82+
}
83+
5384
publisher.subscribe(subscriber);
5485
}
5586

87+
private void scheduleTimeoutTask(Duration timeout) {
88+
if (timeout.equals(Duration.ZERO) || timeout.isNegative()) {
89+
return;
90+
}
91+
92+
long timeoutInMillis = timeout.toMillis();
93+
timeoutTask = TimeoutScheduler.INSTANCE.schedule(() -> {
94+
if (!subscribed) {
95+
log.debug(() -> String.format("Publisher was not consumed before timeout of [%d] milliseconds, cancelling "
96+
+ "subscription and closing connection.", timeoutInMillis));
97+
98+
publisher.subscribe(new CancellingSubscriber());
99+
}
100+
}, timeoutInMillis, TimeUnit.MILLISECONDS);
101+
}
102+
103+
private static final class TimeoutScheduler {
104+
static final ScheduledExecutorService INSTANCE =
105+
Executors.newScheduledThreadPool(1, r -> {
106+
Thread t = new Thread(r, "response-publisher-timeout-scheduler");
107+
t.setDaemon(true);
108+
return t;
109+
});
110+
}
111+
112+
private static class CancellingSubscriber implements Subscriber<ByteBuffer> {
113+
114+
@Override
115+
public void onSubscribe(Subscription s) {
116+
s.cancel();
117+
}
118+
119+
@Override
120+
public void onNext(ByteBuffer b) {
121+
}
122+
123+
@Override
124+
public void onError(Throwable t) {
125+
}
126+
127+
@Override
128+
public void onComplete() {
129+
}
130+
}
131+
56132
@Override
57133
public String toString() {
58134
return ToString.builder("ResponsePublisher")
@@ -84,4 +160,14 @@ public int hashCode() {
84160
result = 31 * result + (publisher != null ? publisher.hashCode() : 0);
85161
return result;
86162
}
163+
164+
@SdkTestInternalApi
165+
public boolean hasTimeoutTask() {
166+
return timeoutTask != null;
167+
}
168+
169+
@SdkTestInternalApi
170+
public boolean timeoutTaskDoneOrCancelled() {
171+
return timeoutTask != null && timeoutTask.isDone();
172+
}
87173
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.core.internal.async;
1717

1818
import java.nio.ByteBuffer;
19+
import java.time.Duration;
1920
import java.util.concurrent.CompletableFuture;
2021
import software.amazon.awssdk.annotations.SdkInternalApi;
2122
import software.amazon.awssdk.core.SdkResponse;
@@ -35,6 +36,14 @@ public final class PublisherAsyncResponseTransformer<ResponseT extends SdkRespon
3536

3637
private volatile CompletableFuture<ResponsePublisher<ResponseT>> future;
3738
private volatile ResponseT response;
39+
private Duration timeout;
40+
41+
public PublisherAsyncResponseTransformer() {
42+
}
43+
44+
public PublisherAsyncResponseTransformer(Duration timeout) {
45+
this.timeout = timeout;
46+
}
3847

3948
@Override
4049
public CompletableFuture<ResponsePublisher<ResponseT>> prepare() {
@@ -50,7 +59,7 @@ public void onResponse(ResponseT response) {
5059

5160
@Override
5261
public void onStream(SdkPublisher<ByteBuffer> publisher) {
53-
future.complete(new ResponsePublisher<>(response, publisher));
62+
future.complete(new ResponsePublisher<>(response, publisher, timeout));
5463
}
5564

5665
@Override

0 commit comments

Comments
 (0)