Skip to content

Commit 0e68258

Browse files
sugmanueadwsingh
andauthored
Delay publishing the initial event until fully wired (#865)
* Delay publishing the initial event until fully wired There's a race condition that can happen when we add the initial request using onNext, that call will call flush while the downstream subscriber is not yet set. To avoid this we just enqueue the initial request, and wait till the upstream requests more input. * Make spotless happy * Avoid exposing the enqueueItem method to keep th Rx contract * Update core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamFrameEncodingProcessor.java Co-authored-by: Adwait Kumar Singh <[email protected]> * Avoid flushing twice if onError is called --------- Co-authored-by: Adwait Kumar Singh <[email protected]>
1 parent 8c69855 commit 0e68258

File tree

6 files changed

+58
-20
lines changed

6 files changed

+58
-20
lines changed

aws/aws-event-streams/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ extra["moduleName"] = "software.amazon.smithy.java.aws.events"
99

1010
dependencies {
1111
api(project(":core"))
12+
implementation(project(":logging"))
1213
api("software.amazon.eventstream:eventstream:1.0.1")
1314
}

aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/RpcEventStreamsUtil.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@ public static Flow.Publisher<ByteBuffer> bodyForEventStreaming(
2929
SerializableStruct input
3030
) {
3131
Flow.Publisher<SerializableStruct> eventStream = input.getMemberValue(streamingMember(input.schema()));
32-
var publisher = EventStreamFrameEncodingProcessor.create(eventStream, eventStreamEncodingFactory);
33-
// Queue the input as the initial-request.
34-
publisher.onNext(input);
35-
return publisher;
32+
return EventStreamFrameEncodingProcessor.create(eventStream, eventStreamEncodingFactory, input);
3633
}
3734

3835
public static <O extends SerializableStruct> CompletableFuture<O> deserializeResponse(
@@ -62,7 +59,7 @@ public void onError(Throwable throwable) {
6259

6360
@Override
6461
public void onComplete() {
65-
result.completeExceptionally(new RuntimeException("Unexpected vent stream completion"));
62+
result.completeExceptionally(new RuntimeException("Unexpected event stream completion"));
6663
}
6764
});
6865

core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies {
1414
api(project(":io"))
1515
api(project(":retries-api"))
1616
api(libs.smithy.model)
17+
implementation(project(":logging"))
1718
}
1819

1920
jmh {}

core/src/main/java/software/amazon/smithy/java/core/serde/BufferingFlatMapProcessor.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.concurrent.atomic.AtomicLong;
1313
import java.util.concurrent.atomic.AtomicReference;
1414
import java.util.stream.Stream;
15+
import software.amazon.smithy.java.logging.InternalLogger;
1516

1617
/**
1718
* A processor abstraction that maps inputs of type I from an upstream publisher to 0-n items of type O
@@ -27,12 +28,12 @@
2728
public abstract class BufferingFlatMapProcessor<I, O> implements
2829
Flow.Processor<I, O>,
2930
Flow.Subscription {
31+
private static final InternalLogger LOG = InternalLogger.getLogger(BufferingFlatMapProcessor.class);
3032
private static final Throwable COMPLETE_SENTINEL = new RuntimeException();
3133

32-
private final AtomicReference<Throwable> terminalEvent = new AtomicReference<>();
34+
private final AtomicReference<Throwable> terminalEventHolder = new AtomicReference<>();
3335
private final AtomicLong pendingRequests = new AtomicLong();
3436
private final AtomicInteger pendingFlushes = new AtomicInteger();
35-
private final Flow.Publisher<I> publisher;
3637
private final BlockingQueue<O> queue = new LinkedBlockingQueue<>();
3738

3839
private volatile Flow.Subscription upstreamSubscription;
@@ -42,7 +43,6 @@ public abstract class BufferingFlatMapProcessor<I, O> implements
4243
public BufferingFlatMapProcessor(
4344
Flow.Publisher<I> publisher
4445
) {
45-
this.publisher = publisher;
4646
publisher.subscribe(this);
4747
}
4848

@@ -67,28 +67,46 @@ public final void onNext(I item) {
6767
try {
6868
map(item).forEach(this::addToQueue);
6969
} catch (Exception e) {
70+
LOG.warn("Malformed input", e);
7071
onError(new SerializationException("Malformed input", e));
7172
return;
7273
}
7374
flush();
7475
}
7576

77+
/**
78+
* Used to add the initial message to the queue. This message won't be sent until
79+
* a flush happens, either by a calling {@link #request(long)} or {@link #onNext(Object)}.
80+
* <p>
81+
* This method will re-throw any exception caught when calling {@link #map} without calling
82+
* {@link #onError(Throwable)} since it's assumed that the processor is not yet fully setup
83+
* when this method is called.
84+
*/
85+
protected final void enqueueItem(I item) {
86+
try {
87+
map(item).forEach(this::addToQueue);
88+
} catch (RuntimeException e) {
89+
LOG.warn("Malformed input", e);
90+
throw e;
91+
}
92+
}
93+
7694
private void addToQueue(O item) {
7795
queue.add(item);
7896
}
7997

8098
@Override
8199
public final void onError(Throwable t) {
82100
upstreamSubscription.cancel();
83-
terminalEvent.compareAndSet(null, t);
101+
terminalEventHolder.compareAndSet(null, t);
84102
if (upstreamSubscription != null && downstream != null) {
85103
flush();
86104
}
87105
}
88106

89107
@Override
90108
public final void onComplete() {
91-
terminalEvent.compareAndSet(null, COMPLETE_SENTINEL);
109+
terminalEventHolder.compareAndSet(null, COMPLETE_SENTINEL);
92110
if (upstreamSubscription != null && downstream != null) {
93111
flush();
94112
}
@@ -107,6 +125,10 @@ public final void request(long n) {
107125

108126
private void flush() {
109127
if (upstreamSubscription == null || downstream == null) {
128+
LOG.warn("flush() requested before upstream and downstream fully wired, " +
129+
"upstreamSubscription is null: {}, downstream is null: {}",
130+
upstreamSubscription == null,
131+
downstream == null);
110132
onError(new IllegalStateException("flush() requested before upstream and downstream fully wired."));
111133
return;
112134
}
@@ -126,8 +148,8 @@ private void flush() {
126148
Flow.Subscriber<? super O> subscriber = downstream;
127149
long delivered = sendMessages(subscriber, pending);
128150
boolean empty = queue.isEmpty();
129-
Throwable term = terminalEvent.get();
130-
if (term != null && attemptTermination(subscriber, term, empty)) {
151+
Throwable terminalEvent = terminalEventHolder.get();
152+
if (terminalEvent != null && attemptTermination(subscriber, terminalEvent, empty)) {
131153
terminated = true;
132154
return;
133155
}
@@ -184,12 +206,12 @@ protected void handleError(Throwable error, Flow.Subscriber<? super O> subscribe
184206
/**
185207
* @return true if this decoder is in a terminal state
186208
*/
187-
private boolean attemptTermination(Flow.Subscriber<? super O> subscriber, Throwable term, boolean done) {
209+
private boolean attemptTermination(Flow.Subscriber<? super O> subscriber, Throwable terminalEvent, boolean done) {
188210
if (done && subscriber != null) {
189-
if (term == COMPLETE_SENTINEL) {
211+
if (terminalEvent == COMPLETE_SENTINEL) {
190212
subscriber.onComplete();
191213
} else {
192-
handleError(term, subscriber);
214+
handleError(terminalEvent, subscriber);
193215
}
194216
return true;
195217
}
@@ -238,7 +260,7 @@ private static long accumulate(long current, long n) {
238260
}
239261
}
240262

241-
private static long accumulate(AtomicLong l, long n) {
242-
return l.accumulateAndGet(n, BufferingFlatMapProcessor::accumulate);
263+
private static void accumulate(AtomicLong l, long n) {
264+
l.accumulateAndGet(n, BufferingFlatMapProcessor::accumulate);
243265
}
244266
}

core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamFrameEncodingProcessor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
import java.util.stream.Stream;
1111
import software.amazon.smithy.java.core.schema.SerializableStruct;
1212
import software.amazon.smithy.java.core.serde.BufferingFlatMapProcessor;
13+
import software.amazon.smithy.java.logging.InternalLogger;
1314

1415
public final class EventStreamFrameEncodingProcessor<F extends Frame<?>, T extends SerializableStruct>
1516
extends BufferingFlatMapProcessor<T, ByteBuffer> {
17+
private static final InternalLogger LOG = InternalLogger.getLogger(EventStreamFrameEncodingProcessor.class);
1618
private final EventEncoder<F> eventEncoder;
1719
private final FrameEncoder<F> encoder;
1820

@@ -30,10 +32,24 @@ public static <F extends Frame<?>> EventStreamFrameEncodingProcessor<F, Serializ
3032
Flow.Publisher<SerializableStruct> publisher,
3133
EventEncoderFactory<F> encoderFactory
3234
) {
33-
return new EventStreamFrameEncodingProcessor<>(
35+
var processor = new EventStreamFrameEncodingProcessor<>(
3436
publisher,
3537
encoderFactory.newEventEncoder(),
3638
encoderFactory.newFrameEncoder());
39+
return processor;
40+
}
41+
42+
public static <F extends Frame<?>> EventStreamFrameEncodingProcessor<F, SerializableStruct> create(
43+
Flow.Publisher<SerializableStruct> publisher,
44+
EventEncoderFactory<F> encoderFactory,
45+
SerializableStruct firstItem
46+
) {
47+
var processor = new EventStreamFrameEncodingProcessor<>(
48+
publisher,
49+
encoderFactory.newEventEncoder(),
50+
encoderFactory.newFrameEncoder());
51+
processor.enqueueItem(firstItem);
52+
return processor;
3753
}
3854

3955
@Override
@@ -44,6 +60,7 @@ protected Stream<ByteBuffer> map(T item) {
4460
@Override
4561
protected void handleError(Throwable error, Flow.Subscriber<? super ByteBuffer> subscriber) {
4662
subscriber.onNext(encoder.encode(eventEncoder.encodeFailure(error)));
63+
LOG.warn("Unexpected error", error);
4764
subscriber.onComplete();
4865
}
4966
}

http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public ResponseSerializer errorSchema(Schema errorSchema) {
121121
*
122122
* @return Returns the created response.
123123
*/
124+
@SuppressWarnings("unchecked")
124125
public HttpResponse serializeResponse() {
125126
Objects.requireNonNull(shapeValue, "shapeValue is not set");
126127
Objects.requireNonNull(operation, "operation is not set");
@@ -151,8 +152,7 @@ public HttpResponse serializeResponse() {
151152

152153
var eventStream = (Flow.Publisher<SerializableStruct>) serializer.getEventStream();
153154
if (eventStream != null && operation instanceof OutputEventStreamingApiOperation<?, ?, ?>) {
154-
builder.body(
155-
EventStreamFrameEncodingProcessor.create(eventStream, eventEncoderFactory));
155+
builder.body(EventStreamFrameEncodingProcessor.create(eventStream, eventEncoderFactory));
156156
serializer.setContentType(eventEncoderFactory.contentType());
157157
} else if (serializer.hasBody()) {
158158
builder.body(serializer.getBody());

0 commit comments

Comments
 (0)