diff --git a/.changes/next-release/feature-AWSSDKforJavav2-a91bebb.json b/.changes/next-release/feature-AWSSDKforJavav2-a91bebb.json
new file mode 100644
index 000000000000..ce75baa4e048
--- /dev/null
+++ b/.changes/next-release/feature-AWSSDKforJavav2-a91bebb.json
@@ -0,0 +1,6 @@
+{
+ "type": "feature",
+ "category": "AWS SDK for Java v2",
+ "contributor": "",
+ "description": "Adds timeouts to ResponsePublisher and ResponseInputStream to close connection if response not consumed"
+}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java
index 90df32b24113..8a7d9fea9b89 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java
@@ -15,17 +15,33 @@
package software.amazon.awssdk.core;
+import java.io.IOException;
import java.io.InputStream;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.annotations.SdkPublicApi;
+import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.core.io.SdkFilterInputStream;
import software.amazon.awssdk.http.Abortable;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.utils.IoUtils;
+import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
/**
* Input stream that provides access to the unmarshalled POJO response returned by the service in addition to the streamed
* contents. This input stream should be closed after all data has been read from the stream.
+ *
+ *
+ * NOTE: You must read this stream promptly to avoid automatic cancellation. The default timeout for reading is 60
+ * seconds, which starts when the response stream is ready. If {@link #read()} is not invoked before the timeout, the stream will
+ * automatically abort to prevent resource leakage.
+ *
+ * The timeout can be customized by passing a {@link Duration} to the constructor, or disabled entirely by
+ * passing {@link Duration#ZERO} or a negative {@link Duration}.
*
* Note about the Apache http client: This input stream can be used to leverage a feature of the Apache http client where
* connections are released back to the connection pool to be reused. As such, calling {@link ResponseInputStream#close() close}
@@ -43,19 +59,37 @@
@SdkPublicApi
public final class ResponseInputStream extends SdkFilterInputStream implements Abortable {
+ private static final Logger log = Logger.loggerFor(ResponseInputStream.class);
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
private final ResponseT response;
private final Abortable abortable;
+ private ScheduledFuture> timeoutTask;
+ private volatile boolean hasRead = false;
public ResponseInputStream(ResponseT resp, AbortableInputStream in) {
+ this(resp, in, null);
+ }
+
+ public ResponseInputStream(ResponseT resp, AbortableInputStream in, Duration timeout) {
super(in);
this.response = Validate.paramNotNull(resp, "response");
this.abortable = Validate.paramNotNull(in, "abortableInputStream");
+
+ Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
+ scheduleTimeoutTask(resolvedTimeout);
}
public ResponseInputStream(ResponseT resp, InputStream in) {
+ this(resp, in, null);
+ }
+
+ public ResponseInputStream(ResponseT resp, InputStream in, Duration timeout) {
super(in);
this.response = Validate.paramNotNull(resp, "response");
this.abortable = in instanceof Abortable ? (Abortable) in : null;
+
+ Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
+ scheduleTimeoutTask(resolvedTimeout);
}
/**
@@ -65,15 +99,77 @@ public ResponseT response() {
return response;
}
+ @Override
+ public int read() throws IOException {
+ cancelTimeoutTask();
+ return super.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ cancelTimeoutTask();
+ return super.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ cancelTimeoutTask();
+ return super.read(b, off, len);
+ }
+
+ private void cancelTimeoutTask() {
+ if (!hasRead && timeoutTask != null) {
+ timeoutTask.cancel(false);
+ }
+ hasRead = true;
+ }
+
+ private void scheduleTimeoutTask(Duration timeout) {
+ if (timeout.equals(Duration.ZERO) || timeout.isNegative()) {
+ return;
+ }
+
+ long timeoutInMillis = timeout.toMillis();
+ timeoutTask = TimeoutScheduler.INSTANCE.schedule(() -> {
+ if (!hasRead) {
+ log.debug(() -> String.format("InputStream was not read before timeout of [%d] milliseconds, aborting "
+ + "stream and closing connection.", timeoutInMillis));
+ abort();
+ }
+ }, timeoutInMillis, TimeUnit.MILLISECONDS);
+ }
+
+ private static final class TimeoutScheduler {
+ static final ScheduledExecutorService INSTANCE =
+ Executors.newScheduledThreadPool(1, r -> {
+ Thread t = new Thread(r, "response-input-stream-timeout-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ }
+
/**
* Close the underlying connection, dropping all remaining data in the stream, and not leaving the
* connection open to be used for future requests.
*/
@Override
public void abort() {
+ if (timeoutTask != null) {
+ timeoutTask.cancel(false);
+ }
if (abortable != null) {
abortable.abort();
}
- IoUtils.closeQuietly(in, null);
+ IoUtils.closeQuietlyV2(in, log);
+ }
+
+ @SdkTestInternalApi
+ public boolean hasTimeoutTask() {
+ return timeoutTask != null;
+ }
+
+ @SdkTestInternalApi
+ public boolean timeoutTaskDoneOrCancelled() {
+ return timeoutTask != null && timeoutTask.isDone();
}
}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java
index d7c872d89289..a7abf157a628 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java
@@ -19,6 +19,7 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -271,6 +272,10 @@ static AsyncResponseTransformer>
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
* completed after the entire response body has finished streaming.
*
+ * The publisher has a default timeout of 60 seconds that starts when the response body begins streaming. If no subscriber is
+ * registered within this time, the subscription will be automatically cancelled. Use {@link #toPublisher(Duration)} to
+ * specify a custom timeout.
+ *
* You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
* transformer is only recommended for advanced use cases.
*
@@ -293,6 +298,34 @@ static AsyncResponseTransformer();
}
+ /**
+ * Creates an {@link AsyncResponseTransformer} with a custom timeout that publishes the response body content through a
+ * {@link ResponsePublisher}, which is an {@link SdkPublisher} that also contains a reference to the {@link SdkResponse}
+ * returned by the service.
+ *
+ * When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed
+ * once the {@link SdkResponse} is available and the response body begins streaming. This behavior differs from some
+ * other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
+ * completed after the entire response body has finished streaming.
+ *
+ * The timeout starts when the response body begins streaming. If no subscriber is registered within the specified timeout,
+ * the subscription will be automatically cancelled. To disable the timeout, pass {@link Duration#ZERO} or a negative
+ * {@link Duration}.
+ *
+ * You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
+ * transformer is only recommended for advanced use cases.
+ *
+ * @param timeout Maximum time to wait for subscription before cancelling. Use {@link Duration#ZERO} or a negative
+ * {@link Duration} to disable timeout.
+ * @param Pojo response type.
+ * @return AsyncResponseTransformer instance.
+ * @see #toPublisher()
+ */
+ static AsyncResponseTransformer> toPublisher(Duration timeout) {
+ return new PublisherAsyncResponseTransformer<>(timeout);
+ }
+
/**
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an {@link InputStream}.
*
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java
index d8f87899f785..bb44b4568f16 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java
@@ -16,16 +16,31 @@
package software.amazon.awssdk.core.async;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkPublicApi;
+import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.core.SdkResponse;
+import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
/**
* An {@link SdkPublisher} that publishes response body content and also contains a reference to the {@link SdkResponse} returned
* by the service.
+ *
+ * NOTE: You must subscribe to this publisher promptly to avoid automatic cancellation. The default timeout for
+ * subscribing is 60 seconds, which starts when the response body begins streaming. If {@link #subscribe(Subscriber)} is not
+ * invoked before the timeout, the publisher will automatically cancel the underlying subscription to prevent resource leakage.
+ *
+ * The timeout can be customized by passing a {@link Duration} to the constructor, or disabled entirely by
+ * passing {@link Duration#ZERO} or a negative {@link Duration}.
*
* @param Pojo response type.
* @see AsyncResponseTransformer#toPublisher()
@@ -33,12 +48,23 @@
@SdkPublicApi
public final class ResponsePublisher implements SdkPublisher {
+ private static final Logger log = Logger.loggerFor(ResponsePublisher.class);
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
private final ResponseT response;
private final SdkPublisher publisher;
+ private ScheduledFuture> timeoutTask;
+ private volatile boolean subscribed = false;
public ResponsePublisher(ResponseT response, SdkPublisher publisher) {
+ this(response, publisher, null);
+ }
+
+ public ResponsePublisher(ResponseT response, SdkPublisher publisher, Duration timeout) {
this.response = Validate.paramNotNull(response, "response");
this.publisher = Validate.paramNotNull(publisher, "publisher");
+
+ Duration resolvedTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
+ scheduleTimeoutTask(resolvedTimeout);
}
/**
@@ -50,9 +76,59 @@ public ResponseT response() {
@Override
public void subscribe(Subscriber super ByteBuffer> subscriber) {
+ subscribed = true;
+ if (timeoutTask != null) {
+ timeoutTask.cancel(false);
+ }
+
publisher.subscribe(subscriber);
}
+ private void scheduleTimeoutTask(Duration timeout) {
+ if (timeout.equals(Duration.ZERO) || timeout.isNegative()) {
+ return;
+ }
+
+ long timeoutInMillis = timeout.toMillis();
+ timeoutTask = TimeoutScheduler.INSTANCE.schedule(() -> {
+ if (!subscribed) {
+ log.debug(() -> String.format("Publisher was not consumed before timeout of [%d] milliseconds, cancelling "
+ + "subscription and closing connection.", timeoutInMillis));
+
+ publisher.subscribe(new CancellingSubscriber());
+ }
+ }, timeoutInMillis, TimeUnit.MILLISECONDS);
+ }
+
+ private static final class TimeoutScheduler {
+ static final ScheduledExecutorService INSTANCE =
+ Executors.newScheduledThreadPool(1, r -> {
+ Thread t = new Thread(r, "response-publisher-timeout-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ }
+
+ private static class CancellingSubscriber implements Subscriber {
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ s.cancel();
+ }
+
+ @Override
+ public void onNext(ByteBuffer b) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onComplete() {
+ }
+ }
+
@Override
public String toString() {
return ToString.builder("ResponsePublisher")
@@ -84,4 +160,14 @@ public int hashCode() {
result = 31 * result + (publisher != null ? publisher.hashCode() : 0);
return result;
}
+
+ @SdkTestInternalApi
+ public boolean hasTimeoutTask() {
+ return timeoutTask != null;
+ }
+
+ @SdkTestInternalApi
+ public boolean timeoutTaskDoneOrCancelled() {
+ return timeoutTask != null && timeoutTask.isDone();
+ }
}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java
index 3f0f9fa19fce..5acd685f7d99 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java
@@ -16,6 +16,7 @@
package software.amazon.awssdk.core.internal.async;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkResponse;
@@ -35,6 +36,14 @@ public final class PublisherAsyncResponseTransformer> future;
private volatile ResponseT response;
+ private Duration timeout;
+
+ public PublisherAsyncResponseTransformer() {
+ }
+
+ public PublisherAsyncResponseTransformer(Duration timeout) {
+ this.timeout = timeout;
+ }
@Override
public CompletableFuture> prepare() {
@@ -50,7 +59,7 @@ public void onResponse(ResponseT response) {
@Override
public void onStream(SdkPublisher publisher) {
- future.complete(new ResponsePublisher<>(response, publisher));
+ future.complete(new ResponsePublisher<>(response, publisher, timeout));
}
@Override
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/sync/ResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/sync/ResponseTransformer.java
index f883b591829e..c4371f319ce2 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/sync/ResponseTransformer.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/sync/ResponseTransformer.java
@@ -26,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Map;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
@@ -233,11 +234,16 @@ public String name() {
* be explicitly closed to release the connection. The unmarshalled response object can be obtained via the {@link
* ResponseInputStream#response()} method.
*
+ * The stream has a default timeout of 60 seconds that starts when the response stream is ready. If no read operation occurs
+ * within this time, the connection will be automatically aborted. Use {@link #toInputStream(Duration)} to specify a custom
+ * timeout.
+ *
* Note that the returned stream is not subject to the retry policy or timeout settings (except for socket timeout)
* of the client. No retries will be performed in the event of a socket read failure or connection reset.
*
* @param Type of unmarshalled response POJO.
* @return ResponseTransformer instance.
+ * @see #toInputStream(Duration)
*/
static ResponseTransformer> toInputStream() {
return unmanaged(new ResponseTransformer>() {
@@ -253,6 +259,34 @@ public String name() {
});
}
+ /**
+ * Creates a response transformer that returns an unmanaged input stream with the response content and a custom timeout.
+ * This input stream must be explicitly closed to release the connection.
+ *
+ * The timeout starts when the response stream is ready. If no read operation occurs within the specified timeout, the
+ * connection will be automatically aborted. To disable the timeout, pass {@link Duration#ZERO} or a negative
+ * {@link Duration}.
+ *
+ * @param timeout Maximum time to wait for first read operation before aborting. Use {@link Duration#ZERO} or a negative
+ * {@link Duration} to disable timeout.
+ * @param Type of unmarshalled response POJO.
+ * @return ResponseTransformer instance.
+ * @see #toInputStream()
+ */
+ static ResponseTransformer> toInputStream(Duration timeout) {
+ return unmanaged(new ResponseTransformer>() {
+ @Override
+ public ResponseInputStream transform(ResponseT response, AbortableInputStream inputStream) {
+ return new ResponseInputStream<>(response, inputStream, timeout);
+ }
+
+ @Override
+ public String name() {
+ return TransformerType.STREAM.getName();
+ }
+ });
+ }
+
/**
* Static helper method to create a response transformer that allows the connection to be left open. Useful for creating a
* {@link ResponseTransformer} with a lambda or method reference rather than an anonymous inner class.
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseInputStreamTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseInputStreamTest.java
index 79dbd44addf8..6710465a899d 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseInputStreamTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseInputStreamTest.java
@@ -15,64 +15,125 @@
package software.amazon.awssdk.core;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Duration;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import software.amazon.awssdk.http.Abortable;
import software.amazon.awssdk.http.AbortableInputStream;
class ResponseInputStreamTest {
+
+ InputStream stream;
+ Abortable abortable;
+ AbortableInputStream abortableInputStream;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ stream = Mockito.mock(InputStream.class);
+ abortable = Mockito.mock(Abortable.class);
+ abortableInputStream = AbortableInputStream.create(stream, abortable);
+ }
+
@Test
void abort_withAbortable_closesUnderlyingStream() throws IOException {
- InputStream stream = Mockito.mock(InputStream.class);
- Abortable abortable = Mockito.mock(Abortable.class);
- AbortableInputStream abortableInputStream = AbortableInputStream.create(stream, abortable);
ResponseInputStream