Skip to content

Commit 7500b14

Browse files
committed
Option to preserve publish order
Issue: SPR-13989
1 parent 430250c commit 7500b14

File tree

17 files changed

+496
-95
lines changed

17 files changed

+496
-95
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public abstract class AbstractBrokerMessageHandler
5858

5959
private final Collection<String> destinationPrefixes;
6060

61+
private boolean preservePublishOrder = false;
62+
6163
@Nullable
6264
private ApplicationEventPublisher eventPublisher;
6365

@@ -132,6 +134,31 @@ public void setApplicationEventPublisher(@Nullable ApplicationEventPublisher pub
132134
this.eventPublisher = publisher;
133135
}
134136

137+
/**
138+
* Whether the client must receive messages in the order of publication.
139+
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
140+
* not be processed in the same order because the channel is backed by a
141+
* ThreadPoolExecutor that in turn does not guarantee processing in order.
142+
* <p>When this flag is set to {@code true} messages within the same session
143+
* will be sent to the {@code "clientOutboundChannel"} one at a time in
144+
* order to preserve the order of publication. Enable this only if needed
145+
* since there is some performance overhead to keep messages in order.
146+
* @param preservePublishOrder whether to publish in order
147+
* @since 5.1
148+
*/
149+
public void setPreservePublishOrder(boolean preservePublishOrder) {
150+
OrderedMessageSender.configureOutboundChannel(this.clientOutboundChannel, preservePublishOrder);
151+
this.preservePublishOrder = preservePublishOrder;
152+
}
153+
154+
/**
155+
* Whether to ensure messages are received in the order of publication.
156+
* @since 5.1
157+
*/
158+
public boolean isPreservePublishOrder() {
159+
return this.preservePublishOrder;
160+
}
161+
135162
@Nullable
136163
public ApplicationEventPublisher getApplicationEventPublisher() {
137164
return this.eventPublisher;
@@ -269,6 +296,16 @@ protected void publishBrokerUnavailableEvent() {
269296
}
270297
}
271298

299+
/**
300+
* Get the MessageChannel to use for sending messages to clients, possibly
301+
* a per-session wrapper when {@code preservePublishOrder=true}.
302+
* @since 5.1
303+
*/
304+
protected MessageChannel getClientOutboundChannelForSession(String sessionId) {
305+
return this.preservePublishOrder ?
306+
new OrderedMessageSender(getClientOutboundChannel(), logger) : getClientOutboundChannel();
307+
}
308+
272309

273310
/**
274311
* Detect unsent DISCONNECT messages and process them anyway.
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.simp.broker;
17+
18+
import java.util.Queue;
19+
import java.util.concurrent.ConcurrentLinkedQueue;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
22+
import org.apache.commons.logging.Log;
23+
24+
import org.springframework.messaging.Message;
25+
import org.springframework.messaging.MessageChannel;
26+
import org.springframework.messaging.MessageHandler;
27+
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
28+
import org.springframework.messaging.support.ExecutorChannelInterceptor;
29+
import org.springframework.messaging.support.ExecutorSubscribableChannel;
30+
import org.springframework.messaging.support.MessageHeaderAccessor;
31+
import org.springframework.util.Assert;
32+
33+
/**
34+
* Submit messages to an ExecutorSubscribableChannel, one at a time. The channel
35+
* must have been configured with {@link #configureOutboundChannel}.
36+
*
37+
* @author Rossen Stoyanchev
38+
* @since 5.1
39+
*/
40+
class OrderedMessageSender implements MessageChannel {
41+
42+
static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask";
43+
44+
45+
private final MessageChannel channel;
46+
47+
private final Log logger;
48+
49+
private final Queue<Message<?>> messages = new ConcurrentLinkedQueue<>();
50+
51+
private final AtomicBoolean sendInProgress = new AtomicBoolean(false);
52+
53+
54+
public OrderedMessageSender(MessageChannel channel, Log logger) {
55+
this.channel = channel;
56+
this.logger = logger;
57+
}
58+
59+
60+
public boolean send(Message<?> message) {
61+
return send(message, -1);
62+
}
63+
64+
@Override
65+
public boolean send(Message<?> message, long timeout) {
66+
this.messages.add(message);
67+
trySend();
68+
return true;
69+
}
70+
71+
private void trySend() {
72+
73+
// Take sendInProgress flag only if queue is not empty
74+
if (this.messages.isEmpty()) {
75+
return;
76+
}
77+
78+
if (this.sendInProgress.compareAndSet(false, true)) {
79+
sendNextMessage();
80+
}
81+
}
82+
83+
private void sendNextMessage() {
84+
for (;;) {
85+
Message<?> message = this.messages.poll();
86+
if (message != null) {
87+
try {
88+
addCompletionCallback(message);
89+
if (this.channel.send(message)) {
90+
return;
91+
}
92+
}
93+
catch (Throwable ex) {
94+
if (logger.isErrorEnabled()) {
95+
logger.error("Failed to send " + message, ex);
96+
}
97+
}
98+
}
99+
else {
100+
// We ran out of messages..
101+
this.sendInProgress.set(false);
102+
trySend();
103+
break;
104+
}
105+
}
106+
}
107+
108+
private void addCompletionCallback(Message<?> msg) {
109+
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(msg, SimpMessageHeaderAccessor.class);
110+
Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor");
111+
accessor.setHeader(COMPLETION_TASK_HEADER, (Runnable) this::sendNextMessage);
112+
}
113+
114+
115+
/**
116+
* Install or remove an {@link ExecutorChannelInterceptor} that invokes a
117+
* completion task once the message is handled.
118+
* @param channel the channel to configure
119+
* @param preservePublishOrder whether preserve order is on or off based on
120+
* which an interceptor is either added or removed.
121+
*/
122+
static void configureOutboundChannel(MessageChannel channel, boolean preservePublishOrder) {
123+
if (preservePublishOrder) {
124+
Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel,
125+
"An ExecutorSubscribableChannel is required for `preservePublishOrder`");
126+
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel;
127+
if (execChannel.getInterceptors().stream().noneMatch(i -> i instanceof CallbackInterceptor)) {
128+
execChannel.addInterceptor(0, new CallbackInterceptor());
129+
}
130+
}
131+
else if (channel instanceof ExecutorSubscribableChannel) {
132+
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel;
133+
execChannel.getInterceptors().stream().filter(i -> i instanceof CallbackInterceptor)
134+
.findFirst()
135+
.map(execChannel::removeInterceptor);
136+
137+
}
138+
}
139+
140+
141+
private static class CallbackInterceptor implements ExecutorChannelInterceptor {
142+
143+
@Override
144+
public void afterMessageHandled(Message<?> msg, MessageChannel ch, MessageHandler handler, Exception ex) {
145+
Runnable task = (Runnable) msg.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER);
146+
if (task != null) {
147+
task.run();
148+
}
149+
}
150+
}
151+
152+
}

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -306,18 +306,19 @@ protected void handleMessageInternal(Message<?> message) {
306306
else if (SimpMessageType.CONNECT.equals(messageType)) {
307307
logMessage(message);
308308
if (sessionId != null) {
309-
long[] clientHeartbeat = SimpMessageHeaderAccessor.getHeartbeat(headers);
310-
long[] serverHeartbeat = getHeartbeatValue();
309+
long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
310+
long[] heartbeatOut = getHeartbeatValue();
311311
Principal user = SimpMessageHeaderAccessor.getUser(headers);
312-
this.sessions.put(sessionId, new SessionInfo(sessionId, user, clientHeartbeat, serverHeartbeat));
312+
MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);
313+
this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));
313314
SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
314315
initHeaders(connectAck);
315316
connectAck.setSessionId(sessionId);
316317
if (user != null) {
317318
connectAck.setUser(user);
318319
}
319320
connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
320-
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, serverHeartbeat);
321+
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);
321322
Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
322323
getClientOutboundChannel().send(messageOut);
323324
}
@@ -391,19 +392,20 @@ protected void sendMessageToSubscribers(@Nullable String destination, Message<?>
391392
headerAccessor.setSessionId(sessionId);
392393
headerAccessor.setSubscriptionId(subscriptionId);
393394
headerAccessor.copyHeadersIfAbsent(message.getHeaders());
395+
headerAccessor.setLeaveMutable(true);
394396
Object payload = message.getPayload();
395397
Message<?> reply = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
396-
try {
397-
getClientOutboundChannel().send(reply);
398-
}
399-
catch (Throwable ex) {
400-
if (logger.isErrorEnabled()) {
401-
logger.error("Failed to send " + message, ex);
398+
SessionInfo info = this.sessions.get(sessionId);
399+
if (info != null) {
400+
try {
401+
info.getClientOutboundChannel().send(reply);
402402
}
403-
}
404-
finally {
405-
SessionInfo info = this.sessions.get(sessionId);
406-
if (info != null) {
403+
catch (Throwable ex) {
404+
if (logger.isErrorEnabled()) {
405+
logger.error("Failed to send " + message, ex);
406+
}
407+
}
408+
finally {
407409
info.setLastWriteTime(now);
408410
}
409411
}
@@ -427,6 +429,8 @@ private static class SessionInfo {
427429
@Nullable
428430
private final Principal user;
429431

432+
private final MessageChannel clientOutboundChannel;
433+
430434
private final long readInterval;
431435

432436
private final long writeInterval;
@@ -435,11 +439,13 @@ private static class SessionInfo {
435439

436440
private volatile long lastWriteTime;
437441

438-
public SessionInfo(String sessionId, @Nullable Principal user,
442+
443+
public SessionInfo(String sessionId, @Nullable Principal user, MessageChannel outboundChannel,
439444
@Nullable long[] clientHeartbeat, @Nullable long[] serverHeartbeat) {
440445

441446
this.sessionId = sessionId;
442447
this.user = user;
448+
this.clientOutboundChannel = outboundChannel;
443449
if (clientHeartbeat != null && serverHeartbeat != null) {
444450
this.readInterval = (clientHeartbeat[0] > 0 && serverHeartbeat[1] > 0 ?
445451
Math.max(clientHeartbeat[0], serverHeartbeat[1]) * HEARTBEAT_MULTIPLIER : 0);
@@ -462,6 +468,10 @@ public Principal getUser() {
462468
return this.user;
463469
}
464470

471+
public MessageChannel getClientOutboundChannel() {
472+
return this.clientOutboundChannel;
473+
}
474+
465475
public long getReadInterval() {
466476
return this.readInterval;
467477
}
@@ -505,8 +515,9 @@ public void run() {
505515
accessor.setUser(user);
506516
}
507517
initHeaders(accessor);
518+
accessor.setLeaveMutable(true);
508519
MessageHeaders headers = accessor.getMessageHeaders();
509-
getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
520+
info.getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
510521
}
511522
}
512523
}

spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class MessageBrokerRegistry {
5454
@Nullable
5555
private String userDestinationPrefix;
5656

57+
private boolean preservePublishOrder;
58+
5759
@Nullable
5860
private PathMatcher pathMatcher;
5961

@@ -160,6 +162,30 @@ protected String getUserDestinationPrefix() {
160162
return this.userDestinationPrefix;
161163
}
162164

165+
/**
166+
* Whether the client must receive messages in the order of publication.
167+
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
168+
* not be processed in the same order because the channel is backed by a
169+
* ThreadPoolExecutor that in turn does not guarantee processing in order.
170+
* <p>When this flag is set to {@code true} messages within the same session
171+
* will be sent to the {@code "clientOutboundChannel"} one at a time in
172+
* order to preserve the order of publication. Enable this only if needed
173+
* since there is some performance overhead to keep messages in order.
174+
* @param preservePublishOrder whether to publish in order
175+
* @since 5.1
176+
*/
177+
public void setPreservePublishOrder(boolean preservePublishOrder) {
178+
this.preservePublishOrder = preservePublishOrder;
179+
}
180+
181+
/**
182+
* Whether to ensure messages are received in the order of publication.
183+
* @since 5.1
184+
*/
185+
protected boolean isPreservePublishOrder() {
186+
return this.preservePublishOrder;
187+
}
188+
163189
/**
164190
* Configure the PathMatcher to use to match the destinations of incoming
165191
* messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
@@ -209,6 +235,7 @@ protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerC
209235
SimpleBrokerMessageHandler handler = this.simpleBrokerRegistration.getMessageHandler(brokerChannel);
210236
handler.setPathMatcher(this.pathMatcher);
211237
handler.setCacheLimit(this.cacheLimit);
238+
handler.setPreservePublishOrder(this.preservePublishOrder);
212239
return handler;
213240
}
214241
return null;
@@ -217,7 +244,9 @@ protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerC
217244
@Nullable
218245
protected StompBrokerRelayMessageHandler getStompBrokerRelay(SubscribableChannel brokerChannel) {
219246
if (this.brokerRelayRegistration != null) {
220-
return this.brokerRelayRegistration.getMessageHandler(brokerChannel);
247+
StompBrokerRelayMessageHandler relay = this.brokerRelayRegistration.getMessageHandler(brokerChannel);
248+
relay.setPreservePublishOrder(this.preservePublishOrder);
249+
return relay;
221250
}
222251
return null;
223252
}

0 commit comments

Comments
 (0)