Skip to content

Commit e0d393e

Browse files
committed
Fix issue in STOMP broker relay
1 parent 5d20b75 commit e0d393e

File tree

3 files changed

+13
-32
lines changed

3 files changed

+13
-32
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public String getUser() {
180180
}
181181

182182
public String getTargetDestination(String sessionId) {
183-
return (this.targetDestination != null) ? this.targetDestination + "/" + sessionId : null;
183+
return (this.targetDestination != null) ? this.targetDestination + sessionId : null;
184184
}
185185
}
186186

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.concurrent.BlockingQueue;
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.LinkedBlockingQueue;
27-
import java.util.concurrent.TimeUnit;
2827

2928
import org.apache.commons.logging.Log;
3029
import org.apache.commons.logging.LogFactory;
@@ -232,7 +231,7 @@ public void stop() {
232231
}
233232
this.running = false;
234233
try {
235-
this.tcpClient.close().await(5000, TimeUnit.MILLISECONDS);
234+
this.tcpClient.close().await();
236235
}
237236
catch (Throwable t) {
238237
logger.error("Failed to close reactor TCP client", t);
@@ -321,12 +320,13 @@ protected void handleInternal(Message<?> message, SimpMessageType messageType, S
321320
}
322321
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
323322
RelaySession session = this.relaySessions.remove(sessionId);
324-
if (session != null) {
323+
if (session == null) {
325324
if (logger.isTraceEnabled()) {
326325
logger.trace("Session already removed, sessionId=" + sessionId);
327326
}
328-
session.forward(message);
327+
return;
329328
}
329+
session.forward(message);
330330
}
331331
else {
332332
RelaySession session = this.relaySessions.get(sessionId);
@@ -404,16 +404,9 @@ private void readStompFrame(String stompFrame) {
404404
}
405405
return;
406406
}
407-
if (StompCommand.ERROR == headers.getStompCommand()) {
408-
if (logger.isDebugEnabled()) {
409-
logger.warn("STOMP ERROR: " + headers.getMessage() + ". Removing session id=" + this.sessionId);
410-
}
411-
relaySessions.remove(this.sessionId);
412-
}
413407

414408
headers.setSessionId(this.sessionId);
415409
message = MessageBuilder.fromMessage(message).copyHeaders(headers.toMap()).build();
416-
417410
sendMessageToClient(message);
418411
}
419412

@@ -455,25 +448,13 @@ public void forward(Message<?> message) {
455448
}
456449

457450
private boolean forwardInternal(Message<?> message, TcpConnection<String, String> connection) {
458-
459-
try {
460-
if (logger.isTraceEnabled()) {
461-
logger.trace("Forwarding message to STOMP broker, message id=" + message.getHeaders().getId());
462-
}
463-
byte[] bytes = stompMessageConverter.fromMessage(message);
464-
connection.send(new String(bytes, Charset.forName("UTF-8")));
465-
}
466-
catch (Throwable ex) {
467-
logger.error("Forward failed message id=" + message.getHeaders().getId(), ex);
468-
try {
469-
connection.close();
470-
}
471-
catch (Throwable t) {
472-
// ignore
473-
}
474-
sendError(this.sessionId, "Failed to forward message " + message + ": " + ex.getMessage());
475-
return false;
451+
if (logger.isTraceEnabled()) {
452+
logger.trace("Forwarding message to STOMP broker, message id=" + message.getHeaders().getId());
476453
}
454+
byte[] bytes = stompMessageConverter.fromMessage(message);
455+
connection.send(new String(bytes, Charset.forName("UTF-8")));
456+
457+
// TODO: detect if send fails and send ERROR downstream (except on DISCONNECT)
477458
return true;
478459
}
479460

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,11 @@ public void handleMessage(Message<?> message) {
244244
WebSocketSession session = this.sessions.get(sessionId);
245245
if (session == null) {
246246
// TODO: failed message delivery mechanism
247-
logger.error("Ignoring message, session not found: " + sessionId);
247+
logger.error("Ignoring message, sessionId not found: " + message);
248248
return;
249249
}
250250

251-
if (headers.getSubscriptionId() == null) {
251+
if (StompCommand.MESSAGE.equals(headers.getStompCommand()) && (headers.getSubscriptionId() == null)) {
252252
// TODO: failed message delivery mechanism
253253
logger.error("Ignoring message, no subscriptionId header: " + message);
254254
return;

0 commit comments

Comments
 (0)