Skip to content

Commit 2a481c5

Browse files
violetaggrstoyanchev
authored andcommitted
Fix race conditions in AbstractListenerReadPublisher
Transition from DEMAND->NO_DEMAND: Two concurrent threads enter DEMAND.request and DEMAND.onDataAvailable. And DEMAND.onDataAvailable finishes before DEMAND.request to be able to update the demand field then a request for reading will be lost. Transition from READING->NO_DEMAND: readAndPublish() returns false because there is no demand but before switching the states READING.request is invoked again a request for reading will be lost. Changing READING->DEMAND/NO_DEMAND is made conditional so that the operations will be executed only if changing states succeeds. When in READING state detect completion before each next item in order to exit sooner, if completed. Issue: SPR-16207
1 parent b814875 commit 2a481c5

File tree

1 file changed

+31
-6
lines changed

1 file changed

+31
-6
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ protected void suspendReading() {
132132
*/
133133
private boolean readAndPublish() throws IOException {
134134
long r;
135-
while ((r = demand) > 0) {
135+
while ((r = demand) > 0 && !publisherCompleted) {
136136
T data = read();
137137
if (data != null) {
138138
if (r != Long.MAX_VALUE) {
@@ -292,27 +292,45 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
292292
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
293293
if (Operators.validate(n)) {
294294
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
295+
if (publisher.changeState(NO_DEMAND, DEMAND)) {
296+
publisher.checkOnDataAvailable();
297+
}
295298
}
296299
}
297300

298301
@Override
299302
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
303+
for (;;) {
304+
if (!read(publisher)) {
305+
return;
306+
}
307+
long r = publisher.demand;
308+
if (r == 0 || publisher.changeState(NO_DEMAND, this)) {
309+
break;
310+
}
311+
}
312+
}
313+
314+
<T> boolean read(AbstractListenerReadPublisher<T> publisher) {
300315
if (publisher.changeState(this, READING)) {
301316
try {
302317
boolean demandAvailable = publisher.readAndPublish();
303318
if (demandAvailable) {
304-
publisher.changeState(READING, DEMAND);
305-
publisher.checkOnDataAvailable();
319+
if (publisher.changeState(READING, DEMAND)) {
320+
publisher.checkOnDataAvailable();
321+
return false;
322+
}
306323
}
307-
else {
308-
publisher.changeState(READING, NO_DEMAND);
324+
else if (publisher.changeState(READING, NO_DEMAND)) {
309325
publisher.suspendReading();
310326
}
311327
}
312328
catch (IOException ex) {
313329
publisher.onError(ex);
314330
}
331+
return true;
315332
}
333+
return false;
316334
}
317335
},
318336

@@ -321,6 +339,9 @@ <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
321339
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
322340
if (Operators.validate(n)) {
323341
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
342+
if (publisher.changeState(NO_DEMAND, DEMAND)) {
343+
publisher.checkOnDataAvailable();
344+
}
324345
}
325346
}
326347
},
@@ -356,7 +377,10 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
356377
}
357378

358379
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
359-
if (!publisher.changeState(this, COMPLETED)) {
380+
if (publisher.changeState(this, COMPLETED)) {
381+
publisher.publisherCompleted = true;
382+
}
383+
else {
360384
publisher.state.get().cancel(publisher);
361385
}
362386
}
@@ -367,6 +391,7 @@ <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
367391

368392
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
369393
if (publisher.changeState(this, COMPLETED)) {
394+
publisher.publisherCompleted = true;
370395
if (publisher.subscriber != null) {
371396
publisher.subscriber.onComplete();
372397
}

0 commit comments

Comments
 (0)