Skip to content

Commit 792e9fa

Browse files
committed
fix(client): update session checks to use an atomic reference to the thread instead of a global thread local
We need the check to be session scoped and to JVM scoped per spec.
1 parent 9f62940 commit 792e9fa

File tree

4 files changed

+32
-28
lines changed

4 files changed

+32
-28
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -609,8 +609,10 @@ public void start() throws JMSException {
609609
*/
610610
@Override
611611
public void stop() throws JMSException {
612-
ActiveMQSession.checkNotInCompletionListenerCallback("stop");
613-
ActiveMQSession.checkNotInMessageListenerCallback("stop");
612+
for (final ActiveMQSession session : sessions) {
613+
session.checkNotInCompletionListenerCallback("stop");
614+
session.checkNotInMessageListenerCallback("stop");
615+
}
614616
doStop(true);
615617
}
616618

@@ -679,8 +681,10 @@ void doStop(boolean checkClosed) throws JMSException {
679681
*/
680682
@Override
681683
public void close() throws JMSException {
682-
ActiveMQSession.checkNotInCompletionListenerCallback("close");
683-
ActiveMQSession.checkNotInMessageListenerCallback("close");
684+
for (final ActiveMQSession session : sessions) {
685+
session.checkNotInCompletionListenerCallback("close");
686+
session.checkNotInMessageListenerCallback("close");
687+
}
684688
try {
685689
// If we were running, lets stop first.
686690
if (!closed.get() && !transportFailed.get()) {

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,11 +1454,11 @@ public void dispatch(MessageDispatch md) {
14541454
try {
14551455
boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
14561456
if (!expired) {
1457-
ActiveMQSession.IN_MESSAGE_LISTENER_CALLBACK.set(true);
1457+
session.messageListenerThread.set(Thread.currentThread());
14581458
try {
14591459
listener.onMessage(message);
14601460
} finally {
1461-
ActiveMQSession.IN_MESSAGE_LISTENER_CALLBACK.set(false);
1461+
session.messageListenerThread.set(null);
14621462
}
14631463
}
14641464
afterMessageIsConsumed(md, expired);

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public Destination getDestination() throws JMSException {
168168
*/
169169
@Override
170170
public void close() throws JMSException {
171-
ActiveMQSession.checkNotInCompletionListenerCallback("close");
171+
session.checkNotInCompletionListenerCallback("close");
172172
if (!closed) {
173173
dispose();
174174
this.session.asyncSendPacket(info.createRemoveCommand());

activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.concurrent.atomic.AtomicBoolean;
3636
import java.util.concurrent.atomic.AtomicInteger;
37+
import java.util.concurrent.atomic.AtomicReference;
3738

3839
import jakarta.jms.BytesMessage;
3940
import jakarta.jms.CompletionListener;
4041
import jakarta.jms.Destination;
4142
import jakarta.jms.IllegalStateException;
42-
import jakarta.jms.IllegalStateRuntimeException;
4343
import jakarta.jms.InvalidDestinationException;
4444
import jakarta.jms.InvalidSelectorException;
4545
import jakarta.jms.JMSException;
@@ -65,7 +65,6 @@
6565
import jakarta.jms.TopicSubscriber;
6666
import jakarta.jms.TransactionRolledBackException;
6767

68-
import org.apache.activemq.selector.SelectorParser;
6968
import org.apache.activemq.blob.BlobDownloader;
7069
import org.apache.activemq.blob.BlobTransferPolicy;
7170
import org.apache.activemq.blob.BlobUploader;
@@ -249,13 +248,13 @@ public static interface DeliveryListener {
249248
private final ExecutorService asyncSendExecutor = Executors.newSingleThreadExecutor(
250249
r -> new Thread(r, "ActiveMQ async-send"));
251250

252-
// Set to true on the executor thread while a CompletionListener callback is executing.
253-
// Used to detect illegal session operations (close/commit/rollback) from within a callback.
254-
static final ThreadLocal<Boolean> IN_COMPLETION_LISTENER_CALLBACK = ThreadLocal.withInitial(() -> false);
251+
// Tracks the thread currently executing a CompletionListener callback for this session.
252+
// Session-scoped (instance field) so checks only apply to this session's own callbacks,
253+
// not to callbacks from other sessions running on the same thread.
254+
final AtomicReference<Thread> completionListenerThread = new AtomicReference<>();
255255

256-
// Set to true on the dispatch thread while a MessageListener.onMessage() callback is executing.
257-
// Used to detect illegal connection/session operations from within a MessageListener callback.
258-
static final ThreadLocal<Boolean> IN_MESSAGE_LISTENER_CALLBACK = ThreadLocal.withInitial(() -> false);
256+
// Tracks the thread currently executing a MessageListener.onMessage() callback for this session.
257+
final AtomicReference<Thread> messageListenerThread = new AtomicReference<>();
259258

260259
/**
261260
* Construct the Session
@@ -1088,11 +1087,11 @@ public void run() {
10881087
}
10891088

10901089
LOG.trace("{} onMessage({})", this, message.getMessageId());
1091-
IN_MESSAGE_LISTENER_CALLBACK.set(true);
1090+
messageListenerThread.set(Thread.currentThread());
10921091
try {
10931092
messageListener.onMessage(message);
10941093
} finally {
1095-
IN_MESSAGE_LISTENER_CALLBACK.set(false);
1094+
messageListenerThread.set(null);
10961095
}
10971096

10981097
} catch (Throwable e) {
@@ -2478,11 +2477,11 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin
24782477
if (sendException != null) {
24792478
final JMSException finalEx = sendException;
24802479
final Future<?> future = asyncSendExecutor.submit(() -> {
2481-
IN_COMPLETION_LISTENER_CALLBACK.set(true);
2480+
completionListenerThread.set(Thread.currentThread());
24822481
try {
24832482
completionListener.onException(originalMessage, finalEx);
24842483
} finally {
2485-
IN_COMPLETION_LISTENER_CALLBACK.set(false);
2484+
completionListenerThread.set(null);
24862485
}
24872486
});
24882487
awaitAsyncSendFuture(future, originalMessage, completionListener);
@@ -2491,14 +2490,14 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin
24912490

24922491
// Send succeeded - invoke onCompletion on executor thread (not sender thread) per spec 7.3.8
24932492
final Future<?> future = asyncSendExecutor.submit(() -> {
2494-
IN_COMPLETION_LISTENER_CALLBACK.set(true);
2493+
completionListenerThread.set(Thread.currentThread());
24952494
try {
24962495
completionListener.onCompletion(originalMessage);
24972496
} catch (Exception e) {
24982497
// Per spec 7.3.2, exceptions thrown by the callback are swallowed
24992498
LOG.warn("CompletionListener.onCompletion threw an exception", e);
25002499
} finally {
2501-
IN_COMPLETION_LISTENER_CALLBACK.set(false);
2500+
completionListenerThread.set(null);
25022501
}
25032502
});
25042503
awaitAsyncSendFuture(future, originalMessage, completionListener);
@@ -2519,22 +2518,23 @@ private void awaitAsyncSendFuture(final Future<?> future, final Message original
25192518

25202519
/**
25212520
* Throws {@link jakarta.jms.IllegalStateException} if the current thread is executing a
2522-
* CompletionListener callback, per Jakarta Messaging 3.1 spec section 7.3.5.
2523-
* The classic JMS API uses checked IllegalStateException (not the runtime variant).
2521+
* CompletionListener callback for this session, per Jakarta Messaging 3.1 spec section 7.3.5.
2522+
* The check is session-scoped: callbacks from other sessions on the same thread are unaffected.
25242523
*/
2525-
static void checkNotInCompletionListenerCallback(final String operation) throws jakarta.jms.IllegalStateException {
2526-
if (Boolean.TRUE.equals(IN_COMPLETION_LISTENER_CALLBACK.get())) {
2524+
void checkNotInCompletionListenerCallback(final String operation) throws jakarta.jms.IllegalStateException {
2525+
if (Thread.currentThread() == completionListenerThread.get()) {
25272526
throw new jakarta.jms.IllegalStateException(
25282527
"Cannot call " + operation + "() from within a CompletionListener callback");
25292528
}
25302529
}
25312530

25322531
/**
25332532
* Throws {@link jakarta.jms.IllegalStateException} if the current thread is executing a
2534-
* MessageListener.onMessage() callback, per Jakarta Messaging spec section 4.4.
2533+
* MessageListener.onMessage() callback for this session, per Jakarta Messaging spec section 4.4.
2534+
* The check is session-scoped: callbacks from other sessions on the same thread are unaffected.
25352535
*/
2536-
static void checkNotInMessageListenerCallback(final String operation) throws jakarta.jms.IllegalStateException {
2537-
if (Boolean.TRUE.equals(IN_MESSAGE_LISTENER_CALLBACK.get())) {
2536+
void checkNotInMessageListenerCallback(final String operation) throws jakarta.jms.IllegalStateException {
2537+
if (Thread.currentThread() == messageListenerThread.get()) {
25382538
throw new jakarta.jms.IllegalStateException(
25392539
"Cannot call " + operation + "() from within a MessageListener callback");
25402540
}

0 commit comments

Comments
 (0)