Skip to content

Commit 41b13a4

Browse files
violetaggrstoyanchev
authored andcommitted
Ensure onComplete/onError events will be delivered
Issue: SPR-16207
1 parent 6040178 commit 41b13a4

File tree

4 files changed

+57
-9
lines changed

4 files changed

+57
-9
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,9 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
312312
}
313313

314314
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
315-
publisher.changeState(this, COMPLETED);
315+
if (!publisher.changeState(this, COMPLETED)) {
316+
publisher.state.get().cancel(publisher);
317+
}
316318
}
317319

318320
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
@@ -325,6 +327,9 @@ <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
325327
publisher.subscriber.onComplete();
326328
}
327329
}
330+
else {
331+
publisher.state.get().onAllDataRead(publisher);
332+
}
328333
}
329334

330335
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
@@ -333,6 +338,9 @@ <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
333338
publisher.subscriber.onError(t);
334339
}
335340
}
341+
else {
342+
publisher.state.get().onError(publisher, t);
343+
}
336344
}
337345
}
338346

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
190190
if (processor.changeState(this, COMPLETED)) {
191191
processor.resultPublisher.publishComplete();
192192
}
193+
else {
194+
processor.state.get().onComplete(processor);
195+
}
193196
}
194197
},
195198

@@ -212,6 +215,9 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
212215
else if (processor.changeState(this, COMPLETED)) {
213216
processor.resultPublisher.publishComplete();
214217
}
218+
else {
219+
processor.state.get().onComplete(processor);
220+
}
215221
}
216222
else {
217223
if (processor.changeState(this, REQUESTED)) {
@@ -238,6 +244,9 @@ public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor
238244
if (processor.changeState(this, COMPLETED)) {
239245
processor.resultPublisher.publishComplete();
240246
}
247+
else {
248+
processor.state.get().onComplete(processor);
249+
}
241250
}
242251
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
243252
// ignore
@@ -275,6 +284,9 @@ public <T> void onError(AbstractListenerWriteFlushProcessor<T> processor, Throwa
275284
if (processor.changeState(this, COMPLETED)) {
276285
processor.resultPublisher.publishError(ex);
277286
}
287+
else {
288+
processor.state.get().onError(processor, ex);
289+
}
278290
}
279291

280292
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
251251
processor.writingComplete();
252252
processor.resultPublisher.publishComplete();
253253
}
254+
else {
255+
processor.state.get().onComplete(processor);
256+
}
254257
}
255258
},
256259

@@ -274,15 +277,29 @@ public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
274277
if (writeCompleted) {
275278
processor.releaseData();
276279
if (!processor.subscriberCompleted) {
277-
processor.changeState(WRITING, REQUESTED);
278-
processor.suspendWriting();
279-
Assert.state(processor.subscription != null, "No subscription");
280-
processor.subscription.request(1);
280+
if (processor.changeState(WRITING, REQUESTED)) {
281+
if (processor.subscriberCompleted) {
282+
if (processor.changeState(REQUESTED, COMPLETED)) {
283+
processor.writingComplete();
284+
processor.resultPublisher.publishComplete();
285+
} else {
286+
processor.state.get().onComplete(processor);
287+
}
288+
}
289+
else {
290+
processor.suspendWriting();
291+
Assert.state(processor.subscription != null, "No subscription");
292+
processor.subscription.request(1);
293+
}
294+
}
281295
}
282296
else {
283-
processor.changeState(WRITING, COMPLETED);
284-
processor.writingComplete();
285-
processor.resultPublisher.publishComplete();
297+
if (processor.changeState(WRITING, COMPLETED)) {
298+
processor.writingComplete();
299+
processor.resultPublisher.publishComplete();
300+
} else {
301+
processor.state.get().onComplete(processor);
302+
}
286303
}
287304
}
288305
else {
@@ -343,6 +360,9 @@ public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable e
343360
processor.writingComplete();
344361
processor.resultPublisher.publishError(ex);
345362
}
363+
else {
364+
processor.state.get().onError(processor, ex);
365+
}
346366
}
347367

348368
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {

spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,19 @@ void publishComplete(WriteResultPublisher publisher) {
158158
Assert.state(publisher.subscriber != null, "No subscriber");
159159
publisher.subscriber.onComplete();
160160
}
161+
else {
162+
publisher.state.get().publishComplete(publisher);
163+
}
161164
}
162165
@Override
163166
void publishError(WriteResultPublisher publisher, Throwable t) {
164167
if (publisher.changeState(this, COMPLETED)) {
165168
Assert.state(publisher.subscriber != null, "No subscriber");
166169
publisher.subscriber.onError(t);
167170
}
171+
else {
172+
publisher.state.get().publishError(publisher, t);
173+
}
168174
}
169175
},
170176

@@ -196,7 +202,9 @@ void request(WriteResultPublisher publisher, long n) {
196202
}
197203

198204
void cancel(WriteResultPublisher publisher) {
199-
publisher.changeState(this, COMPLETED);
205+
if (!publisher.changeState(this, COMPLETED)) {
206+
publisher.state.get().cancel(publisher);
207+
}
200208
}
201209

202210
void publishComplete(WriteResultPublisher publisher) {

0 commit comments

Comments
 (0)