Skip to content

Commit d078a6e

Browse files
committed
Address comments
1 parent a538e8c commit d078a6e

File tree

6 files changed

+53
-21
lines changed

6 files changed

+53
-21
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@
3737
*
3838
* <p>
3939
* <b>NOTE:</b> You must read this stream promptly to avoid automatic cancellation. The default timeout for reading is 60
40-
* seconds. If {@link #read()} is not invoked before the timeout, the stream will automatically abort to prevent resource leakage.
40+
* seconds, which starts when the response stream is ready. If {@link #read()} is not invoked before the timeout, the stream will
41+
* automatically abort to prevent resource leakage.
4142
* <p>
4243
* The timeout can be customized by passing a {@link Duration} to the constructor, or disabled entirely by
43-
* passing {@link Duration#ZERO}.
44+
* passing {@link Duration#ZERO} or a negative {@link Duration}.
4445
* <p>
4546
* Note about the Apache http client: This input stream can be used to leverage a feature of the Apache http client where
4647
* connections are released back to the connection pool to be reused. As such, calling {@link ResponseInputStream#close() close}
@@ -79,10 +80,16 @@ public ResponseInputStream(ResponseT resp, AbortableInputStream in, Duration tim
7980
}
8081

8182
public ResponseInputStream(ResponseT resp, InputStream in) {
83+
this(resp, in, null);
84+
}
85+
86+
public ResponseInputStream(ResponseT resp, InputStream in, Duration timeout) {
8287
super(in);
8388
this.response = Validate.paramNotNull(resp, "response");
8489
this.abortable = in instanceof Abortable ? (Abortable) in : null;
85-
scheduleTimeoutTask(DEFAULT_TIMEOUT);
90+
91+
Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
92+
scheduleTimeoutTask(resolvedTimeout);
8693
}
8794

8895
/**
@@ -111,14 +118,14 @@ public int read(byte[] b, int off, int len) throws IOException {
111118
}
112119

113120
private void cancelTimeoutTask() {
114-
hasRead = true;
115-
if (timeoutTask != null) {
121+
if (!hasRead && timeoutTask != null) {
116122
timeoutTask.cancel(false);
117123
}
124+
hasRead = true;
118125
}
119126

120127
private void scheduleTimeoutTask(Duration timeout) {
121-
if (timeout.equals(Duration.ZERO)) {
128+
if (timeout.equals(Duration.ZERO) || timeout.isNegative()) {
122129
return;
123130
}
124131

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,9 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
272272
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
273273
* completed after the entire response body has finished streaming.
274274
* <p>
275-
* The publisher has a default timeout of 60 seconds. If no subscriber is registered within this time, the subscription
276-
* will be automatically cancelled. Use {@link #toPublisher(Duration)} to specify a custom timeout.
275+
* The publisher has a default timeout of 60 seconds that starts when the response body begins streaming. If no subscriber is
276+
* registered within this time, the subscription will be automatically cancelled. Use {@link #toPublisher(Duration)} to
277+
* specify a custom timeout.
277278
* <p>
278279
* You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
279280
* transformer is only recommended for advanced use cases.
@@ -307,13 +308,15 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo
307308
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
308309
* completed after the entire response body has finished streaming.
309310
* <p>
310-
* If no subscriber is registered within the specified timeout, the subscription will be automatically cancelled. To
311-
* disable the timeout, pass {@link Duration#ZERO}.
311+
* The timeout starts when the response body begins streaming. If no subscriber is registered within the specified timeout,
312+
* the subscription will be automatically cancelled. To disable the timeout, pass {@link Duration#ZERO} or a negative
313+
* {@link Duration}.
312314
* <p>
313315
* You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
314316
* transformer is only recommended for advanced use cases.
315317
*
316-
* @param timeout Maximum time to wait for subscription before cancelling. Use {@link Duration#ZERO} to disable timeout.
318+
* @param timeout Maximum time to wait for subscription before cancelling. Use {@link Duration#ZERO} or a negative
319+
* {@link Duration} to disable timeout.
317320
* @param <ResponseT> Pojo response type.
318321
* @return AsyncResponseTransformer instance.
319322
* @see #toPublisher()

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@
3535
* An {@link SdkPublisher} that publishes response body content and also contains a reference to the {@link SdkResponse} returned
3636
* by the service.
3737
* <p>
38-
* <b>NOTE:</b> You must subscribe to this publisher promptly to avoid automatic cancellation. The default timeout
39-
* for subscribing is 60 seconds. If {@link #subscribe(Subscriber)} is not invoked before the timeout, the publisher
40-
* will automatically cancel the underlying subscription to prevent resource leakage.
38+
* <b>NOTE:</b> You must subscribe to this publisher promptly to avoid automatic cancellation. The default timeout for
39+
* subscribing is 60 seconds, which starts when the response body begins streaming. If {@link #subscribe(Subscriber)} is not
40+
* invoked before the timeout, the publisher will automatically cancel the underlying subscription to prevent resource leakage.
4141
* <p>
4242
* The timeout can be customized by passing a {@link Duration} to the constructor, or disabled entirely by
43-
* passing {@link Duration#ZERO}.
43+
* passing {@link Duration#ZERO} or a negative {@link Duration}.
4444
*
4545
* @param <ResponseT> Pojo response type.
4646
* @see AsyncResponseTransformer#toPublisher()
@@ -85,7 +85,7 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
8585
}
8686

8787
private void scheduleTimeoutTask(Duration timeout) {
88-
if (timeout.equals(Duration.ZERO)) {
88+
if (timeout.equals(Duration.ZERO) || timeout.isNegative()) {
8989
return;
9090
}
9191

core/sdk-core/src/main/java/software/amazon/awssdk/core/sync/ResponseTransformer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,9 @@ public String name() {
234234
* be explicitly closed to release the connection. The unmarshalled response object can be obtained via the {@link
235235
* ResponseInputStream#response()} method.
236236
* <p>
237-
* The stream has a default timeout of 60 seconds. If no read operation occurs within this time, the connection
238-
* will be automatically aborted. Use {@link #toInputStream(Duration)} to specify a custom timeout.
237+
* The stream has a default timeout of 60 seconds that starts when the response stream is ready. If no read operation occurs
238+
* within this time, the connection will be automatically aborted. Use {@link #toInputStream(Duration)} to specify a custom
239+
* timeout.
239240
* <p>
240241
* Note that the returned stream is not subject to the retry policy or timeout settings (except for socket timeout)
241242
* of the client. No retries will be performed in the event of a socket read failure or connection reset.
@@ -262,10 +263,12 @@ public String name() {
262263
* Creates a response transformer that returns an unmanaged input stream with the response content and a custom timeout.
263264
* This input stream must be explicitly closed to release the connection.
264265
* <p>
265-
* If no read operation occurs within the specified timeout, the connection will be automatically aborted.
266-
* Pass {@link Duration#ZERO} to disable the timeout.
266+
* The timeout starts when the response stream is ready. If no read operation occurs within the specified timeout, the
267+
* connection will be automatically aborted. To disable the timeout, pass {@link Duration#ZERO} or a negative
268+
* {@link Duration}.
267269
*
268-
* @param timeout Maximum time to wait for first read operation before aborting. Use {@link Duration#ZERO} to disable timeout.
270+
* @param timeout Maximum time to wait for first read operation before aborting. Use {@link Duration#ZERO} or a negative
271+
* {@link Duration} to disable timeout.
269272
* @param <ResponseT> Type of unmarshalled response POJO.
270273
* @return ResponseTransformer instance.
271274
* @see #toInputStream()

core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseInputStreamTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,16 @@ void zeroTimeout_disablesTimeout() throws Exception {
123123
assertThat(responseInputStream.hasTimeoutTask()).isFalse();
124124
}
125125

126+
@Test
127+
void negativeTimeout_disablesTimeout() throws Exception {
128+
ResponseInputStream<Object> responseInputStream = responseInputStream(Duration.ofSeconds(-1));
129+
Thread.sleep(2000);
130+
131+
verify(abortable, never()).abort();
132+
verify(stream, never()).close();
133+
assertThat(responseInputStream.hasTimeoutTask()).isFalse();
134+
}
135+
126136
private ResponseInputStream<Object> responseInputStream(Duration timeout) {
127137
return new ResponseInputStream<>(new Object(), abortableInputStream, timeout);
128138
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ResponsePublisherTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ void zeroTimeout_disablesTimeout() throws Exception {
9494
assertThat(responsePublisher.hasTimeoutTask()).isFalse();
9595
}
9696

97+
@Test
98+
void negativeTimeout_disablesTimeout() throws Exception {
99+
responsePublisher = responsePublisher(Duration.ofSeconds(-1));
100+
Thread.sleep(2000);
101+
102+
verify(publisher, never()).subscribe(any(Subscriber.class));
103+
assertThat(responsePublisher.hasTimeoutTask()).isFalse();
104+
}
105+
97106
private ResponsePublisher<SdkResponse> responsePublisher(Duration timeout) {
98107
return new ResponsePublisher<>(response, publisher, timeout);
99108
}

0 commit comments

Comments
 (0)