Skip to content

Commit a247d5f

Browse files
committed
Guard against exceptions from ApplicationListener
Issue: SPR-11578
1 parent 8780464 commit a247d5f

File tree

2 files changed

+67
-15
lines changed

2 files changed

+67
-15
lines changed

spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.commons.logging.Log;
2929
import org.apache.commons.logging.LogFactory;
3030

31+
import org.springframework.context.ApplicationEvent;
3132
import org.springframework.context.ApplicationEventPublisher;
3233
import org.springframework.context.ApplicationEventPublisherAware;
3334
import org.springframework.messaging.Message;
@@ -62,12 +63,10 @@
6263
public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationEventPublisherAware {
6364

6465
/**
65-
* This protocol handler supports assembling large STOMP messages split into
66-
* multiple WebSocket messages. STOMP clients (like stomp.js) split large STOMP
67-
* messages at 16K boundaries.
68-
*
69-
* <p>We need to ensure the WebSocket server buffer is configured to support
70-
* that size at a minimum plus a little extra for any potential SockJS framing.
66+
* This handler supports assembling large STOMP messages split into multiple
67+
* WebSocket messages and STOMP clients (like stomp.js) indeed split large STOMP
68+
* messages at 16K boundaries. Therefore the WebSocket server input message
69+
* buffer size must allow 16K at least plus a little extra for SockJS framing.
7170
*/
7271
public static final int MINIMUM_WEBSOCKET_MESSAGE_SIZE = 16 * 1024 + 256;
7372

@@ -188,8 +187,8 @@ public void handleMessageFromClient(WebSocketSession session,
188187

189188
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
190189

191-
if (SimpMessageType.CONNECT.equals(headers.getMessageType()) && this.eventPublisher != null) {
192-
this.eventPublisher.publishEvent(new SessionConnectEvent(this, message));
190+
if (this.eventPublisher != null && StompCommand.CONNECT.equals(headers.getMessageType())) {
191+
publishEvent(new SessionConnectEvent(this, message));
193192
}
194193

195194
outputChannel.send(message);
@@ -201,6 +200,15 @@ public void handleMessageFromClient(WebSocketSession session,
201200
}
202201
}
203202

203+
private void publishEvent(ApplicationEvent event) {
204+
try {
205+
this.eventPublisher.publishEvent(event);
206+
}
207+
catch (Throwable ex) {
208+
logger.error("Failed to publish event " + event, ex);
209+
}
210+
}
211+
204212
protected void sendErrorMessage(WebSocketSession session, Throwable error) {
205213

206214
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
@@ -257,8 +265,8 @@ else if (SimpMessageType.MESSAGE.equals(headers.getMessageType())) {
257265
try {
258266
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
259267

260-
if (headers.getCommand() == StompCommand.CONNECTED && this.eventPublisher != null) {
261-
this.eventPublisher.publishEvent(new SessionConnectedEvent(this, (Message<byte[]>) message));
268+
if (this.eventPublisher != null && StompCommand.CONNECTED.equals(headers.getMessageType())) {
269+
publishEvent(new SessionConnectedEvent(this, (Message<byte[]>) message));
262270
}
263271

264272
byte[] bytes = this.stompEncoder.encode((Message<byte[]>) message);
@@ -364,7 +372,7 @@ public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus,
364372
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
365373

366374
if (this.eventPublisher != null) {
367-
this.eventPublisher.publishEvent(new SessionDisconnectEvent(this, session.getId(), closeStatus));
375+
publishEvent(new SessionDisconnectEvent(this, session.getId(), closeStatus));
368376
}
369377

370378
outputChannel.send(message);

spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.web.socket.sockjs.transport.SockJsSession;
5151

5252
import static org.junit.Assert.*;
53+
import static org.junit.Assert.assertEquals;
5354
import static org.mockito.Mockito.*;
5455

5556
/**
@@ -172,13 +173,13 @@ public void eventPublication() {
172173
this.protocolHandler.afterSessionStarted(this.session, this.channel);
173174

174175
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
175-
TextMessage textMessage = new TextMessage(new StompEncoder().encode(
176-
MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()));
176+
Message<byte[]> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
177+
TextMessage textMessage = new TextMessage(new StompEncoder().encode(message));
177178
this.protocolHandler.handleMessageFromClient(this.session, textMessage, this.channel);
178179

179180
headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
180-
Message<byte[]> connectedMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
181-
this.protocolHandler.handleMessageToClient(this.session, connectedMessage);
181+
message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
182+
this.protocolHandler.handleMessageToClient(this.session, message);
182183

183184
this.protocolHandler.afterSessionEnded(this.session, CloseStatus.BAD_DATA, this.channel);
184185

@@ -188,6 +189,49 @@ public void eventPublication() {
188189
assertEquals(SessionDisconnectEvent.class, publisher.events.get(2).getClass());
189190
}
190191

192+
@Test
193+
public void eventPublicationWithExceptions() {
194+
195+
ApplicationEventPublisher publisher = new ApplicationEventPublisher() {
196+
197+
@Override
198+
public void publishEvent(ApplicationEvent event) {
199+
throw new IllegalStateException();
200+
}
201+
};
202+
203+
UserSessionRegistry registry = new DefaultUserSessionRegistry();
204+
this.protocolHandler.setUserSessionRegistry(registry);
205+
this.protocolHandler.setApplicationEventPublisher(publisher);
206+
this.protocolHandler.afterSessionStarted(this.session, this.channel);
207+
208+
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
209+
Message<byte[]> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
210+
TextMessage textMessage = new TextMessage(new StompEncoder().encode(message));
211+
this.protocolHandler.handleMessageFromClient(this.session, textMessage, this.channel);
212+
213+
verify(this.channel).send(this.messageCaptor.capture());
214+
Message<?> actual = this.messageCaptor.getValue();
215+
assertNotNull(actual);
216+
assertEquals(StompCommand.CONNECT, StompHeaderAccessor.wrap(actual).getCommand());
217+
reset(this.channel);
218+
219+
headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
220+
message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
221+
this.protocolHandler.handleMessageToClient(this.session, message);
222+
223+
assertEquals(1, this.session.getSentMessages().size());
224+
textMessage = (TextMessage) this.session.getSentMessages().get(0);
225+
assertEquals("CONNECTED\n" + "user-name:joe\n" + "\n" + "\u0000", textMessage.getPayload());
226+
227+
this.protocolHandler.afterSessionEnded(this.session, CloseStatus.BAD_DATA, this.channel);
228+
229+
verify(this.channel).send(this.messageCaptor.capture());
230+
actual = this.messageCaptor.getValue();
231+
assertNotNull(actual);
232+
assertEquals(StompCommand.DISCONNECT, StompHeaderAccessor.wrap(actual).getCommand());
233+
}
234+
191235
@Test
192236
public void handleMessageToClientUserDestination() {
193237

0 commit comments

Comments
 (0)