Skip to content

Commit c11484b

Browse files
committed
Add WebSocket transport configuration support
Issue: SPR-11527
1 parent 1e9960e commit c11484b

File tree

14 files changed

+184
-100
lines changed

14 files changed

+184
-100
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ public class MessageBrokerRegistry {
4747

4848
private ChannelRegistration brokerChannelRegistration = new ChannelRegistration();
4949

50-
private Integer messageBufferSizeLimit;
51-
5250

5351
public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) {
5452
Assert.notNull(clientInboundChannel);
@@ -121,23 +119,6 @@ public ChannelRegistration configureBrokerChannel() {
121119
return this.brokerChannelRegistration;
122120
}
123121

124-
/**
125-
* Configure the message buffer size limit in bytes.
126-
* @since 4.0.3
127-
*/
128-
public MessageBrokerRegistry setMessageBufferSizeLimit(Integer messageBufferSizeLimit) {
129-
this.messageBufferSizeLimit = messageBufferSizeLimit;
130-
return this;
131-
}
132-
133-
/**
134-
* Get the message buffer size limit in bytes.
135-
* @since 4.0.3
136-
*/
137-
public Integer getMessageBufferSizeLimit() {
138-
return this.messageBufferSizeLimit;
139-
}
140-
141122
protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
142123
if ((this.simpleBrokerRegistration == null) && (this.brokerRelayRegistration == null)) {
143124
enableSimpleBroker();

spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,8 @@ public BeanDefinition parse(Element element, ParserContext parserCxt) {
124124
beanName = registerBeanDef(beanDef, parserCxt, source);
125125
RuntimeBeanReference userSessionRegistry = new RuntimeBeanReference(beanName);
126126

127-
String frameBufferSizeAttribute = element.getAttribute("message-buffer-size");
128-
Integer messageBufferSizeLimit = frameBufferSizeAttribute.isEmpty() ? null : Integer.parseInt(frameBufferSizeAttribute);
129-
130127
RuntimeBeanReference subProtocolWsHandler = registerSubProtocolWebSocketHandler(
131-
clientInChannel, clientOutChannel, userSessionRegistry, messageBufferSizeLimit, parserCxt, source);
128+
element, clientInChannel, clientOutChannel, userSessionRegistry, parserCxt, source);
132129

133130
for(Element stompEndpointElem : DomUtils.getChildElementsByTagName(element, "stomp-endpoint")) {
134131

@@ -229,16 +226,12 @@ else if (!channelName.equals("brokerChannel")) {
229226
return new RuntimeBeanReference(channelName);
230227
}
231228

232-
private RuntimeBeanReference registerSubProtocolWebSocketHandler(
229+
private RuntimeBeanReference registerSubProtocolWebSocketHandler(Element element,
233230
RuntimeBeanReference clientInChannel, RuntimeBeanReference clientOutChannel,
234-
RuntimeBeanReference userSessionRegistry, Integer messageBufferSizeLimit,
235-
ParserContext parserCxt, Object source) {
231+
RuntimeBeanReference userSessionRegistry, ParserContext parserCxt, Object source) {
236232

237233
RootBeanDefinition stompHandlerDef = new RootBeanDefinition(StompSubProtocolHandler.class);
238234
stompHandlerDef.getPropertyValues().add("userSessionRegistry", userSessionRegistry);
239-
if(messageBufferSizeLimit != null) {
240-
stompHandlerDef.getPropertyValues().add("messageBufferSizeLimit", messageBufferSizeLimit);
241-
}
242235
registerBeanDef(stompHandlerDef, parserCxt, source);
243236

244237
ConstructorArgumentValues cavs = new ConstructorArgumentValues();
@@ -248,6 +241,23 @@ private RuntimeBeanReference registerSubProtocolWebSocketHandler(
248241
RootBeanDefinition subProtocolWshDef = new RootBeanDefinition(SubProtocolWebSocketHandler.class, cavs, null);
249242
subProtocolWshDef.getPropertyValues().addPropertyValue("protocolHandlers", stompHandlerDef);
250243
String subProtocolWshName = registerBeanDef(subProtocolWshDef, parserCxt, source);
244+
245+
Element transportElem = DomUtils.getChildElementByTagName(element, "transport");
246+
if (transportElem != null) {
247+
String messageSize = transportElem.getAttribute("message-size");
248+
if (messageSize != null) {
249+
stompHandlerDef.getPropertyValues().add("messageSizeLimit", messageSize);
250+
}
251+
String sendTimeLimit = transportElem.getAttribute("send-timeout");
252+
if (sendTimeLimit != null) {
253+
subProtocolWshDef.getPropertyValues().add("sendTimeLimit", sendTimeLimit);
254+
}
255+
String sendBufferSizeLimit = transportElem.getAttribute("send-buffer-size");
256+
if (sendBufferSizeLimit != null) {
257+
subProtocolWshDef.getPropertyValues().add("sendBufferSizeLimit", sendBufferSizeLimit);
258+
}
259+
}
260+
251261
return new RuntimeBeanReference(subProtocolWshName);
252262
}
253263

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSoc
3333

3434

3535
@Override
36-
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
36+
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
3737
}
3838

3939
@Override

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ protected void registerStompEndpoints(StompEndpointRegistry registry) {
5858
}
5959
}
6060

61+
@Override
62+
protected void configureWebSocketTransport(WebSocketTransportRegistration registration) {
63+
for (WebSocketMessageBrokerConfigurer c : this.configurers) {
64+
c.configureWebSocketTransport(registration);
65+
}
66+
}
67+
6168
@Override
6269
protected void configureClientInboundChannel(ChannelRegistration registration) {
6370
for (WebSocketMessageBrokerConfigurer c : this.configurers) {

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
2121
import java.util.List;
2222
import java.util.Map;
2323

24-
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
2524
import org.springframework.messaging.simp.user.UserSessionRegistry;
2625
import org.springframework.scheduling.TaskScheduler;
2726
import org.springframework.util.Assert;
@@ -58,8 +57,8 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry {
5857

5958

6059
public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler,
61-
WebSocketTransportRegistration transportRegistration, UserSessionRegistry userSessionRegistry,
62-
TaskScheduler defaultSockJsTaskScheduler, MessageBrokerRegistry brokerRegistry) {
60+
WebSocketTransportRegistration transportRegistration,
61+
UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) {
6362

6463
Assert.notNull(webSocketHandler, "'webSocketHandler' is required ");
6564
Assert.notNull(transportRegistration, "'transportRegistration' is required");
@@ -78,15 +77,11 @@ public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler,
7877
this.stompHandler = new StompSubProtocolHandler();
7978
this.stompHandler.setUserSessionRegistry(userSessionRegistry);
8079

81-
if (transportRegistration.getMessageBufferSizeLimit() != null) {
82-
this.stompHandler.setMessageBufferSizeLimit(transportRegistration.getMessageBufferSizeLimit());
80+
if (transportRegistration.getMessageSizeLimit() != null) {
81+
this.stompHandler.setMessageSizeLimit(transportRegistration.getMessageSizeLimit());
8382
}
8483

8584
this.sockJsScheduler = defaultSockJsTaskScheduler;
86-
87-
if(brokerRegistry.getMessageBufferSizeLimit() != null) {
88-
this.stompHandler.setMessageBufferSizeLimit(brokerRegistry.getMessageBufferSizeLimit());
89-
}
9085
}
9186

9287
private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler wsHandler) {

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,9 @@ public HandlerMapping stompWebSocketHandlerMapping() {
5151
UserSessionRegistry sessionRegistry = userSessionRegistry();
5252
WebSocketTransportRegistration transportRegistration = getTransportRegistration();
5353
ThreadPoolTaskScheduler taskScheduler = messageBrokerSockJsTaskScheduler();
54-
MessageBrokerRegistry brokerRegistry = getBrokerRegistry();
5554

5655
WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
57-
webSocketHandler, transportRegistration, sessionRegistry, taskScheduler, brokerRegistry);
56+
webSocketHandler, transportRegistration, sessionRegistry, taskScheduler);
5857

5958
registerStompEndpoints(registry);
6059

spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketTransportRegistration.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,17 @@
2424
*/
2525
public class WebSocketTransportRegistration {
2626

27-
private Integer messageBufferSizeLimit;
27+
private Integer messageSizeLimit;
2828

2929
private Integer sendTimeLimit;
3030

3131
private Integer sendBufferSizeLimit;
3232

3333

3434
/**
35-
* Configure the maximum size of the buffer to use when an incoming message
36-
* for a sub-protocol (e.g. STOMP) has been split into multiple WebSocket
37-
* messages or multiple HTTP POSTs when SockJS fallback options are in use.
35+
* Configure the maximum size for an incoming sub-protocol message.
36+
* For example a STOMP message may be received as multiple WebSocket messages
37+
* or multiple HTTP POST requests when SockJS fallback options are in use.
3838
*
3939
* <p>In theory a WebSocket message can be almost unlimited in size.
4040
* In practice WebSocket servers impose limits on incoming message size.
@@ -50,16 +50,16 @@ public class WebSocketTransportRegistration {
5050
* Version 2 of the spec will but in the mean time existing client libraries
5151
* have already established a practice that servers must handle.
5252
*/
53-
public WebSocketTransportRegistration setMessageBufferSizeLimit(int bufferSizeLimit) {
54-
this.messageBufferSizeLimit = bufferSizeLimit;
53+
public WebSocketTransportRegistration setMessageSizeLimit(int messageSizeLimit) {
54+
this.messageSizeLimit = messageSizeLimit;
5555
return this;
5656
}
5757

5858
/**
5959
* Protected accessor for internal use.
6060
*/
61-
protected Integer getMessageBufferSizeLimit() {
62-
return this.messageBufferSizeLimit;
61+
protected Integer getMessageSizeLimit() {
62+
return this.messageSizeLimit;
6363
}
6464

6565
/**

spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.springframework.messaging.simp.stomp.BufferingStompDecoder;
3535
import org.springframework.messaging.simp.stomp.StompCommand;
3636
import org.springframework.messaging.simp.stomp.StompConversionException;
37-
import org.springframework.messaging.simp.stomp.StompDecoder;
3837
import org.springframework.messaging.simp.stomp.StompEncoder;
3938
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
4039
import org.springframework.messaging.simp.user.DestinationUserNameProvider;
@@ -69,7 +68,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler {
6968
private static final Log logger = LogFactory.getLog(StompSubProtocolHandler.class);
7069

7170

72-
private int messageBufferSizeLimit = 64 * 1024;
71+
private int messageSizeLimit = 64 * 1024;
7372

7473
private final Map<String, BufferingStompDecoder> decoders = new ConcurrentHashMap<String, BufferingStompDecoder>();
7574

@@ -79,29 +78,26 @@ public class StompSubProtocolHandler implements SubProtocolHandler {
7978

8079

8180
/**
82-
* Configure the maximum size of the buffer used when a STOMP message has been
83-
* split over multiple WebSocket messages.
84-
*
85-
* <p>While the STOMP spec version 1.2 (current as of 4.0.3) does not discuss
86-
* STOMP over WebSocket explicitly, a number of clients already split messages
87-
* around 16K boundaries. Therefore partial content must be buffered before a
88-
* full message can be assembled.
81+
* Configure the maximum size allowed for an incoming STOMP message.
82+
* Since a STOMP message can be received in multiple WebSocket messages,
83+
* buffering may be required and therefore it is necessary to know the maximum
84+
* allowed message size.
8985
*
9086
* <p>By default this property is set to 64K.
9187
*
9288
* @since 4.0.3
9389
*/
94-
public void setMessageBufferSizeLimit(int messageBufferSizeLimit) {
95-
this.messageBufferSizeLimit = messageBufferSizeLimit;
90+
public void setMessageSizeLimit(int messageSizeLimit) {
91+
this.messageSizeLimit = messageSizeLimit;
9692
}
9793

9894
/**
9995
* Get the configured message buffer size limit in bytes.
10096
*
10197
* @since 4.0.3
10298
*/
103-
public int getMessageBufferSizeLimit() {
104-
return this.messageBufferSizeLimit;
99+
public int getMessageSizeLimit() {
100+
return this.messageSizeLimit;
105101
}
106102

107103
/**
@@ -316,7 +312,7 @@ public String resolveSessionId(Message<?> message) {
316312

317313
@Override
318314
public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) {
319-
this.decoders.put(session.getId(), new BufferingStompDecoder(getMessageBufferSizeLimit()));
315+
this.decoders.put(session.getId(), new BufferingStompDecoder(getMessageSizeLimit()));
320316
}
321317

322318
@Override

0 commit comments

Comments
 (0)