Skip to content

Commit afdca28

Browse files
committed
Avoid resume-suspend race condition
This commit turns suspendReading() into a readingPaused() notification that is invoked after a succession of reads stops because there is no more demand. Sub-classes can use this notification to suspend, if that applies to them. Most importantly the notification is guaranteed not to overlap with checkOnDataAvailable() which means that suspend does not need to be atomic and guarded against resume. The two can and do compete all the time when reading ends with no demand, and a request for demand arrives concurrently. Issue: SPR-16207
1 parent 4a87d3d commit afdca28

File tree

9 files changed

+29
-93
lines changed

9 files changed

+29
-93
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,14 @@ public final void onError(Throwable t) {
116116
protected abstract T read() throws IOException;
117117

118118
/**
119-
* Suspend reading, if the underlying API provides such a mechanism.
119+
* Invoked when reading is paused due to a lack of demand.
120+
* <p><strong>Note:</strong> This method is guaranteed not to compete with
121+
* {@link #checkOnDataAvailable()} so it can be used to safely suspend
122+
* reading, if the underlying API supports it, i.e. without competing with
123+
* an implicit call to resume via {@code checkOnDataAvailable()}.
124+
* @since 5.0.2
120125
*/
121-
protected abstract void suspendReading();
126+
protected abstract void readingPaused();
122127

123128

124129
// Private methods for use in State...
@@ -280,31 +285,14 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
280285
if (Operators.validate(n)) {
281286
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
282287
// Did a concurrent read transition to NO_DEMAND just before us?
283-
if (publisher.changeState(NO_DEMAND, DEMAND)) {
288+
if (publisher.changeState(NO_DEMAND, this)) {
284289
publisher.checkOnDataAvailable();
285290
}
286291
}
287292
}
288293

289294
@Override
290295
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
291-
for (;;) {
292-
if (!read(publisher)) {
293-
return;
294-
}
295-
// Maybe demand arrived between readAndPublish and READING->NO_DEMAND?
296-
long r = publisher.demand;
297-
if (r == 0 || publisher.changeState(NO_DEMAND, this)) {
298-
break;
299-
}
300-
}
301-
}
302-
303-
/**
304-
* @return whether to exit the read loop; false means stop trying
305-
* to read, true means check demand one more time.
306-
*/
307-
<T> boolean read(AbstractListenerReadPublisher<T> publisher) {
308296
if (publisher.changeState(this, READING)) {
309297
try {
310298
boolean demandAvailable = publisher.readAndPublish();
@@ -313,18 +301,22 @@ <T> boolean read(AbstractListenerReadPublisher<T> publisher) {
313301
publisher.checkOnDataAvailable();
314302
}
315303
}
316-
else if (publisher.changeState(READING, NO_DEMAND)) {
317-
publisher.suspendReading();
318-
return true;
304+
else {
305+
publisher.readingPaused();
306+
if (publisher.changeState(READING, NO_DEMAND)) {
307+
// Demand may have arrived since readAndPublish returned
308+
long r = publisher.demand;
309+
if (r > 0 && publisher.changeState(NO_DEMAND, this)) {
310+
publisher.checkOnDataAvailable();
311+
}
312+
}
319313
}
320314
}
321315
catch (IOException ex) {
322316
publisher.onError(ex);
323317
}
324318
}
325-
// Either competing onDataAvailable calls (via request or container callback)
326-
// Or a concurrent completion
327-
return false;
319+
// Else, either competing onDataAvailable (request vs container), or concurrent completion
328320
}
329321
},
330322

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ protected DataBuffer read() throws IOException {
267267
}
268268

269269
@Override
270-
protected void suspendReading() {
270+
protected void readingPaused() {
271271
// no-op
272272
}
273273

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -150,32 +150,16 @@ private void registerListeners(HttpServerExchange exchange) {
150150

151151
@Override
152152
protected void checkOnDataAvailable() {
153-
// TODO: The onDataAvailable() call below can cause a StackOverflowError
154-
// since this method is being called from onDataAvailable() itself.
155-
if (isReadPossible()) {
156-
onDataAvailable();
157-
}
158-
}
159-
160-
private boolean isReadPossible() {
161-
if (!this.channel.isReadResumed()) {
162-
this.channel.resumeReads();
163-
}
164-
return this.channel.isReadResumed();
153+
this.channel.resumeReads();
154+
// We are allowed to try, it will return null if data is not available
155+
onDataAvailable();
165156
}
166157

167158
@Override
168-
protected void suspendReading() {
159+
protected void readingPaused() {
169160
this.channel.suspendReads();
170161
}
171162

172-
@Override
173-
public void onAllDataRead() {
174-
this.channel.getReadSetter().set(null);
175-
this.channel.resumeReads();
176-
super.onAllDataRead();
177-
}
178-
179163
@Override
180164
@Nullable
181165
protected DataBuffer read() throws IOException {

spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ protected DataBuffer read() throws IOException {
6868
}
6969

7070
@Override
71-
protected void suspendReading() {
71+
protected void readingPaused() {
7272
// No-op
7373
}
7474

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,6 @@ public Mono<Void> send(Publisher<WebSocketMessage> messages) {
149149
*/
150150
protected abstract void resumeReceiving();
151151

152-
/**
153-
* Whether receiving new message(s) is suspended.
154-
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
155-
* flow control for receiving messages, then this method as well as
156-
* {@link #canSuspendReceiving()} should both return {@code false}.
157-
* @return returns {@code true} if receiving new message(s) is suspended,
158-
* or otherwise {@code false}.
159-
* @since 5.0.2
160-
*/
161-
protected abstract boolean isSuspended();
162-
163152
/**
164153
* Send the given WebSocket message.
165154
*/
@@ -231,16 +220,14 @@ private final class WebSocketReceivePublisher extends AbstractListenerReadPublis
231220

232221
@Override
233222
protected void checkOnDataAvailable() {
234-
if (isSuspended()) {
235-
resumeReceiving();
236-
}
223+
resumeReceiving();
237224
if (!this.pendingMessages.isEmpty()) {
238225
onDataAvailable();
239226
}
240227
}
241228

242229
@Override
243-
protected void suspendReading() {
230+
protected void readingPaused() {
244231
suspendReceiving();
245232
}
246233

@@ -250,14 +237,6 @@ protected WebSocketMessage read() throws IOException {
250237
return (WebSocketMessage) this.pendingMessages.poll();
251238
}
252239

253-
@Override
254-
public void onAllDataRead() {
255-
if (isSuspended()) {
256-
resumeReceiving();
257-
}
258-
super.onAllDataRead();
259-
}
260-
261240
void handleMessage(WebSocketMessage webSocketMessage) {
262241
this.pendingMessages.offer(webSocketMessage);
263242
onDataAvailable();

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,10 @@ protected void suspendReceiving() {
7474
@Override
7575
protected void resumeReceiving() {
7676
SuspendToken tokenToUse = this.suspendToken;
77-
Assert.state(tokenToUse != null, "Not suspended");
78-
tokenToUse.resume();
7977
this.suspendToken = null;
80-
}
81-
82-
@Override
83-
protected boolean isSuspended() {
84-
return this.suspendToken != null;
78+
if (tokenToUse != null) {
79+
tokenToUse.resume();
80+
}
8581
}
8682

8783
@Override

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ protected void resumeReceiving() {
7171
// no-op
7272
}
7373

74-
@Override
75-
protected boolean isSuspended() {
76-
return false;
77-
}
78-
7974
@Override
8075
protected boolean sendMessage(WebSocketMessage message) throws IOException {
8176
ByteBuffer buffer = message.getPayload().asByteBuffer();

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,4 @@ protected void resumeReceiving() {
7171
}
7272
}
7373

74-
@Override
75-
protected boolean isSuspended() {
76-
return this.suspended == 1;
77-
}
78-
7974
}

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ protected void resumeReceiving() {
7171
getDelegate().resumeReceives();
7272
}
7373

74-
@Override
75-
protected boolean isSuspended() {
76-
return !getDelegate().isReceivesResumed();
77-
}
78-
7974
@Override
8075
protected boolean sendMessage(WebSocketMessage message) throws IOException {
8176
ByteBuffer buffer = message.getPayload().asByteBuffer();

0 commit comments

Comments
 (0)