@@ -115,9 +115,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
115
115
* Since a STOMP message can be received in multiple WebSocket messages,
116
116
* buffering may be required and therefore it is necessary to know the maximum
117
117
* allowed message size.
118
- *
119
118
* <p>By default this property is set to 64K.
120
- *
121
119
* @since 4.0.3
122
120
*/
123
121
public void setMessageSizeLimit (int messageSizeLimit ) {
@@ -126,7 +124,6 @@ public void setMessageSizeLimit(int messageSizeLimit) {
126
124
127
125
/**
128
126
* Get the configured message buffer size limit in bytes.
129
- *
130
127
* @since 4.0.3
131
128
*/
132
129
public int getMessageSizeLimit () {
@@ -142,7 +139,7 @@ public void setUserSessionRegistry(UserSessionRegistry registry) {
142
139
}
143
140
144
141
/**
145
- * @return the configured UserSessionRegistry.
142
+ * Return the configured UserSessionRegistry.
146
143
*/
147
144
public UserSessionRegistry getUserSessionRegistry () {
148
145
return this .userSessionRegistry ;
@@ -152,7 +149,6 @@ public UserSessionRegistry getUserSessionRegistry() {
152
149
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
153
150
* messages created from decoded STOMP frames and other messages sent to the
154
151
* client inbound channel.
155
- *
156
152
* <p>By default this property is not set.
157
153
*/
158
154
public void setHeaderInitializer (MessageHeaderInitializer headerInitializer ) {
@@ -161,7 +157,7 @@ public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
161
157
}
162
158
163
159
/**
164
- * @return the configured header initializer.
160
+ * Return the configured header initializer.
165
161
*/
166
162
public MessageHeaderInitializer getHeaderInitializer () {
167
163
return this .headerInitializer ;
@@ -274,7 +270,6 @@ else if (StompCommand.UNSUBSCRIBE.equals(headerAccessor.getCommand())) {
274
270
logger .error ("Failed to send client message to application via MessageChannel" +
275
271
" in session " + session .getId () + ". Sending STOMP ERROR to client." , ex );
276
272
sendErrorMessage (session , ex );
277
-
278
273
}
279
274
}
280
275
}
@@ -300,13 +295,14 @@ private void publishEvent(ApplicationEvent event) {
300
295
this .eventPublisher .publishEvent (event );
301
296
}
302
297
catch (Throwable ex ) {
303
- logger .error ("Error publishing " + event + "." , ex );
298
+ logger .error ("Error publishing " + event , ex );
304
299
}
305
300
}
306
301
307
302
protected void sendErrorMessage (WebSocketSession session , Throwable error ) {
308
303
StompHeaderAccessor headerAccessor = StompHeaderAccessor .create (StompCommand .ERROR );
309
304
headerAccessor .setMessage (error .getMessage ());
305
+
310
306
byte [] bytes = this .stompEncoder .encode (headerAccessor .getMessageHeaders (), EMPTY_PAYLOAD );
311
307
try {
312
308
session .sendMessage (new TextMessage (bytes ));
@@ -327,8 +323,10 @@ public void handleMessageToClient(WebSocketSession session, Message<?> message)
327
323
logger .error ("Expected byte[] payload. Ignoring " + message + "." );
328
324
return ;
329
325
}
326
+
330
327
StompHeaderAccessor stompAccessor = getStompHeaderAccessor (message );
331
328
StompCommand command = stompAccessor .getCommand ();
329
+
332
330
if (StompCommand .MESSAGE .equals (command )) {
333
331
if (stompAccessor .getSubscriptionId () == null ) {
334
332
logger .warn ("No STOMP \" subscription\" header in " + message );
@@ -374,7 +372,7 @@ else if (StompCommand.CONNECTED.equals(command)) {
374
372
}
375
373
catch (Throwable ex ) {
376
374
// Could be part of normal workflow (e.g. browser tab closed)
377
- logger .debug ("Failed to send WebSocket message to client in session " + session .getId () + "." , ex );
375
+ logger .debug ("Failed to send WebSocket message to client in session " + session .getId (), ex );
378
376
command = StompCommand .ERROR ;
379
377
}
380
378
finally {
@@ -393,7 +391,7 @@ private StompHeaderAccessor getStompHeaderAccessor(Message<?> message) {
393
391
MessageHeaderAccessor accessor = MessageHeaderAccessor .getAccessor (message , MessageHeaderAccessor .class );
394
392
if (accessor == null ) {
395
393
// Shouldn't happen (only broker broadcasts directly to clients)
396
- throw new IllegalStateException ("No header accessor in " + message + "." );
394
+ throw new IllegalStateException ("No header accessor in " + message );
397
395
}
398
396
StompHeaderAccessor stompAccessor ;
399
397
if (accessor instanceof StompHeaderAccessor ) {
@@ -415,7 +413,7 @@ else if (stompAccessor.getCommand() == null || StompCommand.SEND.equals(stompAcc
415
413
else {
416
414
// Shouldn't happen (only broker broadcasts directly to clients)
417
415
throw new IllegalStateException (
418
- "Unexpected header accessor type: " + accessor .getClass () + " in " + message + "." );
416
+ "Unexpected header accessor type: " + accessor .getClass () + " in " + message );
419
417
}
420
418
return stompAccessor ;
421
419
}
@@ -465,13 +463,15 @@ private StompHeaderAccessor afterStompSessionConnected(Message<?> message, Stomp
465
463
this .userSessionRegistry .registerSessionId (userName , session .getId ());
466
464
}
467
465
}
466
+
468
467
long [] heartbeat = accessor .getHeartbeat ();
469
468
if (heartbeat [1 ] > 0 ) {
470
469
session = WebSocketSessionDecorator .unwrap (session );
471
470
if (session instanceof SockJsSession ) {
472
471
((SockJsSession ) session ).disableHeartbeat ();
473
472
}
474
473
}
474
+
475
475
return accessor ;
476
476
}
477
477
@@ -499,11 +499,13 @@ public void afterSessionStarted(WebSocketSession session, MessageChannel outputC
499
499
@ Override
500
500
public void afterSessionEnded (WebSocketSession session , CloseStatus closeStatus , MessageChannel outputChannel ) {
501
501
this .decoders .remove (session .getId ());
502
+
502
503
Principal principal = session .getPrincipal ();
503
504
if (principal != null && this .userSessionRegistry != null ) {
504
505
String userName = getSessionRegistryUserName (principal );
505
506
this .userSessionRegistry .unregisterSessionId (userName , session .getId ());
506
507
}
508
+
507
509
Message <byte []> message = createDisconnectMessage (session );
508
510
SimpAttributes simpAttributes = SimpAttributes .fromMessage (message );
509
511
try {
@@ -535,15 +537,15 @@ public String toString() {
535
537
return "StompSubProtocolHandler" + getSupportedProtocols ();
536
538
}
537
539
538
- private class Stats {
540
+
541
+ private static class Stats {
539
542
540
543
private final AtomicInteger connect = new AtomicInteger ();
541
544
542
545
private final AtomicInteger connected = new AtomicInteger ();
543
546
544
547
private final AtomicInteger disconnect = new AtomicInteger ();
545
548
546
-
547
549
public void incrementConnectCount () {
548
550
this .connect .incrementAndGet ();
549
551
}
@@ -556,7 +558,6 @@ public void incrementDisconnectCount() {
556
558
this .disconnect .incrementAndGet ();
557
559
}
558
560
559
-
560
561
public String toString () {
561
562
return "processed CONNECT(" + this .connect .get () + ")-CONNECTED(" +
562
563
this .connected .get () + ")-DISCONNECT(" + this .disconnect .get () + ")" ;
0 commit comments