Skip to content

Commit ba7998d

Browse files
committed
Add SimpMessageSendingOperations
1 parent 078cfb3 commit ba7998d

File tree

8 files changed

+139
-59
lines changed

8 files changed

+139
-59
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp;
18+
19+
import org.springframework.messaging.MessagingException;
20+
import org.springframework.messaging.core.MessagePostProcessor;
21+
import org.springframework.messaging.core.MessageSendingOperations;
22+
23+
24+
/**
25+
* @author Rossen Stoyanchev
26+
* @since 4.0
27+
*/
28+
public interface SimpMessageSendingOperations extends MessageSendingOperations<String> {
29+
30+
<T> void convertAndSendToUser(String user, String destination, T message) throws MessagingException;
31+
32+
<T> void convertAndSendToUser(String user, String destination, T message, MessagePostProcessor postProcessor)
33+
throws MessagingException;
34+
35+
}

spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.springframework.messaging.Message;
2121
import org.springframework.messaging.MessageChannel;
2222
import org.springframework.messaging.MessageDeliveryException;
23+
import org.springframework.messaging.MessagingException;
2324
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
25+
import org.springframework.messaging.core.MessagePostProcessor;
2426
import org.springframework.messaging.support.MessageBuilder;
2527
import org.springframework.util.Assert;
2628

@@ -32,18 +34,44 @@
3234
* @author Mark Fisher
3335
* @since 4.0
3436
*/
35-
public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String> {
37+
public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String>
38+
implements SimpMessageSendingOperations {
3639

37-
private final MessageChannel outputChannel;
40+
private final MessageChannel messageChannel;
41+
42+
private String userDestinationPrefix = "/user/";
3843

3944
private volatile long sendTimeout = -1;
4045

4146

42-
public SimpMessagingTemplate(MessageChannel outputChannel) {
43-
Assert.notNull(outputChannel, "outputChannel is required");
44-
this.outputChannel = outputChannel;
47+
public SimpMessagingTemplate(MessageChannel messageChannel) {
48+
Assert.notNull(messageChannel, "outputChannel is required");
49+
this.messageChannel = messageChannel;
50+
}
51+
52+
53+
/**
54+
* Configure the prefix to use for destinations targeting a specific user.
55+
* <p>The default value is "/user/".
56+
* @see org.springframework.messaging.simp.handler.UserDestinationMessageHandler
57+
*/
58+
public void setUserDestinationPrefix(String prefix) {
59+
this.userDestinationPrefix = prefix;
60+
}
61+
62+
/**
63+
* @return the userDestinationPrefix
64+
*/
65+
public String getUserDestinationPrefix() {
66+
return this.userDestinationPrefix;
4567
}
4668

69+
/**
70+
* @return the messageChannel
71+
*/
72+
public MessageChannel getMessageChannel() {
73+
return this.messageChannel;
74+
}
4775

4876
/**
4977
* Specify the timeout value to use for send operations.
@@ -54,6 +82,13 @@ public void setSendTimeout(long sendTimeout) {
5482
this.sendTimeout = sendTimeout;
5583
}
5684

85+
/**
86+
* @return the sendTimeout
87+
*/
88+
public long getSendTimeout() {
89+
return this.sendTimeout;
90+
}
91+
5792

5893
@Override
5994
public <P> void send(Message<P> message) {
@@ -64,22 +99,35 @@ public <P> void send(Message<P> message) {
6499
@Override
65100
protected void doSend(String destination, Message<?> message) {
66101
Assert.notNull(destination, "destination is required");
67-
message = addDestinationToMessage(message, destination);
102+
message = updateMessageHeaders(message, destination);
68103
long timeout = this.sendTimeout;
69104
boolean sent = (timeout >= 0)
70-
? this.outputChannel.send(message, timeout)
71-
: this.outputChannel.send(message);
105+
? this.messageChannel.send(message, timeout)
106+
: this.messageChannel.send(message);
72107
if (!sent) {
73108
throw new MessageDeliveryException(message,
74109
"failed to send message to destination '" + destination + "' within timeout: " + timeout);
75110
}
76111
}
77112

78-
protected <P> Message<P> addDestinationToMessage(Message<P> message, String destination) {
113+
protected <P> Message<P> updateMessageHeaders(Message<P> message, String destination) {
79114
Assert.notNull(destination, "destination is required");
80115
return MessageBuilder.fromMessage(message)
81116
.setHeader(SimpMessageHeaderAccessor.MESSAGE_TYPE, SimpMessageType.MESSAGE)
82117
.setHeader(SimpMessageHeaderAccessor.DESTINATIONS, Arrays.asList(destination)).build();
83118
}
84119

120+
@Override
121+
public <T> void convertAndSendToUser(String user, String destination, T message) throws MessagingException {
122+
convertAndSendToUser(user, destination, message, null);
123+
}
124+
125+
@Override
126+
public <T> void convertAndSendToUser(String user, String destination, T message,
127+
MessagePostProcessor postProcessor) throws MessagingException {
128+
129+
Assert.notNull(user, "user is required");
130+
convertAndSend(this.userDestinationPrefix + user + destination, message, postProcessor);
131+
}
132+
85133
}

spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/ReplyToMethodReturnValueHandler.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,15 @@
1717
package org.springframework.messaging.simp.annotation.support;
1818

1919
import java.security.Principal;
20-
import java.util.ArrayList;
21-
import java.util.Arrays;
22-
import java.util.List;
2320

2421
import org.springframework.core.MethodParameter;
2522
import org.springframework.messaging.Message;
2623
import org.springframework.messaging.MessageChannel;
2724
import org.springframework.messaging.core.MessagePostProcessor;
28-
import org.springframework.messaging.core.MessageSendingOperations;
2925
import org.springframework.messaging.handler.annotation.ReplyTo;
3026
import org.springframework.messaging.handler.method.HandlerMethodReturnValueHandler;
3127
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
28+
import org.springframework.messaging.simp.SimpMessageSendingOperations;
3229
import org.springframework.messaging.simp.annotation.ReplyToUser;
3330
import org.springframework.messaging.support.MessageBuilder;
3431
import org.springframework.util.Assert;
@@ -49,10 +46,10 @@
4946
*/
5047
public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
5148

52-
private final MessageSendingOperations<String> messagingTemplate;
49+
private final SimpMessageSendingOperations messagingTemplate;
5350

5451

55-
public ReplyToMethodReturnValueHandler(MessageSendingOperations<String> messagingTemplate) {
52+
public ReplyToMethodReturnValueHandler(SimpMessageSendingOperations messagingTemplate) {
5653
Assert.notNull(messagingTemplate, "messagingTemplate is required");
5754
this.messagingTemplate = messagingTemplate;
5855
}
@@ -72,25 +69,22 @@ public void handleReturnValue(Object returnValue, MethodParameter returnType, Me
7269
return;
7370
}
7471

75-
ReplyTo replyTo = returnType.getMethodAnnotation(ReplyTo.class);
76-
ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class);
72+
MessagePostProcessor postProcessor = new SessionHeaderPostProcessor(inputMessage);
7773

78-
List<String> destinations = new ArrayList<String>();
74+
ReplyTo replyTo = returnType.getMethodAnnotation(ReplyTo.class);
7975
if (replyTo != null) {
80-
destinations.addAll(Arrays.asList(replyTo.value()));
76+
for (String destination : replyTo.value()) {
77+
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
78+
}
8179
}
80+
81+
ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class);
8282
if (replyToUser != null) {
83-
Principal user = getUser(inputMessage);
83+
String user = getUser(inputMessage).getName();
8484
for (String destination : replyToUser.value()) {
85-
destinations.add("/user/" + user.getName() + destination);
85+
this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, postProcessor);
8686
}
8787
}
88-
89-
MessagePostProcessor postProcessor = new SessionIdHeaderPostProcessor(inputMessage);
90-
91-
for (String destination : destinations) {
92-
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
93-
}
9488
}
9589

9690
private Principal getUser(Message<?> inputMessage) {
@@ -103,12 +97,12 @@ private Principal getUser(Message<?> inputMessage) {
10397
}
10498

10599

106-
private final class SessionIdHeaderPostProcessor implements MessagePostProcessor {
100+
private final class SessionHeaderPostProcessor implements MessagePostProcessor {
107101

108102
private final Message<?> inputMessage;
109103

110104

111-
public SessionIdHeaderPostProcessor(Message<?> inputMessage) {
105+
public SessionHeaderPostProcessor(Message<?> inputMessage) {
112106
this.inputMessage = inputMessage;
113107
}
114108

spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,27 +74,26 @@ public void handleReturnValue(Object returnValue, MethodParameter returnType, Me
7474
"No subsriptiondId in input message. Add @ReplyTo or @ReplyToUser to method: "
7575
+ returnType.getMethod());
7676

77-
MessagePostProcessor postProcessor = new InputHeaderCopyingPostProcessor(inputHeaders);
77+
MessagePostProcessor postProcessor = new SubscriptionHeaderPostProcessor(inputHeaders);
7878
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
7979
}
8080

8181

82-
private final class InputHeaderCopyingPostProcessor implements MessagePostProcessor {
82+
private final class SubscriptionHeaderPostProcessor implements MessagePostProcessor {
8383

8484
private final SimpMessageHeaderAccessor inputHeaders;
8585

8686

87-
public InputHeaderCopyingPostProcessor(SimpMessageHeaderAccessor inputHeaders) {
87+
public SubscriptionHeaderPostProcessor(SimpMessageHeaderAccessor inputHeaders) {
8888
this.inputHeaders = inputHeaders;
8989
}
9090

9191
@Override
9292
public Message<?> postProcessMessage(Message<?> message) {
93-
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
9493
return MessageBuilder.fromMessage(message)
9594
.setHeader(SimpMessageHeaderAccessor.SESSION_ID, this.inputHeaders.getSessionId())
9695
.setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID, this.inputHeaders.getSubscriptionId())
97-
.copyHeaders(headers.toMap()).build();
96+
.build();
9897
}
9998
}
10099
}

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@
3434
import org.springframework.core.MethodParameter;
3535
import org.springframework.core.annotation.AnnotationUtils;
3636
import org.springframework.messaging.Message;
37-
import org.springframework.messaging.MessageChannel;
3837
import org.springframework.messaging.MessageHandler;
3938
import org.springframework.messaging.MessagingException;
4039
import org.springframework.messaging.handler.annotation.MessageMapping;
40+
import org.springframework.messaging.handler.annotation.ReplyTo;
4141
import org.springframework.messaging.handler.annotation.support.ExceptionHandlerMethodResolver;
4242
import org.springframework.messaging.handler.annotation.support.MessageBodyMethodArgumentResolver;
4343
import org.springframework.messaging.handler.method.HandlerMethod;
@@ -46,8 +46,8 @@
4646
import org.springframework.messaging.handler.method.HandlerMethodSelector;
4747
import org.springframework.messaging.handler.method.InvocableHandlerMethod;
4848
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
49+
import org.springframework.messaging.simp.SimpMessageSendingOperations;
4950
import org.springframework.messaging.simp.SimpMessageType;
50-
import org.springframework.messaging.simp.SimpMessagingTemplate;
5151
import org.springframework.messaging.simp.annotation.SubscribeEvent;
5252
import org.springframework.messaging.simp.annotation.UnsubscribeEvent;
5353
import org.springframework.messaging.simp.annotation.support.PrincipalMethodArgumentResolver;
@@ -68,9 +68,9 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
6868

6969
private static final Log logger = LogFactory.getLog(AnnotationMethodMessageHandler.class);
7070

71-
private final MessageChannel inboundChannel;
71+
private final SimpMessageSendingOperations inboundMessagingTemplate;
7272

73-
private final MessageChannel outboundChannel;
73+
private final SimpMessageSendingOperations outboundMessagingTemplate;
7474

7575
private MessageConverter<?> messageConverter;
7676

@@ -91,14 +91,24 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
9191

9292

9393
/**
94-
* @param inboundChannel a channel for processing incoming messages from clients
95-
* @param outboundChannel a channel for messages going out to clients
94+
* @param inboundMessagingTemplate a template for sending messages on the channel
95+
* where incoming messages from clients are sent; essentially messages sent
96+
* through this template will be re-processed by the application. One example
97+
* is the use of {@link ReplyTo} annotation on a method to send a broadcast
98+
* message.
99+
* @param outboundMessagingTemplate a template for sending messages on the client used
100+
* to send messages back out to connected clients; such messages must have all
101+
* necessary information to reach the client such as session and subscription
102+
* id's. One example is returning a value from an {@link SubscribeEvent}
103+
* method.
96104
*/
97-
public AnnotationMethodMessageHandler(MessageChannel inboundChannel, MessageChannel outboundChannel) {
98-
Assert.notNull(inboundChannel, "inboundChannel is required");
99-
Assert.notNull(outboundChannel, "outboundChannel is required");
100-
this.inboundChannel = inboundChannel;
101-
this.outboundChannel = outboundChannel;
105+
public AnnotationMethodMessageHandler(SimpMessageSendingOperations inboundMessagingTemplate,
106+
SimpMessageSendingOperations outboundMessagingTemplate) {
107+
108+
Assert.notNull(inboundMessagingTemplate, "inboundMessagingTemplate is required");
109+
Assert.notNull(outboundMessagingTemplate, "outboundMessagingTemplate is required");
110+
this.inboundMessagingTemplate = inboundMessagingTemplate;
111+
this.outboundMessagingTemplate = outboundMessagingTemplate;
102112
}
103113

104114
/**
@@ -121,14 +131,8 @@ public void afterPropertiesSet() {
121131
this.argumentResolvers.addResolver(new PrincipalMethodArgumentResolver());
122132
this.argumentResolvers.addResolver(new MessageBodyMethodArgumentResolver(this.messageConverter));
123133

124-
SimpMessagingTemplate inboundMessagingTemplate = new SimpMessagingTemplate(this.inboundChannel);
125-
inboundMessagingTemplate.setConverter(this.messageConverter);
126-
127-
SimpMessagingTemplate outboundMessagingTemplate = new SimpMessagingTemplate(this.outboundChannel);
128-
outboundMessagingTemplate.setConverter(this.messageConverter);
129-
130-
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(inboundMessagingTemplate));
131-
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(outboundMessagingTemplate));
134+
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.inboundMessagingTemplate));
135+
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.outboundMessagingTemplate));
132136
}
133137

134138
protected void initHandlerMethods() {

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionStore.java renamed to spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
* @author Rossen Stoyanchev
2222
* @since 4.0
2323
*/
24-
public interface UserSessionStore {
24+
public interface MutableUserSessionResolver extends UserSessionResolver {
2525

2626
void storeUserSessionId(String user, String sessionId);
2727

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* @author Rossen Stoyanchev
2828
* @since 4.0
2929
*/
30-
public class SimpleUserSessionResolver implements UserSessionResolver, UserSessionStore {
30+
public class SimpleUserSessionResolver implements MutableUserSessionResolver {
3131

3232
// userId -> sessionId's
3333
private final Map<String, Set<String>> userSessionIds = new ConcurrentHashMap<String, Set<String>>();

0 commit comments

Comments
 (0)