Skip to content

Commit d73c2e2

Browse files
committed
Polish handling of STOMP message headers
1 parent ba7998d commit d73c2e2

17 files changed

+259
-212
lines changed

spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageSendingTemplate.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public void setDefaultDestination(D defaultDestination) {
4141
this.defaultDestination = defaultDestination;
4242
}
4343

44+
public D getDefaultDestination() {
45+
return this.defaultDestination;
46+
}
47+
4448
/**
4549
* Set the {@link MessageConverter} that is to be used to convert
4650
* between Messages and objects for this template.
@@ -82,7 +86,7 @@ public <P> void send(D destination, Message<P> message) {
8286
this.doSend(destination, message);
8387
}
8488

85-
protected abstract void doSend(D destination, Message<?> message) ;
89+
protected abstract void doSend(D destination, Message<?> message);
8690

8791

8892
@Override

spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageHeaderAccessor.java

Lines changed: 23 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.messaging.simp;
1818

1919
import java.security.Principal;
20-
import java.util.Arrays;
2120
import java.util.List;
2221
import java.util.Map;
2322

@@ -26,7 +25,6 @@
2625
import org.springframework.messaging.MessageHeaders;
2726
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
2827
import org.springframework.util.Assert;
29-
import org.springframework.util.CollectionUtils;
3028

3129

3230
/**
@@ -43,35 +41,25 @@
4341
*/
4442
public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
4543

46-
public static final String DESTINATIONS = "destinations";
44+
public static final String DESTINATION_HEADER = "destination";
4745

48-
public static final String MESSAGE_TYPE = "messageType";
46+
public static final String MESSAGE_TYPE_HEADER = "messageType";
4947

50-
// TODO
51-
public static final String PROTOCOL_MESSAGE_TYPE = "protocolMessageType";
48+
public static final String SESSION_ID_HEADER = "sessionId";
5249

53-
public static final String SESSION_ID = "sessionId";
50+
public static final String SUBSCRIPTION_ID_HEADER = "subscriptionId";
5451

55-
public static final String SUBSCRIPTION_ID = "subscriptionId";
56-
57-
public static final String USER = "user";
52+
public static final String USER_HEADER = "user";
5853

5954

6055
/**
6156
* A constructor for creating new message headers.
6257
* This constructor is protected. See factory methods in this and sub-classes.
6358
*/
64-
protected SimpMessageHeaderAccessor(SimpMessageType messageType, Object protocolMessageType,
65-
Map<String, List<String>> externalSourceHeaders) {
66-
59+
protected SimpMessageHeaderAccessor(SimpMessageType messageType, Map<String, List<String>> externalSourceHeaders) {
6760
super(externalSourceHeaders);
68-
6961
Assert.notNull(messageType, "messageType is required");
70-
setHeader(MESSAGE_TYPE, messageType);
71-
72-
if (protocolMessageType != null) {
73-
setHeader(PROTOCOL_MESSAGE_TYPE, protocolMessageType);
74-
}
62+
setHeader(MESSAGE_TYPE_HEADER, messageType);
7563
}
7664

7765
/**
@@ -89,14 +77,14 @@ protected SimpMessageHeaderAccessor(Message<?> message) {
8977
* {@link SimpMessageType#MESSAGE}.
9078
*/
9179
public static SimpMessageHeaderAccessor create() {
92-
return new SimpMessageHeaderAccessor(SimpMessageType.MESSAGE, null, null);
80+
return new SimpMessageHeaderAccessor(SimpMessageType.MESSAGE, null);
9381
}
9482

9583
/**
9684
* Create {@link SimpMessageHeaderAccessor} for a new {@link Message} of a specific type.
9785
*/
9886
public static SimpMessageHeaderAccessor create(SimpMessageType messageType) {
99-
return new SimpMessageHeaderAccessor(messageType, null, null);
87+
return new SimpMessageHeaderAccessor(messageType, null);
10088
}
10189

10290
/**
@@ -106,39 +94,23 @@ public static SimpMessageHeaderAccessor wrap(Message<?> message) {
10694
return new SimpMessageHeaderAccessor(message);
10795
}
10896

109-
110-
public SimpMessageType getMessageType() {
111-
return (SimpMessageType) getHeader(MESSAGE_TYPE);
112-
}
113-
114-
protected void setProtocolMessageType(Object protocolMessageType) {
115-
setHeader(PROTOCOL_MESSAGE_TYPE, protocolMessageType);
97+
public void setMessageTypeIfNotSet(SimpMessageType messageType) {
98+
if (getMessageType() == null) {
99+
setHeader(MESSAGE_TYPE_HEADER, messageType);
100+
}
116101
}
117102

118-
protected Object getProtocolMessageType() {
119-
return getHeader(PROTOCOL_MESSAGE_TYPE);
103+
public SimpMessageType getMessageType() {
104+
return (SimpMessageType) getHeader(MESSAGE_TYPE_HEADER);
120105
}
121106

122107
public void setDestination(String destination) {
123108
Assert.notNull(destination, "destination is required");
124-
setHeader(DESTINATIONS, Arrays.asList(destination));
109+
setHeader(DESTINATION_HEADER, destination);
125110
}
126111

127-
@SuppressWarnings("unchecked")
128112
public String getDestination() {
129-
List<String> destinations = (List<String>) getHeader(DESTINATIONS);
130-
return CollectionUtils.isEmpty(destinations) ? null : destinations.get(0);
131-
}
132-
133-
@SuppressWarnings("unchecked")
134-
public List<String> getDestinations() {
135-
List<String> destinations = (List<String>) getHeader(DESTINATIONS);
136-
return CollectionUtils.isEmpty(destinations) ? null : destinations;
137-
}
138-
139-
public void setDestinations(List<String> destinations) {
140-
Assert.notNull(destinations, "destinations are required");
141-
setHeader(DESTINATIONS, destinations);
113+
return (String) getHeader(DESTINATION_HEADER);
142114
}
143115

144116
public MediaType getContentType() {
@@ -150,27 +122,27 @@ public void setContentType(MediaType contentType) {
150122
}
151123

152124
public String getSubscriptionId() {
153-
return (String) getHeader(SUBSCRIPTION_ID);
125+
return (String) getHeader(SUBSCRIPTION_ID_HEADER);
154126
}
155127

156128
public void setSubscriptionId(String subscriptionId) {
157-
setHeader(SUBSCRIPTION_ID, subscriptionId);
129+
setHeader(SUBSCRIPTION_ID_HEADER, subscriptionId);
158130
}
159131

160132
public String getSessionId() {
161-
return (String) getHeader(SESSION_ID);
133+
return (String) getHeader(SESSION_ID_HEADER);
162134
}
163135

164136
public void setSessionId(String sessionId) {
165-
setHeader(SESSION_ID, sessionId);
137+
setHeader(SESSION_ID_HEADER, sessionId);
166138
}
167139

168140
public Principal getUser() {
169-
return (Principal) getHeader(USER);
141+
return (Principal) getHeader(USER_HEADER);
170142
}
171143

172144
public void setUser(Principal principal) {
173-
setHeader(USER, principal);
145+
setHeader(USER_HEADER, principal);
174146
}
175147

176148
}

spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageSendingOperations.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,31 @@
2222

2323

2424
/**
25+
* A specialization of {@link MessageSendingOperations} with methods for use with
26+
* the Spring Framework support for simple messaging protocols (like STOMP).
27+
*
2528
* @author Rossen Stoyanchev
2629
* @since 4.0
2730
*/
2831
public interface SimpMessageSendingOperations extends MessageSendingOperations<String> {
2932

33+
/**
34+
* Send a message to a specific user.
35+
*
36+
* @param user the user that should receive the message.
37+
* @param destination the destination to send the message to.
38+
* @param message the message to send
39+
*/
3040
<T> void convertAndSendToUser(String user, String destination, T message) throws MessagingException;
3141

42+
/**
43+
* Send a message to a specific user.
44+
*
45+
* @param user the user that should receive the message.
46+
* @param destination the destination to send the message to.
47+
* @param message the message to send
48+
* @param postProcessor a postProcessor to post-process or modify the created message
49+
*/
3250
<T> void convertAndSendToUser(String user, String destination, T message, MessagePostProcessor postProcessor)
3351
throws MessagingException;
3452

spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package org.springframework.messaging.simp;
1717

18-
import java.util.Arrays;
19-
2018
import org.springframework.messaging.Message;
2119
import org.springframework.messaging.MessageChannel;
2220
import org.springframework.messaging.MessageDeliveryException;
@@ -56,6 +54,7 @@ public SimpMessagingTemplate(MessageChannel messageChannel) {
5654
* @see org.springframework.messaging.simp.handler.UserDestinationMessageHandler
5755
*/
5856
public void setUserDestinationPrefix(String prefix) {
57+
Assert.notNull(prefix, "userDestinationPrefix is required");
5958
this.userDestinationPrefix = prefix;
6059
}
6160

@@ -92,31 +91,32 @@ public long getSendTimeout() {
9291

9392
@Override
9493
public <P> void send(Message<P> message) {
95-
// TODO: maybe look up destination of current message (via ThreadLocal)
96-
this.send(getRequiredDefaultDestination(), message);
94+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
95+
String destination = headers.getDestination();
96+
destination = (destination != null) ? destination : getRequiredDefaultDestination();
97+
doSend(getRequiredDefaultDestination(), message);
9798
}
9899

99100
@Override
100101
protected void doSend(String destination, Message<?> message) {
101102
Assert.notNull(destination, "destination is required");
102-
message = updateMessageHeaders(message, destination);
103+
104+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
105+
headers.setDestination(destination);
106+
headers.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
107+
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
108+
103109
long timeout = this.sendTimeout;
104110
boolean sent = (timeout >= 0)
105111
? this.messageChannel.send(message, timeout)
106112
: this.messageChannel.send(message);
113+
107114
if (!sent) {
108115
throw new MessageDeliveryException(message,
109116
"failed to send message to destination '" + destination + "' within timeout: " + timeout);
110117
}
111118
}
112119

113-
protected <P> Message<P> updateMessageHeaders(Message<P> message, String destination) {
114-
Assert.notNull(destination, "destination is required");
115-
return MessageBuilder.fromMessage(message)
116-
.setHeader(SimpMessageHeaderAccessor.MESSAGE_TYPE, SimpMessageType.MESSAGE)
117-
.setHeader(SimpMessageHeaderAccessor.DESTINATIONS, Arrays.asList(destination)).build();
118-
}
119-
120120
@Override
121121
public <T> void convertAndSendToUser(String user, String destination, T message) throws MessagingException {
122122
convertAndSendToUser(user, destination, message, null);

spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/ReplyToMethodReturnValueHandler.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.messaging.simp.annotation.support;
1818

19-
import java.security.Principal;
20-
2119
import org.springframework.core.MethodParameter;
2220
import org.springframework.messaging.Message;
2321
import org.springframework.messaging.MessageChannel;
@@ -69,7 +67,10 @@ public void handleReturnValue(Object returnValue, MethodParameter returnType, Me
6967
return;
7068
}
7169

72-
MessagePostProcessor postProcessor = new SessionHeaderPostProcessor(inputMessage);
70+
SimpMessageHeaderAccessor inputHeaders = SimpMessageHeaderAccessor.wrap(inputMessage);
71+
72+
String sessionId = inputHeaders.getSessionId();
73+
MessagePostProcessor postProcessor = new SessionHeaderPostProcessor(sessionId);
7374

7475
ReplyTo replyTo = returnType.getMethodAnnotation(ReplyTo.class);
7576
if (replyTo != null) {
@@ -80,37 +81,30 @@ public void handleReturnValue(Object returnValue, MethodParameter returnType, Me
8081

8182
ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class);
8283
if (replyToUser != null) {
83-
String user = getUser(inputMessage).getName();
84+
if (inputHeaders.getUser() == null) {
85+
throw new MissingSessionUserException(inputMessage);
86+
}
87+
String user = inputHeaders.getUser().getName();
8488
for (String destination : replyToUser.value()) {
8589
this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, postProcessor);
8690
}
8791
}
8892
}
8993

90-
private Principal getUser(Message<?> inputMessage) {
91-
SimpMessageHeaderAccessor inputHeaders = SimpMessageHeaderAccessor.wrap(inputMessage);
92-
Principal user = inputHeaders.getUser();
93-
if (user == null) {
94-
throw new MissingSessionUserException(inputMessage);
95-
}
96-
return user;
97-
}
98-
9994

10095
private final class SessionHeaderPostProcessor implements MessagePostProcessor {
10196

102-
private final Message<?> inputMessage;
103-
97+
private final String sessionId;
10498

105-
public SessionHeaderPostProcessor(Message<?> inputMessage) {
106-
this.inputMessage = inputMessage;
99+
public SessionHeaderPostProcessor(String sessionId) {
100+
this.sessionId = sessionId;
107101
}
108102

109103
@Override
110104
public Message<?> postProcessMessage(Message<?> message) {
111-
String headerName = SimpMessageHeaderAccessor.SESSION_ID;
112-
String sessionId = (String) this.inputMessage.getHeaders().get(headerName);
113-
return MessageBuilder.fromMessage(message).setHeader(headerName, sessionId).build();
105+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
106+
headers.setSessionId(this.sessionId);
107+
return MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
114108
}
115109
}
116110
}

spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,32 +68,37 @@ public void handleReturnValue(Object returnValue, MethodParameter returnType, Me
6868
}
6969

7070
SimpMessageHeaderAccessor inputHeaders = SimpMessageHeaderAccessor.wrap(message);
71+
String sessionId = inputHeaders.getSessionId();
72+
String subscriptionId = inputHeaders.getSubscriptionId();
7173
String destination = inputHeaders.getDestination();
7274

7375
Assert.state(inputHeaders.getSubscriptionId() != null,
7476
"No subsriptiondId in input message. Add @ReplyTo or @ReplyToUser to method: "
7577
+ returnType.getMethod());
7678

77-
MessagePostProcessor postProcessor = new SubscriptionHeaderPostProcessor(inputHeaders);
79+
MessagePostProcessor postProcessor = new SubscriptionHeaderPostProcessor(sessionId, subscriptionId);
7880
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
7981
}
8082

8183

8284
private final class SubscriptionHeaderPostProcessor implements MessagePostProcessor {
8385

84-
private final SimpMessageHeaderAccessor inputHeaders;
86+
private final String sessionId;
8587

88+
private final String subscriptionId;
8689

87-
public SubscriptionHeaderPostProcessor(SimpMessageHeaderAccessor inputHeaders) {
88-
this.inputHeaders = inputHeaders;
90+
91+
public SubscriptionHeaderPostProcessor(String sessionId, String subscriptionId) {
92+
this.sessionId = sessionId;
93+
this.subscriptionId = subscriptionId;
8994
}
9095

9196
@Override
9297
public Message<?> postProcessMessage(Message<?> message) {
93-
return MessageBuilder.fromMessage(message)
94-
.setHeader(SimpMessageHeaderAccessor.SESSION_ID, this.inputHeaders.getSessionId())
95-
.setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID, this.inputHeaders.getSubscriptionId())
96-
.build();
98+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
99+
headers.setSessionId(this.sessionId);
100+
headers.setSubscriptionId(this.subscriptionId);
101+
return MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
97102
}
98103
}
99104
}

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
*/
2424
public interface MutableUserSessionResolver extends UserSessionResolver {
2525

26-
void storeUserSessionId(String user, String sessionId);
26+
void addUserSessionId(String user, String sessionId);
2727

28-
void deleteUserSessionId(String user, String sessionId);
28+
void removeUserSessionId(String user, String sessionId);
2929

3030
}

0 commit comments

Comments
 (0)