Skip to content

Commit 1e9960e

Browse files
committed
Add WebSocketTransportRegistration
Issue: SPR-11527
1 parent 545c4ef commit 1e9960e

File tree

8 files changed

+218
-17
lines changed

8 files changed

+218
-17
lines changed

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,11 @@
3131
*/
3232
public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer {
3333

34+
35+
@Override
36+
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
37+
}
38+
3439
@Override
3540
public void configureClientInboundChannel(ChannelRegistration registration) {
3641
}

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,35 +58,49 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry {
5858

5959

6060
public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler,
61-
UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler,
62-
MessageBrokerRegistry brokerRegistry) {
61+
WebSocketTransportRegistration transportRegistration, UserSessionRegistry userSessionRegistry,
62+
TaskScheduler defaultSockJsTaskScheduler, MessageBrokerRegistry brokerRegistry) {
6363

64-
Assert.notNull(webSocketHandler);
65-
Assert.notNull(userSessionRegistry);
64+
Assert.notNull(webSocketHandler, "'webSocketHandler' is required ");
65+
Assert.notNull(transportRegistration, "'transportRegistration' is required");
66+
Assert.notNull(userSessionRegistry, "'userSessionRegistry' is required");
6667

6768
this.webSocketHandler = webSocketHandler;
6869
this.subProtocolWebSocketHandler = unwrapSubProtocolWebSocketHandler(webSocketHandler);
70+
71+
if (transportRegistration.getSendTimeLimit() != null) {
72+
this.subProtocolWebSocketHandler.setSendTimeLimit(transportRegistration.getSendTimeLimit());
73+
}
74+
if (transportRegistration.getSendBufferSizeLimit() != null) {
75+
this.subProtocolWebSocketHandler.setSendBufferSizeLimit(transportRegistration.getSendBufferSizeLimit());
76+
}
77+
6978
this.stompHandler = new StompSubProtocolHandler();
7079
this.stompHandler.setUserSessionRegistry(userSessionRegistry);
80+
81+
if (transportRegistration.getMessageBufferSizeLimit() != null) {
82+
this.stompHandler.setMessageBufferSizeLimit(transportRegistration.getMessageBufferSizeLimit());
83+
}
84+
7185
this.sockJsScheduler = defaultSockJsTaskScheduler;
86+
7287
if(brokerRegistry.getMessageBufferSizeLimit() != null) {
7388
this.stompHandler.setMessageBufferSizeLimit(brokerRegistry.getMessageBufferSizeLimit());
7489
}
7590
}
7691

77-
private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) {
78-
WebSocketHandler actual = WebSocketHandlerDecorator.unwrap(webSocketHandler);
79-
Assert.isInstanceOf(SubProtocolWebSocketHandler.class, actual,
80-
"No SubProtocolWebSocketHandler found: " + webSocketHandler);
92+
private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler wsHandler) {
93+
WebSocketHandler actual = WebSocketHandlerDecorator.unwrap(wsHandler);
94+
Assert.isInstanceOf(SubProtocolWebSocketHandler.class, actual, "No SubProtocolWebSocketHandler in " + wsHandler);
8195
return (SubProtocolWebSocketHandler) actual;
8296
}
8397

8498

8599
@Override
86100
public StompWebSocketEndpointRegistration addEndpoint(String... paths) {
87101
this.subProtocolWebSocketHandler.addProtocolHandler(this.stompHandler);
88-
WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration(
89-
paths, this.webSocketHandler, this.sockJsScheduler);
102+
WebMvcStompWebSocketEndpointRegistration registration =
103+
new WebMvcStompWebSocketEndpointRegistration(paths, this.webSocketHandler, this.sockJsScheduler);
90104
this.registrations.add(registration);
91105
return registration;
92106
}

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import org.springframework.context.annotation.Bean;
2020
import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration;
21+
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
22+
import org.springframework.messaging.simp.user.UserSessionRegistry;
2123
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
2224
import org.springframework.web.servlet.HandlerMapping;
2325
import org.springframework.web.socket.WebSocketHandler;
@@ -36,15 +38,26 @@
3638
*/
3739
public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration {
3840

41+
private WebSocketTransportRegistration transportRegistration;
42+
43+
3944
protected WebSocketMessageBrokerConfigurationSupport() {
4045
}
4146

4247
@Bean
4348
public HandlerMapping stompWebSocketHandlerMapping() {
49+
50+
WebSocketHandler webSocketHandler = subProtocolWebSocketHandler();
51+
UserSessionRegistry sessionRegistry = userSessionRegistry();
52+
WebSocketTransportRegistration transportRegistration = getTransportRegistration();
53+
ThreadPoolTaskScheduler taskScheduler = messageBrokerSockJsTaskScheduler();
54+
MessageBrokerRegistry brokerRegistry = getBrokerRegistry();
55+
4456
WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
45-
subProtocolWebSocketHandler(), userSessionRegistry(),
46-
messageBrokerSockJsTaskScheduler(), getBrokerRegistry());
57+
webSocketHandler, transportRegistration, sessionRegistry, taskScheduler, brokerRegistry);
58+
4759
registerStompEndpoints(registry);
60+
4861
return registry.getHandlerMapping();
4962
}
5063

@@ -53,6 +66,17 @@ public WebSocketHandler subProtocolWebSocketHandler() {
5366
return new SubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
5467
}
5568

69+
protected final WebSocketTransportRegistration getTransportRegistration() {
70+
if (this.transportRegistration == null) {
71+
this.transportRegistration = new WebSocketTransportRegistration();
72+
configureWebSocketTransport(this.transportRegistration);
73+
}
74+
return this.transportRegistration;
75+
}
76+
77+
protected void configureWebSocketTransport(WebSocketTransportRegistration registry) {
78+
}
79+
5680
/**
5781
* The default TaskScheduler to use if none is configured via
5882
* {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e.

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,10 +35,17 @@
3535
public interface WebSocketMessageBrokerConfigurer {
3636

3737
/**
38-
* Configure STOMP over WebSocket endpoints.
38+
* Register STOMP endpoints mapping each to a specific URL and (optionally)
39+
* enabling and configuring SockJS fallback options.
3940
*/
4041
void registerStompEndpoints(StompEndpointRegistry registry);
4142

43+
/**
44+
* Configure options related to the processing of messages received from and
45+
* sent to WebSocket clients.
46+
*/
47+
void configureWebSocketTransport(WebSocketTransportRegistration registry);
48+
4249
/**
4350
* Configure the {@link org.springframework.messaging.MessageChannel} used for
4451
* incoming messages from WebSocket clients. By default the channel is backed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2002-2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.socket.config.annotation;
17+
18+
19+
/**
20+
* Configure the processing of messages received from and sent to WebSocket clients.
21+
*
22+
* @author Rossen Stoyanchev
23+
* @since 4.0.3
24+
*/
25+
public class WebSocketTransportRegistration {
26+
27+
private Integer messageBufferSizeLimit;
28+
29+
private Integer sendTimeLimit;
30+
31+
private Integer sendBufferSizeLimit;
32+
33+
34+
/**
35+
* Configure the maximum size of the buffer to use when an incoming message
36+
* for a sub-protocol (e.g. STOMP) has been split into multiple WebSocket
37+
* messages or multiple HTTP POSTs when SockJS fallback options are in use.
38+
*
39+
* <p>In theory a WebSocket message can be almost unlimited in size.
40+
* In practice WebSocket servers impose limits on incoming message size.
41+
* STOMP clients for example tend to split large messages around 16K
42+
* boundaries. Therefore a server must be able to buffer partial content
43+
* and decode when enough data is received. Use this property to configure
44+
* the max size of the buffer to use.
45+
*
46+
* <p>The default value is 64K (i.e. 64 * 1024).
47+
*
48+
* <p><strong>NOTE</strong> that the current version 1.2 of the STOMP spec
49+
* does not specifically discuss how to send STOMP messages over WebSocket.
50+
* Version 2 of the spec will but in the mean time existing client libraries
51+
* have already established a practice that servers must handle.
52+
*/
53+
public WebSocketTransportRegistration setMessageBufferSizeLimit(int bufferSizeLimit) {
54+
this.messageBufferSizeLimit = bufferSizeLimit;
55+
return this;
56+
}
57+
58+
/**
59+
* Protected accessor for internal use.
60+
*/
61+
protected Integer getMessageBufferSizeLimit() {
62+
return this.messageBufferSizeLimit;
63+
}
64+
65+
/**
66+
* Configure a time limit (in milliseconds) for the maximum amount of a time
67+
* allowed when sending messages to a WebSocket session or writing to an
68+
* HTTP response when SockJS fallback option are in use.
69+
*
70+
* <p>In general WebSocket servers expect that messages to a single WebSocket
71+
* session are sent from a single thread at a time. This is automatically
72+
* guaranteed when using {@code @EnableWebSocketMessageBroker} configuration.
73+
* If message sending is slow, or at least slower than rate of messages sending,
74+
* subsequent messages are buffered until either the {@code sendTimeLimit}
75+
* or the {@code sendBufferSizeLimit} are reached at which point the session
76+
* state is cleared and an attempt is made to close the session.
77+
*
78+
* <p><strong>NOTE</strong> that the session time limit is checked only
79+
* on attempts to send additional messages. So if only a single message is
80+
* sent and it hangs, the session will not time out until another message is
81+
* sent or the underlying physical socket times out. So this is not a
82+
* replacement for WebSocket server or HTTP connection timeout but is rather
83+
* intended to control the extent of buffering of unsent messages.
84+
*
85+
* <p><strong>NOTE</strong> that closing the session may not succeed in
86+
* actually closing the physical socket and may also hang. This is true
87+
* especially when using blocking IO such as the BIO connector in Tomcat
88+
* that is used by default on Tomcat 7. Therefore it is recommended to ensure
89+
* the server is using non-blocking IO such as Tomcat's NIO connector that
90+
* is used by default on Tomcat 8. If you must use blocking IO consider
91+
* customizing OS-level TCP settings, for example
92+
* {@code /proc/sys/net/ipv4/tcp_retries2} on Linux.
93+
*
94+
* <p>The default value is 10 seconds (i.e. 10 * 10000).
95+
*
96+
* @param timeLimit the timeout value in milliseconds; the value must be
97+
* greater than 0, otherwise it is ignored.
98+
*/
99+
public WebSocketTransportRegistration setSendTimeLimit(int timeLimit) {
100+
this.sendTimeLimit = timeLimit;
101+
return this;
102+
}
103+
104+
/**
105+
* Protected accessor for internal use.
106+
*/
107+
protected Integer getSendTimeLimit() {
108+
return this.sendTimeLimit;
109+
}
110+
111+
/**
112+
* Configure the maximum amount of data to buffer when sending messages
113+
* to a WebSocket session, or an HTTP response when SockJS fallback
114+
* option are in use.
115+
*
116+
* <p>In general WebSocket servers expect that messages to a single WebSocket
117+
* session are sent from a single thread at a time. This is automatically
118+
* guaranteed when using {@code @EnableWebSocketMessageBroker} configuration.
119+
* If message sending is slow, or at least slower than rate of messages sending,
120+
* subsequent messages are buffered until either the {@code sendTimeLimit}
121+
* or the {@code sendBufferSizeLimit} are reached at which point the session
122+
* state is cleared and an attempt is made to close the session.
123+
*
124+
* <p><strong>NOTE</strong> that closing the session may not succeed in
125+
* actually closing the physical socket and may also hang. This is true
126+
* especially when using blocking IO such as the BIO connector in Tomcat
127+
* configured by default on Tomcat 7. Therefore it is recommended to ensure
128+
* the server is using non-blocking IO such as Tomcat's NIO connector used
129+
* by default on Tomcat 8. If you must use blocking IO consider customizing
130+
* OS-level TCP settings, for example {@code /proc/sys/net/ipv4/tcp_retries2}
131+
* on Linux.
132+
*
133+
* <p>The default value is 512K (i.e. 512 * 1024).
134+
*
135+
* @param sendBufferSizeLimit the maximum number of bytes to buffer when
136+
* sending messages; if the value is less than or equal to 0 then buffering
137+
* is effectively disabled.
138+
*/
139+
public WebSocketTransportRegistration setSendBufferSizeLimit(int sendBufferSizeLimit) {
140+
this.sendBufferSizeLimit = sendBufferSizeLimit;
141+
return this;
142+
}
143+
144+
/**
145+
* Protected accessor for internal use.
146+
*/
147+
protected Integer getSendBufferSizeLimit() {
148+
return this.sendBufferSizeLimit;
149+
}
150+
}

spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.commons.logging.Log;
2020
import org.apache.commons.logging.LogFactory;
21+
import org.springframework.util.Assert;
2122
import org.springframework.web.socket.CloseStatus;
2223
import org.springframework.web.socket.WebSocketMessage;
2324
import org.springframework.web.socket.WebSocketSession;

spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public class SubProtocolWebSocketHandler
7878

7979
private int sendTimeLimit = 10 * 1000;
8080

81-
private int sendBufferSizeLimit = 64 * 1024;
81+
private int sendBufferSizeLimit = 512 * 1024;
8282

8383
private Object lifecycleMonitor = new Object();
8484

spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void setup() {
5858
this.userSessionRegistry = new DefaultUserSessionRegistry();
5959
this.messageBrokerRegistry = new MessageBrokerRegistry(inChannel, outChannel);
6060
TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class);
61-
this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, userSessionRegistry,
61+
this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, transportRegistration, userSessionRegistry,
6262
taskScheduler, messageBrokerRegistry);
6363
}
6464

0 commit comments

Comments
 (0)