Skip to content

Commit 32e5f57

Browse files
committed
Ensure matching user destination returned
Before this change, when a client subscribed to a "user" destination (e.g. /user/foo), actual messages received in response to that subscription contained the server-translated, unique user destination (e.g. /foo-user123). This is not an issue for clients such as stomp.js since the subscription is unique and sufficient to match subscription responses. However, other STOMP clients do additional checks on the destination of the subscription and the response. This change ensures that messages sent to clients on user destionations always contain a destination that matches the one on the original subscription. Issue: SPR-11423
1 parent 741b4b2 commit 32e5f57

File tree

8 files changed

+269
-87
lines changed

8 files changed

+269
-87
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,21 @@
3030
import java.util.Set;
3131

3232
/**
33-
* A default implementation of {@link UserDestinationResolver}.
33+
* A default implementation of {@link UserDestinationResolver} that relies
34+
* on the {@link org.springframework.messaging.simp.user.UserSessionRegistry}
35+
* provided to the constructor to find the sessionIds associated with a user
36+
* and then uses the sessionId to make the target destination unique.
3437
* <p>
35-
* Uses the {@link org.springframework.messaging.simp.user.UserSessionRegistry}
36-
* provided to the constructor to find the sessionIds associated with a user.
38+
* When a user attempts to subscribe to "/user/queue/position-updates", the
39+
* "/user" prefix is removed and a unique suffix added, resulting in something
40+
* like "/queue/position-updates-useri9oqdfzo" where the suffix is based on the
41+
* user's session and ensures it does not collide with any other users attempting
42+
* to subscribe to "/user/queue/position-updates".
43+
* <p>
44+
* When a message is sent to a user with a destination such as
45+
* "/user/{username}/queue/position-updates", the "/user/{username}" prefix is
46+
* removed and the suffix added, resulting in something like
47+
* "/queue/position-updates-useri9oqdfzo".
3748
*
3849
* @author Rossen Stoyanchev
3950
* @since 4.0
@@ -87,30 +98,32 @@ public UserSessionRegistry getUserSessionRegistry() {
8798
}
8899

89100
@Override
90-
public Set<String> resolveDestination(Message<?> message) {
101+
public UserDestinationResult resolveDestination(Message<?> message) {
91102

92103
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
93-
UserDestinationInfo info = getUserDestinationInfo(headers);
104+
DestinationInfo info = parseUserDestination(headers);
94105
if (info == null) {
95-
return Collections.emptySet();
106+
return null;
96107
}
97108

98-
Set<String> result = new HashSet<String>();
109+
Set<String> targetDestinations = new HashSet<String>();
99110
for (String sessionId : info.getSessionIds()) {
100-
result.add(getTargetDestination(
101-
headers.getDestination(), info.getDestination(), sessionId, info.getUser()));
111+
targetDestinations.add(getTargetDestination(
112+
headers.getDestination(), info.getDestinationWithoutPrefix(), sessionId, info.getUser()));
102113
}
103114

104-
return result;
115+
return new UserDestinationResult(headers.getDestination(),
116+
targetDestinations, info.getSubscribeDestination(), info.getUser());
105117
}
106118

107-
private UserDestinationInfo getUserDestinationInfo(SimpMessageHeaderAccessor headers) {
119+
private DestinationInfo parseUserDestination(SimpMessageHeaderAccessor headers) {
108120

109121
String destination = headers.getDestination();
110122

111-
String targetUser;
112-
String targetDestination;
113-
Set<String> targetSessionIds;
123+
String destinationWithoutPrefix;
124+
String subscribeDestination;
125+
String user;
126+
Set<String> sessionIds;
114127

115128
Principal principal = headers.getUser();
116129
SimpMessageType messageType = headers.getMessageType();
@@ -127,9 +140,10 @@ private UserDestinationInfo getUserDestinationInfo(SimpMessageHeaderAccessor hea
127140
logger.error("Ignoring message, no session id available");
128141
return null;
129142
}
130-
targetUser = principal.getName();
131-
targetDestination = destination.substring(this.destinationPrefix.length()-1);
132-
targetSessionIds = Collections.singleton(headers.getSessionId());
143+
destinationWithoutPrefix = destination.substring(this.destinationPrefix.length()-1);
144+
subscribeDestination = destination;
145+
user = principal.getName();
146+
sessionIds = Collections.singleton(headers.getSessionId());
133147
}
134148
else if (SimpMessageType.MESSAGE.equals(messageType)) {
135149
if (!checkDestination(destination, this.destinationPrefix)) {
@@ -138,10 +152,11 @@ else if (SimpMessageType.MESSAGE.equals(messageType)) {
138152
int startIndex = this.destinationPrefix.length();
139153
int endIndex = destination.indexOf('/', startIndex);
140154
Assert.isTrue(endIndex > 0, "Expected destination pattern \"/principal/{userId}/**\"");
141-
targetUser = destination.substring(startIndex, endIndex);
142-
targetUser = StringUtils.replace(targetUser, "%2F", "/");
143-
targetDestination = destination.substring(endIndex);
144-
targetSessionIds = this.userSessionRegistry.getSessionIds(targetUser);
155+
destinationWithoutPrefix = destination.substring(endIndex);
156+
subscribeDestination = this.destinationPrefix.substring(0, startIndex-1) + destinationWithoutPrefix;
157+
user = destination.substring(startIndex, endIndex);
158+
user = StringUtils.replace(user, "%2F", "/");
159+
sessionIds = this.userSessionRegistry.getSessionIds(user);
145160
}
146161
else {
147162
if (logger.isTraceEnabled()) {
@@ -150,7 +165,7 @@ else if (SimpMessageType.MESSAGE.equals(messageType)) {
150165
return null;
151166
}
152167

153-
return new UserDestinationInfo(targetUser, targetDestination, targetSessionIds);
168+
return new DestinationInfo(destinationWithoutPrefix, subscribeDestination, user, sessionIds);
154169
}
155170

156171
protected boolean checkDestination(String destination, String requiredPrefix) {
@@ -167,33 +182,53 @@ protected boolean checkDestination(String destination, String requiredPrefix) {
167182
return true;
168183
}
169184

170-
protected String getTargetDestination(String origDestination, String targetDestination,
171-
String sessionId, String user) {
185+
/**
186+
* Return the target destination to use. Provided as input are the original source
187+
* destination, as well as the same destination with the target prefix removed.
188+
*
189+
* @param sourceDestination the source destination from the input message
190+
* @param sourceDestinationWithoutPrefix the source destination with the target prefix removed
191+
* @param sessionId an active user session id
192+
* @param user the user
193+
* @return the target destination
194+
*/
195+
protected String getTargetDestination(String sourceDestination,
196+
String sourceDestinationWithoutPrefix, String sessionId, String user) {
172197

173-
return targetDestination + "-user" + sessionId;
198+
return sourceDestinationWithoutPrefix + "-user" + sessionId;
174199
}
175200

176201

177-
private static class UserDestinationInfo {
202+
private static class DestinationInfo {
178203

179-
private final String user;
204+
private final String destinationWithoutPrefix;
205+
206+
private final String subscribeDestination;
180207

181-
private final String destination;
208+
private final String user;
182209

183210
private final Set<String> sessionIds;
184211

185-
private UserDestinationInfo(String user, String destination, Set<String> sessionIds) {
212+
213+
private DestinationInfo(String destinationWithoutPrefix, String subscribeDestination, String user,
214+
Set<String> sessionIds) {
215+
186216
this.user = user;
187-
this.destination = destination;
217+
this.destinationWithoutPrefix = destinationWithoutPrefix;
218+
this.subscribeDestination = subscribeDestination;
188219
this.sessionIds = sessionIds;
189220
}
190221

191-
public String getUser() {
192-
return this.user;
222+
public String getDestinationWithoutPrefix() {
223+
return this.destinationWithoutPrefix;
193224
}
194225

195-
public String getDestination() {
196-
return this.destination;
226+
public String getSubscribeDestination() {
227+
return this.subscribeDestination;
228+
}
229+
230+
public String getUser() {
231+
return this.user;
197232
}
198233

199234
public Set<String> getSessionIds() {

spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@
2727
import org.springframework.messaging.MessagingException;
2828
import org.springframework.messaging.SubscribableChannel;
2929
import org.springframework.messaging.core.MessageSendingOperations;
30+
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
31+
import org.springframework.messaging.simp.SimpMessageType;
3032
import org.springframework.messaging.simp.SimpMessagingTemplate;
33+
import org.springframework.messaging.support.MessageBuilder;
34+
import org.springframework.messaging.support.MessageHeaderAccessor;
3135
import org.springframework.util.Assert;
3236
import org.springframework.util.CollectionUtils;
3337

@@ -43,6 +47,8 @@
4347
*/
4448
public class UserDestinationMessageHandler implements MessageHandler, SmartLifecycle {
4549

50+
public static final String SUBSCRIBE_DESTINATION = "subscribeDestination";
51+
4652
private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class);
4753

4854

@@ -140,11 +146,21 @@ public final void stop(Runnable callback) {
140146
@Override
141147
public void handleMessage(Message<?> message) throws MessagingException {
142148

143-
Set<String> destinations = this.userDestinationResolver.resolveDestination(message);
144-
if (CollectionUtils.isEmpty(destinations)) {
149+
UserDestinationResult result = this.userDestinationResolver.resolveDestination(message);
150+
if (result == null) {
151+
return;
152+
}
153+
Set<String> destinations = result.getTargetDestinations();
154+
if (destinations.isEmpty()) {
145155
return;
146156
}
147157

158+
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.wrap(message);
159+
if (SimpMessageType.MESSAGE.equals(headerAccessor.getMessageType())) {
160+
headerAccessor.setHeader(SUBSCRIBE_DESTINATION, result.getSubscribeDestination());
161+
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headerAccessor).build();
162+
}
163+
148164
for (String targetDestination : destinations) {
149165
if (logger.isDebugEnabled()) {
150166
logger.debug("Sending message to resolved destination=" + targetDestination);
Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,38 +21,40 @@
2121
import java.util.Set;
2222

2323
/**
24-
* A strategy for resolving unique, user destinations per session. User destinations
25-
* provide a user with the ability to subscribe to a queue unique to their session
26-
* as well others with the ability to send messages to those queues.
24+
* A strategy for resolving a "user" destination and translating it to one or more
25+
* actual destinations unique to the user's active session(s).
2726
* <p>
28-
* When a user attempts to subscribe to "/user/queue/position-updates", the
29-
* "/user" prefix is removed and a unique suffix added, resulting in something
30-
* like "/queue/position-updates-useri9oqdfzo" where the suffix is based on the
31-
* user's session and ensures it does not collide with any other users attempting
32-
* to subscribe to "/user/queue/position-updates".
27+
* For messages sent to a user, the destination must contain the name of the target
28+
* user, The name, extracted from the destination, is used to look up the active
29+
* user session(s), and then translate the destination accordingly.
3330
* <p>
34-
* When a message is sent to a user with a destination such as
35-
* "/user/{username}/queue/position-updates", the "/user/{username}" prefix is
36-
* removed and the suffix added, resulting in something like
37-
* "/queue/position-updates-useri9oqdfzo".
31+
* For SUBSCRIBE and UNSUBSCRIBE messages, the user is the user associated with
32+
* the message. In other words the destination does not contain the user name.
33+
* <p>
34+
* See the documentation on implementations for specific examples.
3835
*
3936
* @author Rossen Stoyanchev
4037
* @since 4.0
4138
*
39+
* @see org.springframework.messaging.simp.user.DefaultUserDestinationResolver
4240
* @see UserDestinationMessageHandler
4341
*/
4442
public interface UserDestinationResolver {
4543

4644
/**
47-
* Resolve the destination of the message to a set of actual target destinations
48-
* to use. If the message is SUBSCRIBE/UNSUBSCRIBE, the returned set will contain
49-
* only target destination. If the message represents data being sent to a user,
50-
* the returned set may contain multiple target destinations, one for each active
51-
* session of the target user.
45+
* Resolve the destination of the message to a set of actual target destinations.
46+
* <p>
47+
* If the message is SUBSCRIBE/UNSUBSCRIBE, the returned set will contain a
48+
* single translated target destination.
49+
* <p>
50+
* If the message represents data being sent to a user, the returned set may
51+
* contain multiple target destinations, one for each active user session.
52+
*
53+
* @param message the message with a user destination to be resolved
5254
*
53-
* @param message the message to resolve
54-
* @return the resolved unique user destination(s) or an empty Set
55+
* @return the result of the resolution, or {@code null} if the resolution
56+
* fails (e.g. not a user destination, or no user info available, etc)
5557
*/
56-
Set<String> resolveDestination(Message<?> message);
58+
UserDestinationResult resolveDestination(Message<?> message);
5759

5860
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.user;
18+
19+
import org.springframework.util.Assert;
20+
21+
import java.util.Collections;
22+
import java.util.Set;
23+
24+
/**
25+
* A simple container for the result of parsing and translating a "user" destination
26+
* in some source message into a set of actual target destinations by calling
27+
* {@link org.springframework.messaging.simp.user.UserDestinationResolver}.
28+
*
29+
* @author Rossen Stoyanchev
30+
* @since 4.0.2
31+
*/
32+
public class UserDestinationResult {
33+
34+
private final String sourceDestination;
35+
36+
private final Set<String> targetDestinations;
37+
38+
private final String subscribeDestination;
39+
40+
private final String user;
41+
42+
43+
public UserDestinationResult(String sourceDestination,
44+
Set<String> targetDestinations, String subscribeDestination, String user) {
45+
46+
Assert.notNull(sourceDestination, "'sourceDestination' must not be null");
47+
Assert.notNull(targetDestinations, "'targetDestinations' must not be null");
48+
Assert.notNull(subscribeDestination, "'subscribeDestination' must not be null");
49+
Assert.notNull(user, "'user' must not be null");
50+
51+
this.sourceDestination = sourceDestination;
52+
this.targetDestinations = targetDestinations;
53+
this.subscribeDestination = subscribeDestination;
54+
this.user = user;
55+
}
56+
57+
58+
/**
59+
* The "user" destination as found in the headers of the source message.
60+
*
61+
* @return a destination, never {@code null}
62+
*/
63+
public String getSourceDestination() {
64+
return this.sourceDestination;
65+
}
66+
67+
/**
68+
* The result of parsing the source destination and translating it into a set
69+
* of actual target destinations to use.
70+
*
71+
* @return a set of destination values, possibly an empty set
72+
*/
73+
public Set<String> getTargetDestinations() {
74+
return this.targetDestinations;
75+
}
76+
77+
/**
78+
* The canonical form of the user destination as would be required to subscribe.
79+
* This may be useful to ensure that messages received by clients contain the
80+
* original destination they used to subscribe.
81+
*
82+
* @return a destination, never {@code null}
83+
*/
84+
public String getSubscribeDestination() {
85+
return this.subscribeDestination;
86+
}
87+
88+
/**
89+
* The user associated with the user destination.
90+
*
91+
* @return the user name, never {@code null}
92+
*/
93+
public String getUser() {
94+
return this.user;
95+
}
96+
}

0 commit comments

Comments
 (0)