Skip to content

Commit 2a48ad8

Browse files
committed
Refactor and polish spring-messaging
Remove base class for STOMP-related message handler classes (AbstractSimpMessageHandler), polish subclasses and fix issues with more significant updates to STOMP broker relay. Introduce base class for SubscribableChannel implementations providing consistent logging for all channel implementations.
1 parent f5f3f66 commit 2a48ad8

18 files changed

+512
-523
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSimpMessageHandler.java

Lines changed: 0 additions & 164 deletions
This file was deleted.

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
3434

3535

3636
@Override
37-
public void addSubscription(Message<?> message) {
37+
public final void registerSubscription(Message<?> message) {
3838
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
3939
if (!SimpMessageType.SUBSCRIBE.equals(headers.getMessageType())) {
4040
logger.error("Expected SUBSCRIBE message: " + message);
@@ -55,14 +55,17 @@ public void addSubscription(Message<?> message) {
5555
logger.error("Ignoring destination. No destination in message: " + message);
5656
return;
5757
}
58+
if (logger.isDebugEnabled()) {
59+
logger.debug("Subscribe request: " + message);
60+
}
5861
addSubscriptionInternal(sessionId, subscriptionId, destination, message);
5962
}
6063

6164
protected abstract void addSubscriptionInternal(String sessionId, String subscriptionId,
6265
String destination, Message<?> message);
6366

6467
@Override
65-
public void removeSubscription(Message<?> message) {
68+
public final void unregisterSubscription(Message<?> message) {
6669
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
6770
if (!SimpMessageType.UNSUBSCRIBE.equals(headers.getMessageType())) {
6871
logger.error("Expected UNSUBSCRIBE message: " + message);
@@ -78,17 +81,19 @@ public void removeSubscription(Message<?> message) {
7881
logger.error("Ignoring subscription. No subscriptionId in message: " + message);
7982
return;
8083
}
84+
if (logger.isDebugEnabled()) {
85+
logger.debug("Unubscribe request: " + message);
86+
}
8187
removeSubscriptionInternal(sessionId, subscriptionId, message);
8288
}
8389

8490
protected abstract void removeSubscriptionInternal(String sessionId, String subscriptionId, Message<?> message);
8591

8692
@Override
87-
public void removeSessionSubscriptions(String sessionId) {
88-
}
93+
public abstract void unregisterAllSubscriptions(String sessionId);
8994

9095
@Override
91-
public MultiValueMap<String, String> findSubscriptions(Message<?> message) {
96+
public final MultiValueMap<String, String> findSubscriptions(Message<?> message) {
9297
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
9398
if (!SimpMessageType.MESSAGE.equals(headers.getMessageType())) {
9499
logger.error("Unexpected message type: " + message);
@@ -99,6 +104,9 @@ public MultiValueMap<String, String> findSubscriptions(Message<?> message) {
99104
logger.error("Ignoring destination. No destination in message: " + message);
100105
return null;
101106
}
107+
if (logger.isTraceEnabled()) {
108+
logger.trace("Find subscriptions, destination=" + headers.getDestination());
109+
}
102110
return findSubscriptionsInternal(destination, message);
103111
}
104112

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationSimpMessageHandler.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
import java.lang.annotation.Annotation;
2020
import java.lang.reflect.Method;
2121
import java.util.Arrays;
22-
import java.util.Collection;
2322
import java.util.HashMap;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.Set;
2726
import java.util.concurrent.ConcurrentHashMap;
2827

28+
import org.apache.commons.logging.Log;
29+
import org.apache.commons.logging.LogFactory;
2930
import org.springframework.beans.BeansException;
3031
import org.springframework.beans.factory.InitializingBean;
3132
import org.springframework.context.ApplicationContext;
@@ -34,6 +35,8 @@
3435
import org.springframework.core.annotation.AnnotationUtils;
3536
import org.springframework.messaging.Message;
3637
import org.springframework.messaging.MessageChannel;
38+
import org.springframework.messaging.MessageHandler;
39+
import org.springframework.messaging.MessagingException;
3740
import org.springframework.messaging.handler.annotation.MessageMapping;
3841
import org.springframework.messaging.handler.annotation.support.MessageBodyArgumentResolver;
3942
import org.springframework.messaging.handler.annotation.support.MessageExceptionHandlerMethodResolver;
@@ -60,8 +63,9 @@
6063
* @author Rossen Stoyanchev
6164
* @since 4.0
6265
*/
63-
public class AnnotationSimpMessageHandler extends AbstractSimpMessageHandler
64-
implements ApplicationContextAware, InitializingBean {
66+
public class AnnotationSimpMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean {
67+
68+
private static final Log logger = LogFactory.getLog(AnnotationSimpMessageHandler.class);
6569

6670
private final MessageChannel outboundChannel;
6771

@@ -104,11 +108,6 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
104108
this.applicationContext = applicationContext;
105109
}
106110

107-
@Override
108-
protected Collection<SimpMessageType> getSupportedMessageTypes() {
109-
return Arrays.asList(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE, SimpMessageType.UNSUBSCRIBE);
110-
}
111-
112111
@Override
113112
public void afterPropertiesSet() {
114113

@@ -183,18 +182,20 @@ protected HandlerMethod createHandlerMethod(Object handler, Method method) {
183182
}
184183

185184
@Override
186-
public void handlePublish(Message<?> message) {
187-
handleMessageInternal(message, this.messageMethods);
188-
}
185+
public void handleMessage(Message<?> message) throws MessagingException {
189186

190-
@Override
191-
public void handleSubscribe(Message<?> message) {
192-
handleMessageInternal(message, this.subscribeMethods);
193-
}
187+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
188+
SimpMessageType messageType = headers.getMessageType();
194189

195-
@Override
196-
public void handleUnsubscribe(Message<?> message) {
197-
handleMessageInternal(message, this.unsubscribeMethods);
190+
if (SimpMessageType.MESSAGE.equals(messageType)) {
191+
handleMessageInternal(message, this.messageMethods);
192+
}
193+
else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
194+
handleMessageInternal(message, this.subscribeMethods);
195+
}
196+
else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
197+
handleMessageInternal(message, this.unsubscribeMethods);
198+
}
198199
}
199200

200201
private void handleMessageInternal(final Message<?> message, Map<MappingInfo, HandlerMethod> handlerMethods) {

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultSubscriptionRegistry.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,14 @@ protected void removeSubscriptionInternal(String sessionId, String subscriptionI
7474
}
7575

7676
@Override
77-
public void removeSessionSubscriptions(String sessionId) {
77+
public void unregisterAllSubscriptions(String sessionId) {
7878
SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId);
79-
this.destinationCache.removeSessionSubscriptions(info);
79+
if (info != null) {
80+
if (logger.isDebugEnabled()) {
81+
logger.debug("Unregistering subscriptions for sessionId=" + sessionId);
82+
}
83+
this.destinationCache.removeSessionSubscriptions(info);
84+
}
8085
}
8186

8287
@Override

0 commit comments

Comments
 (0)