Skip to content

Commit 5d20b75

Browse files
committed
Add support for sending private messages
The new UserDestinationMessageHandler resolves messages with destinations prefixed with "/user/{username}" and resolves them into a destination to which the user is currently subscribed by appending the user session id. For example a destination such as "/user/john/queue/trade-confirmation" would resolve "/trade-confirmation/i9oqdfzo" assuming "i9oqdfzo" is the user's session id.
1 parent 2a48ad8 commit 5d20b75

File tree

7 files changed

+360
-3
lines changed

7 files changed

+360
-3
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@
6363
* @author Rossen Stoyanchev
6464
* @since 4.0
6565
*/
66-
public class AnnotationSimpMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean {
66+
public class AnnotationMethodMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean {
6767

68-
private static final Log logger = LogFactory.getLog(AnnotationSimpMessageHandler.class);
68+
private static final Log logger = LogFactory.getLog(AnnotationMethodMessageHandler.class);
6969

7070
private final MessageChannel outboundChannel;
7171

@@ -91,7 +91,7 @@ public class AnnotationSimpMessageHandler implements MessageHandler, Application
9191
* @param inboundChannel a channel for processing incoming messages from clients
9292
* @param outboundChannel a channel for messages going out to clients
9393
*/
94-
public AnnotationSimpMessageHandler(MessageChannel outboundChannel) {
94+
public AnnotationMethodMessageHandler(MessageChannel outboundChannel) {
9595
Assert.notNull(outboundChannel, "outboundChannel is required");
9696
this.outboundChannel = outboundChannel;
9797
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.handler;
18+
19+
import java.util.Collections;
20+
import java.util.Map;
21+
import java.util.Set;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.CopyOnWriteArraySet;
24+
25+
26+
/**
27+
* @author Rossen Stoyanchev
28+
* @since 4.0
29+
*/
30+
public class InMemoryUserSessionResolver implements UserSessionResolver, UserSessionStore {
31+
32+
// userId -> sessionId's
33+
private final Map<String, Set<String>> userSessionIds = new ConcurrentHashMap<String, Set<String>>();
34+
35+
36+
@Override
37+
public void storeUserSessionId(String user, String sessionId) {
38+
Set<String> sessionIds = this.userSessionIds.get(user);
39+
if (sessionIds == null) {
40+
sessionIds = new CopyOnWriteArraySet<String>();
41+
this.userSessionIds.put(user, sessionIds);
42+
}
43+
sessionIds.add(sessionId);
44+
}
45+
46+
@Override
47+
public void deleteUserSessionId(String user, String sessionId) {
48+
Set<String> sessionIds = this.userSessionIds.get(user);
49+
if (sessionIds != null) {
50+
if (sessionIds.remove(sessionId) && sessionIds.isEmpty()) {
51+
this.userSessionIds.remove(user);
52+
}
53+
}
54+
}
55+
56+
@Override
57+
public Set<String> resolveUserSessionIds(String user) {
58+
Set<String> sessionIds = this.userSessionIds.get(user);
59+
return (sessionIds != null) ? sessionIds : Collections.<String>emptySet();
60+
}
61+
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.handler;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
import org.springframework.messaging.Message;
22+
import org.springframework.messaging.MessageHandler;
23+
import org.springframework.messaging.MessagingException;
24+
import org.springframework.messaging.core.MessageSendingOperations;
25+
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
26+
import org.springframework.messaging.simp.SimpMessageType;
27+
import org.springframework.messaging.support.MessageBuilder;
28+
import org.springframework.util.Assert;
29+
import org.springframework.util.StringUtils;
30+
31+
32+
/**
33+
*
34+
* Supports destinations prefixed with "/user/{username}" and resolves them into a
35+
* destination to which the user is currently subscribed by appending the user session id.
36+
* For example a destination such as "/user/john/queue/trade-confirmation" would resolve
37+
* to "/trade-confirmation/i9oqdfzo" if "i9oqdfzo" is the user's session id.
38+
*
39+
* @author Rossen Stoyanchev
40+
* @since 4.0
41+
*/
42+
public class UserDestinationMessageHandler implements MessageHandler {
43+
44+
private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class);
45+
46+
private final MessageSendingOperations<String> messagingTemplate;
47+
48+
private String prefix = "/user/";
49+
50+
private UserSessionResolver userSessionResolver = new InMemoryUserSessionResolver();
51+
52+
53+
public UserDestinationMessageHandler(MessageSendingOperations<String> messagingTemplate) {
54+
this.messagingTemplate = messagingTemplate;
55+
}
56+
57+
/**
58+
* <p>The default prefix is "/user".
59+
* @param prefix the prefix to set
60+
*/
61+
public void setPrefix(String prefix) {
62+
Assert.hasText(prefix, "prefix is required");
63+
this.prefix = prefix.endsWith("/") ? prefix : prefix + "/";
64+
}
65+
66+
/**
67+
* @return the prefix
68+
*/
69+
public String getPrefix() {
70+
return this.prefix;
71+
}
72+
73+
/**
74+
* @param userSessionResolver the userSessionResolver to set
75+
*/
76+
public void setUserSessionResolver(UserSessionResolver userSessionResolver) {
77+
this.userSessionResolver = userSessionResolver;
78+
}
79+
80+
/**
81+
* @return the userSessionResolver
82+
*/
83+
public UserSessionResolver getUserSessionResolver() {
84+
return this.userSessionResolver;
85+
}
86+
87+
/**
88+
* @return the messagingTemplate
89+
*/
90+
public MessageSendingOperations<String> getMessagingTemplate() {
91+
return this.messagingTemplate;
92+
}
93+
94+
@Override
95+
public void handleMessage(Message<?> message) throws MessagingException {
96+
97+
if (!shouldHandle(message)) {
98+
return;
99+
}
100+
101+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
102+
String destination = headers.getDestination();
103+
104+
if (logger.isTraceEnabled()) {
105+
logger.trace("Processing message to destination " + destination);
106+
}
107+
108+
UserDestinationParser destinationParser = new UserDestinationParser(destination);
109+
String user = destinationParser.getUser();
110+
111+
if (user == null) {
112+
if (logger.isErrorEnabled()) {
113+
logger.error("Ignoring message, expected destination \"" + this.prefix
114+
+ "{userId}/**\": " + destination);
115+
}
116+
return;
117+
}
118+
119+
for (String sessionId : this.userSessionResolver.resolveUserSessionIds(user)) {
120+
121+
String targetDestination = destinationParser.getTargetDestination(sessionId);
122+
headers.setDestination(targetDestination);
123+
message = MessageBuilder.fromMessage(message).copyHeaders(headers.toMap()).build();
124+
125+
if (logger.isTraceEnabled()) {
126+
logger.trace("Sending message to resolved target destination " + targetDestination);
127+
}
128+
this.messagingTemplate.send(targetDestination, message);
129+
}
130+
}
131+
132+
protected boolean shouldHandle(Message<?> message) {
133+
134+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
135+
SimpMessageType messageType = headers.getMessageType();
136+
String destination = headers.getDestination();
137+
138+
if (!SimpMessageType.MESSAGE.equals(messageType)) {
139+
return false;
140+
}
141+
142+
if (!StringUtils.hasText(destination)) {
143+
if (logger.isErrorEnabled()) {
144+
logger.error("Ignoring message, no destination: " + headers);
145+
}
146+
return false;
147+
}
148+
else if (!destination.startsWith(this.prefix)) {
149+
return false;
150+
}
151+
152+
return true;
153+
}
154+
155+
156+
private class UserDestinationParser {
157+
158+
private final String user;
159+
160+
private final String targetDestination;
161+
162+
163+
public UserDestinationParser(String destination) {
164+
165+
int userStartIndex = prefix.length();
166+
int userEndIndex = destination.indexOf('/', userStartIndex);
167+
168+
if (userEndIndex > 0) {
169+
this.user = destination.substring(userStartIndex, userEndIndex);
170+
this.targetDestination = destination.substring(userEndIndex);
171+
}
172+
else {
173+
this.user = null;
174+
this.targetDestination = null;
175+
}
176+
}
177+
178+
public String getUser() {
179+
return this.user;
180+
}
181+
182+
public String getTargetDestination(String sessionId) {
183+
return (this.targetDestination != null) ? this.targetDestination + "/" + sessionId : null;
184+
}
185+
}
186+
187+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.handler;
18+
19+
import java.util.Set;
20+
21+
22+
/**
23+
* A strategy for resolving a user name to one or more session id's.
24+
*
25+
* @author Rossen Stoyanchev
26+
* @since 4.0
27+
*/
28+
public interface UserSessionResolver {
29+
30+
/**
31+
* Retrieve the sessionId(s) associated with the given user.
32+
*
33+
* @param user the user name
34+
* @return a Set with zero, one, or more, current session id's.
35+
*/
36+
Set<String> resolveUserSessionIds(String user);
37+
38+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.handler;
18+
19+
20+
/**
21+
* @author Rossen Stoyanchev
22+
* @since 4.0
23+
*/
24+
public interface UserSessionStore {
25+
26+
void storeUserSessionId(String user, String sessionId);
27+
28+
void deleteUserSessionId(String user, String sessionId);
29+
30+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* MessageHandler implementation and supporting classes for message processing.
3+
*/
4+
package org.springframework.messaging.simp.handler;

0 commit comments

Comments
 (0)