Skip to content

Commit c6fe0f7

Browse files
jbertramtabish121
authored andcommitted
ARTEMIS-5874 mitigate dead-lock in STOMP
1 parent 2ce57ed commit c6fe0f7

File tree

1 file changed

+11
-24
lines changed
  • artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp

1 file changed

+11
-24
lines changed

artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.Future;
2929
import java.util.concurrent.FutureTask;
3030
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3132

3233
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
3334
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -81,12 +82,11 @@ public final class StompConnection extends AbstractRemotingConnection {
8182
//this means login is valid. (stomp connection ok)
8283
private boolean valid;
8384

84-
private boolean destroyed = false;
85+
private static final AtomicIntegerFieldUpdater<StompConnection> DESTROYED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(StompConnection.class, "destroyed");
86+
private volatile int destroyed;
8587

8688
private final Acceptor acceptorUsed;
8789

88-
private final Object failLock = new Object();
89-
9090
private final boolean enableMessageID;
9191

9292
private final int minLargeMessageSize;
@@ -224,15 +224,9 @@ public void checkRoutingSemantics(String destination, RoutingType routingType) t
224224

225225
@Override
226226
public void destroy() {
227-
synchronized (failLock) {
228-
if (destroyed) {
229-
return;
230-
}
231-
232-
destroyed = true;
227+
if (DESTROYED_UPDATER.compareAndSet(this, 0, 1)) {
228+
internalClose();
233229
}
234-
235-
internalClose();
236230
}
237231

238232
public Acceptor getAcceptorUsed() {
@@ -255,24 +249,17 @@ private void internalClose() {
255249

256250
@Override
257251
public void fail(final ActiveMQException me) {
258-
synchronized (failLock) {
259-
if (destroyed) {
260-
return;
261-
}
262-
252+
if (DESTROYED_UPDATER.compareAndSet(this, 0, 1)) {
263253
StompFrame frame = frameHandler.createStompFrame(Stomp.Responses.ERROR);
264254
frame.addHeader(Stomp.Headers.Error.MESSAGE, me.getMessage());
265255
sendFrame(frame, null);
256+
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
266257

267-
destroyed = true;
268-
}
269-
270-
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
258+
// Then call the listeners
259+
callFailureListeners(me);
271260

272-
// Then call the listeners
273-
callFailureListeners(me);
274-
275-
internalClose();
261+
internalClose();
262+
}
276263
}
277264

278265
@Override

0 commit comments

Comments
 (0)