Skip to content

Commit d8099ad

Browse files
violetaggrstoyanchev
authored andcommitted
AbstractListenerWebSocketSession: suspend the channel when there is no demand
Issues: SPR-16207
1 parent 06b2ab3 commit d8099ad

File tree

5 files changed

+64
-13
lines changed

5 files changed

+64
-13
lines changed

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.web.reactive.socket.adapter;
1818

1919
import java.io.IOException;
20+
import java.util.Queue;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
import org.reactivestreams.Publisher;
@@ -25,6 +26,7 @@
2526
import reactor.core.publisher.Flux;
2627
import reactor.core.publisher.Mono;
2728
import reactor.core.publisher.MonoProcessor;
29+
import reactor.util.concurrent.Queues;
2830

2931
import org.springframework.core.io.buffer.DataBufferFactory;
3032
import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
@@ -147,6 +149,17 @@ public Mono<Void> send(Publisher<WebSocketMessage> messages) {
147149
*/
148150
protected abstract void resumeReceiving();
149151

152+
/**
153+
* Returns {@code true} if receiving new message(s) is suspended otherwise
154+
* {@code false}.
155+
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
156+
* flow control for receiving messages, and this method should return
157+
* {@code false} and {@link #canSuspendReceiving()} should return {@code false}.
158+
* @return returns {@code true} if receiving new message(s) is suspended
159+
* otherwise {@code false}.
160+
*/
161+
protected abstract boolean isSuspended();
162+
150163
/**
151164
* Send the given WebSocket message.
152165
*/
@@ -213,32 +226,39 @@ public void onComplete() {
213226

214227
private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {
215228

216-
@Nullable
217-
private volatile WebSocketMessage webSocketMessage;
229+
private volatile Queue<Object> pendingWebSocketMessages = Queues.unbounded().get();
218230

219231
@Override
220232
protected void checkOnDataAvailable() {
221-
if (this.webSocketMessage != null) {
233+
if (isSuspended()) {
234+
resumeReceiving();
235+
}
236+
if (!pendingWebSocketMessages.isEmpty()) {
222237
onDataAvailable();
223238
}
224239
}
225240

241+
@Override
242+
protected void suspendReading() {
243+
suspendReceiving();
244+
}
245+
226246
@Override
227247
@Nullable
228248
protected WebSocketMessage read() throws IOException {
229-
if (this.webSocketMessage != null) {
230-
WebSocketMessage result = this.webSocketMessage;
231-
this.webSocketMessage = null;
249+
return (WebSocketMessage) pendingWebSocketMessages.poll();
250+
}
251+
252+
@Override
253+
public void onAllDataRead() {
254+
if (isSuspended()) {
232255
resumeReceiving();
233-
return result;
234256
}
235-
236-
return null;
257+
super.onAllDataRead();
237258
}
238259

239260
void handleMessage(WebSocketMessage webSocketMessage) {
240-
this.webSocketMessage = webSocketMessage;
241-
suspendReceiving();
261+
this.pendingWebSocketMessages.offer(webSocketMessage);
242262
onDataAvailable();
243263
}
244264
}

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ protected void resumeReceiving() {
7979
this.suspendToken = null;
8080
}
8181

82+
@Override
83+
protected boolean isSuspended() {
84+
return this.suspendToken != null;
85+
}
86+
8287
@Override
8388
protected boolean sendMessage(WebSocketMessage message) throws IOException {
8489
ByteBuffer buffer = message.getPayload().asByteBuffer();

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ protected void resumeReceiving() {
7171
// no-op
7272
}
7373

74+
@Override
75+
protected boolean isSuspended() {
76+
return false;
77+
}
78+
7479
@Override
7580
protected boolean sendMessage(WebSocketMessage message) throws IOException {
7681
ByteBuffer buffer = message.getPayload().asByteBuffer();

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.web.reactive.socket.adapter;
1818

19+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20+
1921
import javax.websocket.Session;
2022

2123
import org.apache.tomcat.websocket.WsSession;
@@ -33,6 +35,9 @@
3335
* @since 5.0
3436
*/
3537
public class TomcatWebSocketSession extends StandardWebSocketSession {
38+
private static final AtomicIntegerFieldUpdater<TomcatWebSocketSession> SUSPENDED =
39+
AtomicIntegerFieldUpdater.newUpdater(TomcatWebSocketSession.class, "suspended");
40+
private volatile int suspended;
3641

3742

3843
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
@@ -53,12 +58,21 @@ protected boolean canSuspendReceiving() {
5358

5459
@Override
5560
protected void suspendReceiving() {
56-
((WsSession) getDelegate()).suspend();
61+
if (SUSPENDED.compareAndSet(this, 0, 1)) {
62+
((WsSession) getDelegate()).suspend();
63+
}
5764
}
5865

5966
@Override
6067
protected void resumeReceiving() {
61-
((WsSession) getDelegate()).resume();
68+
if (SUSPENDED.compareAndSet(this, 1, 0)) {
69+
((WsSession) getDelegate()).resume();
70+
}
71+
}
72+
73+
@Override
74+
protected boolean isSuspended() {
75+
return this.suspended == 1;
6276
}
6377

6478
}

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,21 @@ protected boolean canSuspendReceiving() {
6161
return true;
6262
}
6363

64+
@Override
6465
protected void suspendReceiving() {
6566
getDelegate().suspendReceives();
6667
}
6768

69+
@Override
6870
protected void resumeReceiving() {
6971
getDelegate().resumeReceives();
7072
}
7173

74+
@Override
75+
protected boolean isSuspended() {
76+
return !getDelegate().isReceivesResumed();
77+
}
78+
7279
@Override
7380
protected boolean sendMessage(WebSocketMessage message) throws IOException {
7481
ByteBuffer buffer = message.getPayload().asByteBuffer();

0 commit comments

Comments
 (0)