Skip to content

Commit 9f62940

Browse files
committed
fix(client): handle send failures in async send to prevent deadlocks
1 parent 74c22b7 commit 9f62940

File tree

1 file changed

+20
-11
lines changed

1 file changed

+20
-11
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2409,6 +2409,7 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin
24092409

24102410
final ActiveMQMessage msg;
24112411
final Message originalMessage = message;
2412+
JMSException sendException = null;
24122413

24132414
synchronized (sendMutex) {
24142415
doStartTransaction();
@@ -2463,23 +2464,31 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin
24632464

24642465
// Perform the wire-level send synchronously while holding sendMutex.
24652466
// This ensures messages are delivered to the broker in send order.
2467+
// Capture any send failure so the callback can be invoked outside sendMutex,
2468+
// preventing a deadlock if the CompletionListener calls producer.send().
24662469
try {
24672470
this.connection.syncSendPacket(msg);
24682471
} catch (JMSException sendEx) {
2469-
// Send failed - invoke onException on executor thread (not sender thread)
2470-
final Future<?> future = asyncSendExecutor.submit(() -> {
2471-
IN_COMPLETION_LISTENER_CALLBACK.set(true);
2472-
try {
2473-
completionListener.onException(originalMessage, sendEx);
2474-
} finally {
2475-
IN_COMPLETION_LISTENER_CALLBACK.set(false);
2476-
}
2477-
});
2478-
awaitAsyncSendFuture(future, originalMessage, completionListener);
2479-
return;
2472+
sendException = sendEx;
24802473
}
24812474
}
24822475

2476+
// Both success and error callbacks are invoked outside sendMutex to avoid deadlock.
2477+
// A CompletionListener is allowed to call producer.send() which would re-acquire sendMutex.
2478+
if (sendException != null) {
2479+
final JMSException finalEx = sendException;
2480+
final Future<?> future = asyncSendExecutor.submit(() -> {
2481+
IN_COMPLETION_LISTENER_CALLBACK.set(true);
2482+
try {
2483+
completionListener.onException(originalMessage, finalEx);
2484+
} finally {
2485+
IN_COMPLETION_LISTENER_CALLBACK.set(false);
2486+
}
2487+
});
2488+
awaitAsyncSendFuture(future, originalMessage, completionListener);
2489+
return;
2490+
}
2491+
24832492
// Send succeeded - invoke onCompletion on executor thread (not sender thread) per spec 7.3.8
24842493
final Future<?> future = asyncSendExecutor.submit(() -> {
24852494
IN_COMPLETION_LISTENER_CALLBACK.set(true);

0 commit comments

Comments
 (0)