Skip to content

Commit 27bd63b

Browse files
committed
Propagate exception to inputstream subscriber
Within InputStreamResponseTransformer, if exceptionOccurred is invoked after we already have an InputStream subscriber, propate the exception to that subscriber.
1 parent 263c0f2 commit 27bd63b

File tree

2 files changed

+123
-1
lines changed

2 files changed

+123
-1
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamResponseTransformer.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import java.nio.ByteBuffer;
1919
import java.util.concurrent.CompletableFuture;
20+
import org.reactivestreams.Subscriber;
21+
import org.reactivestreams.Subscription;
2022
import software.amazon.awssdk.annotations.SdkInternalApi;
2123
import software.amazon.awssdk.core.ResponseInputStream;
2224
import software.amazon.awssdk.core.SdkResponse;
@@ -35,6 +37,7 @@ public class InputStreamResponseTransformer<ResponseT extends SdkResponse>
3537

3638
private volatile CompletableFuture<ResponseInputStream<ResponseT>> future;
3739
private volatile ResponseT response;
40+
private volatile WaitForSubscribeOnErrorWrapper subscriber;
3841

3942
@Override
4043
public CompletableFuture<ResponseInputStream<ResponseT>> prepare() {
@@ -51,17 +54,74 @@ public void onResponse(ResponseT response) {
5154
@Override
5255
public void onStream(SdkPublisher<ByteBuffer> publisher) {
5356
AbortableInputStreamSubscriber inputStreamSubscriber = AbortableInputStreamSubscriber.builder().build();
54-
publisher.subscribe(inputStreamSubscriber);
57+
WaitForSubscribeOnErrorWrapper waitForSubscribeSubscriber = new WaitForSubscribeOnErrorWrapper(inputStreamSubscriber);
58+
59+
this.subscriber = waitForSubscribeSubscriber;
60+
61+
publisher.subscribe(waitForSubscribeSubscriber);
5562
future.complete(new ResponseInputStream<>(response, inputStreamSubscriber));
5663
}
5764

5865
@Override
5966
public void exceptionOccurred(Throwable error) {
6067
future.completeExceptionally(error);
68+
if (subscriber != null) {
69+
this.subscriber.onError(error);
70+
}
6171
}
6272

6373
@Override
6474
public String name() {
6575
return TransformerType.STREAM.getName();
6676
}
77+
78+
// Simple wrapper subscriber that ensures we don't forward the `onError` to the delegate until onSubscribe is called, to be
79+
// compliant with the reactive streams spec. We use onError for forwarding the exception given to exceptionOccurred.
80+
private static final class WaitForSubscribeOnErrorWrapper implements Subscriber<ByteBuffer> {
81+
private final Object lock = new Object();
82+
private final AbortableInputStreamSubscriber delegate;
83+
84+
private boolean subscribed = false;
85+
private Throwable transformerException;
86+
87+
88+
private WaitForSubscribeOnErrorWrapper(AbortableInputStreamSubscriber delegate) {
89+
this.delegate = delegate;
90+
}
91+
92+
@Override
93+
public void onSubscribe(Subscription s) {
94+
synchronized (lock) {
95+
subscribed = true;
96+
delegate.onSubscribe(s);
97+
98+
if (transformerException != null) {
99+
delegate.onError(transformerException);
100+
transformerException = null;
101+
}
102+
}
103+
}
104+
105+
@Override
106+
public void onNext(ByteBuffer byteBuffer) {
107+
this.delegate.onNext(byteBuffer);
108+
}
109+
110+
@Override
111+
public void onError(Throwable t) {
112+
synchronized (lock) {
113+
if (subscribed) {
114+
delegate.onError(t);
115+
} else {
116+
// We're not subscribed yet, save the exception for until we are.
117+
transformerException = t;
118+
}
119+
}
120+
}
121+
122+
@Override
123+
public void onComplete() {
124+
this.delegate.onComplete();
125+
}
126+
}
67127
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamResponseTransformerTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,20 @@
1616
package software.amazon.awssdk.core.internal.async;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatNoException;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1921

2022
import java.io.IOException;
2123
import java.io.InputStream;
2224
import java.nio.ByteBuffer;
2325
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Phaser;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2430
import org.junit.jupiter.api.BeforeEach;
2531
import org.junit.jupiter.api.Test;
32+
import org.reactivestreams.Subscription;
2633
import software.amazon.awssdk.core.ResponseInputStream;
2734
import software.amazon.awssdk.core.SdkResponse;
2835
import software.amazon.awssdk.core.async.SdkPublisher;
@@ -80,4 +87,59 @@ public void inputStreamArrayReadsAreFromPublisher() throws IOException {
8087
assertThat(data[2]).isEqualTo((byte) 2);
8188
assertThat(stream.read(data)).isEqualTo(-1);
8289
}
90+
91+
@Test
92+
void exceptionOccurred_onStreamCalled_exceptionPropagatedToRead() {
93+
ResponseInputStream<SdkResponse> inputStream = resultFuture.join();
94+
transformer.exceptionOccurred(new IOException("I/O issue"));
95+
assertThatThrownBy(inputStream::read).isInstanceOf(IOException.class);
96+
}
97+
98+
@Test
99+
void exceptionOccurred_onStreamNotCalled_doesNotFail() {
100+
InputStreamResponseTransformer<SdkResponse> responseTransformer = new InputStreamResponseTransformer<>();
101+
responseTransformer.prepare();
102+
responseTransformer.onResponse(response);
103+
assertThatNoException().isThrownBy(() -> responseTransformer.exceptionOccurred(new IOException("I/O issue")));
104+
}
105+
106+
@Test
107+
public void exceptionOccurred_onStreamCalled_onSubscribeNotCalled_doesNotThrow() throws Exception {
108+
ExecutorService exec = Executors.newSingleThreadExecutor();
109+
110+
for (int i = 0; i < 1024; ++i) {
111+
InputStreamResponseTransformer<SdkResponse> responseTransformer = new InputStreamResponseTransformer<>();
112+
CompletableFuture<ResponseInputStream<SdkResponse>> cf = responseTransformer.prepare();
113+
responseTransformer.onResponse(response);
114+
115+
AtomicBoolean onSubscribeCalled = new AtomicBoolean();
116+
117+
Phaser phaser = new Phaser(2);
118+
119+
responseTransformer.exceptionOccurred(new IOException("I/O issue"));
120+
phaser.arrive();
121+
122+
exec.submit(() -> {
123+
phaser.arriveAndAwaitAdvance();
124+
125+
responseTransformer.onStream(SdkPublisher.adapt(s -> {
126+
// the exception should not be forwarded to the inputstream subscriber if onSubscribe is not yet called
127+
128+
s.onSubscribe(new Subscription() {
129+
@Override
130+
public void request(long n) {
131+
}
132+
133+
@Override
134+
public void cancel() {
135+
}
136+
});
137+
onSubscribeCalled.set(true);
138+
}));
139+
}).get();
140+
141+
assertThatThrownBy(() -> cf.join().read()).hasMessageContaining("I/O issue");
142+
assertThat(onSubscribeCalled.get()).isTrue();
143+
}
144+
}
83145
}

0 commit comments

Comments
 (0)