Skip to content

Commit b814875

Browse files
violetaggrstoyanchev
authored andcommitted
Fix race condition in transition from UNSUBSCRIBED->COMPLETED
- Ensure completion signal (normal/exception) will be delivered to the subscriber when transition from UNSUBSCRIBED->COMPLETED - According to the specification "Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber" so ensure onComplete/onError signals will be called AFTER onSubscribe signal. Issue: SPR-16207
1 parent 41b13a4 commit b814875

File tree

2 files changed

+68
-18
lines changed

2 files changed

+68
-18
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
5252

5353
private volatile long demand;
5454

55+
private volatile boolean publisherCompleted;
56+
57+
@Nullable
58+
private volatile Throwable publisherError;
59+
5560
@SuppressWarnings("rawtypes")
5661
private static final AtomicLongFieldUpdater<AbstractListenerReadPublisher> DEMAND_FIELD_UPDATER =
5762
AtomicLongFieldUpdater.newUpdater(AbstractListenerReadPublisher.class, "demand");
@@ -208,15 +213,54 @@ private enum State {
208213
<T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
209214
Assert.notNull(publisher, "Publisher must not be null");
210215
Assert.notNull(subscriber, "Subscriber must not be null");
211-
if (publisher.changeState(this, NO_DEMAND)) {
216+
if (publisher.changeState(this, SUBSCRIBING)) {
212217
Subscription subscription = new ReadSubscription(publisher);
213218
publisher.subscriber = subscriber;
214219
subscriber.onSubscribe(subscription);
220+
publisher.changeState(SUBSCRIBING, NO_DEMAND);
221+
if (publisher.publisherCompleted) {
222+
publisher.onAllDataRead();
223+
}
224+
Throwable publisherError = publisher.publisherError;
225+
if (publisherError != null) {
226+
publisher.onError(publisherError);
227+
}
215228
}
216229
else {
217230
throw new IllegalStateException(toString());
218231
}
219232
}
233+
234+
@Override
235+
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
236+
publisher.publisherCompleted = true;
237+
}
238+
239+
@Override
240+
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
241+
publisher.publisherError = t;
242+
}
243+
},
244+
245+
SUBSCRIBING {
246+
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
247+
if (Operators.validate(n)) {
248+
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
249+
if (publisher.changeState(this, DEMAND)) {
250+
publisher.checkOnDataAvailable();
251+
}
252+
}
253+
}
254+
255+
@Override
256+
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
257+
publisher.publisherCompleted = true;
258+
}
259+
260+
@Override
261+
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
262+
publisher.publisherError = t;
263+
}
220264
},
221265

222266
/**

spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,18 +119,17 @@ private enum State {
119119
@Override
120120
void subscribe(WriteResultPublisher publisher, Subscriber<? super Void> subscriber) {
121121
Assert.notNull(subscriber, "Subscriber must not be null");
122-
publisher.subscriber = subscriber;
123-
if (publisher.changeState(this, SUBSCRIBED)) {
122+
if (publisher.changeState(this, SUBSCRIBING)) {
124123
Subscription subscription = new ResponseBodyWriteResultSubscription(publisher);
124+
publisher.subscriber = subscriber;
125125
subscriber.onSubscribe(subscription);
126+
publisher.changeState(SUBSCRIBING, SUBSCRIBED);
126127
if (publisher.publisherCompleted) {
127128
publisher.publishComplete();
128129
}
129-
else {
130-
Throwable publisherError = publisher.publisherError;
131-
if (publisherError != null) {
132-
publisher.publishError(publisherError);
133-
}
130+
Throwable publisherError = publisher.publisherError;
131+
if (publisherError != null) {
132+
publisher.publishError(publisherError);
134133
}
135134
}
136135
else {
@@ -147,6 +146,21 @@ void publishError(WriteResultPublisher publisher, Throwable t) {
147146
}
148147
},
149148

149+
SUBSCRIBING {
150+
@Override
151+
void request(WriteResultPublisher publisher, long n) {
152+
Operators.validate(n);
153+
}
154+
@Override
155+
void publishComplete(WriteResultPublisher publisher) {
156+
publisher.publisherCompleted = true;
157+
}
158+
@Override
159+
void publishError(WriteResultPublisher publisher, Throwable t) {
160+
publisher.publisherError = t;
161+
}
162+
},
163+
150164
SUBSCRIBED {
151165
@Override
152166
void request(WriteResultPublisher publisher, long n) {
@@ -183,14 +197,6 @@ void request(WriteResultPublisher publisher, long n) {
183197
void cancel(WriteResultPublisher publisher) {
184198
// ignore
185199
}
186-
@Override
187-
void publishComplete(WriteResultPublisher publisher) {
188-
// ignore
189-
}
190-
@Override
191-
void publishError(WriteResultPublisher publisher, Throwable t) {
192-
// ignore
193-
}
194200
};
195201

196202
void subscribe(WriteResultPublisher publisher, Subscriber<? super Void> subscriber) {
@@ -208,11 +214,11 @@ void cancel(WriteResultPublisher publisher) {
208214
}
209215

210216
void publishComplete(WriteResultPublisher publisher) {
211-
throw new IllegalStateException(toString());
217+
// ignore
212218
}
213219

214220
void publishError(WriteResultPublisher publisher, Throwable t) {
215-
throw new IllegalStateException(toString());
221+
// ignore
216222
}
217223
}
218224

0 commit comments

Comments
 (0)