Skip to content

Commit d58ca06

Browse files
authored
fix(#1731): adapter deadlock (#1735)
* fix(#1731): resolve deadlock in rollbackPendingCursorAdditions by invoking onCompletion outside synchronized block * test(#1731): attempt to simplify the test code by using only one inner class to implement both.
1 parent 1f2114e commit d58ca06

File tree

2 files changed

+275
-4
lines changed

2 files changed

+275
-4
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -790,18 +790,25 @@ public void onAdd(MessageContext messageContext) {
790790
}
791791

792792
public void rollbackPendingCursorAdditions(MessageId messageId) {
793+
MessageContext toRollback = null;
793794
synchronized (indexOrderedCursorUpdates) {
794795
for (int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
795-
MessageContext mc = indexOrderedCursorUpdates.get(i);
796+
final MessageContext mc = indexOrderedCursorUpdates.get(i);
796797
if (mc.message.getMessageId().equals(messageId)) {
797798
indexOrderedCursorUpdates.remove(mc);
798-
if (mc.onCompletion != null) {
799-
mc.onCompletion.run();
800-
}
799+
toRollback = mc;
801800
break;
802801
}
803802
}
804803
}
804+
// Invoke onCompletion outside synchronized(indexOrderedCursorUpdates) to avoid a
805+
// lock-ordering deadlock with JDBCMessageStore.addMessage, which holds
806+
// pendingAdditions while calling indexListener.onAdd() (which acquires
807+
// indexOrderedCursorUpdates). The onCompletion callback acquires pendingAdditions,
808+
// so calling it inside the lock produces a deadlock cycle.
809+
if (toRollback != null && toRollback.onCompletion != null) {
810+
toRollback.onCompletion.run();
811+
}
805812
}
806813

807814
private void doPendingCursorAdditions() throws Exception {
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.store.jdbc.h2;
18+
19+
import java.io.IOException;
20+
import java.sql.SQLException;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import jakarta.jms.Connection;
28+
import jakarta.jms.MessageProducer;
29+
import jakarta.jms.Session;
30+
31+
import org.apache.activemq.ActiveMQConnectionFactory;
32+
import org.apache.activemq.broker.BrokerService;
33+
import org.apache.activemq.command.ActiveMQDestination;
34+
import org.apache.activemq.command.ActiveMQQueue;
35+
import org.apache.activemq.command.MessageId;
36+
import org.apache.activemq.command.XATransactionId;
37+
import org.apache.activemq.store.IndexListener;
38+
import org.apache.activemq.store.MessageStore;
39+
import org.apache.activemq.store.ProxyMessageStore;
40+
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
41+
import org.apache.activemq.store.jdbc.TransactionContext;
42+
import org.apache.activemq.store.jdbc.adapter.H2JDBCAdapter;
43+
import org.junit.After;
44+
import org.junit.Before;
45+
import org.junit.Rule;
46+
import org.junit.Test;
47+
import org.junit.rules.Timeout;
48+
49+
import static org.junit.Assert.assertTrue;
50+
51+
/**
52+
* Reproduces the JDBC store deadlock described in GitHub issue #1731.
53+
*
54+
* <p>Lock-ordering inversion between:
55+
* <ul>
56+
* <li>{@code pendingAdditions} (JDBCMessageStore) — held by {@code addMessage} while calling
57+
* {@code indexListener.onAdd()}, and needed later by the cursor-completion callback</li>
58+
* <li>{@code indexOrderedCursorUpdates} (Queue) — held by {@code rollbackPendingCursorAdditions}
59+
* while calling that same cursor-completion callback</li>
60+
* </ul>
61+
*
62+
* <p>Fix: move {@code mc.onCompletion.run()} outside {@code synchronized(indexOrderedCursorUpdates)}
63+
* in {@code Queue.rollbackPendingCursorAdditions}.
64+
*
65+
* @see <a href="https://github.com/apache/activemq/issues/1731">GitHub #1731</a>
66+
*/
67+
public class H2JDBCDeadlockOnSendExceptionTest {
68+
69+
private static final String QUEUE_NAME = "test.deadlock.queue";
70+
71+
private BrokerService broker;
72+
private ActiveMQConnectionFactory connectionFactory;
73+
private final DeadlockCoordinator coordinator = new DeadlockCoordinator();
74+
75+
/** Fail the test if it hangs beyond 30 seconds — that indicates a deadlock. */
76+
@Rule
77+
public final Timeout testTimeout = Timeout.seconds(30);
78+
79+
@Before
80+
public void setUp() throws Exception {
81+
broker = new BrokerService();
82+
broker.setUseJmx(false);
83+
broker.setPersistenceAdapter(createJDBCAdapter());
84+
broker.addConnector("tcp://0.0.0.0:0");
85+
broker.start();
86+
broker.waitUntilStarted();
87+
connectionFactory = new ActiveMQConnectionFactory(
88+
broker.getTransportConnectors().get(0).getPublishableConnectString());
89+
}
90+
91+
@After
92+
public void tearDown() throws Exception {
93+
if (broker != null && broker.isStarted()) {
94+
broker.stop();
95+
}
96+
}
97+
98+
private JDBCPersistenceAdapter createJDBCAdapter() throws IOException {
99+
final JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter() {
100+
@Override
101+
public MessageStore createQueueMessageStore(final ActiveMQQueue destination) throws IOException {
102+
final MessageStore base = super.createQueueMessageStore(destination);
103+
// Intercept registerIndexListener to wrap Queue with the coordinator.
104+
return new ProxyMessageStore(base) {
105+
@Override
106+
public void registerIndexListener(final IndexListener indexListener) {
107+
coordinator.setQueueListener(indexListener);
108+
super.registerIndexListener(coordinator);
109+
}
110+
};
111+
}
112+
};
113+
jdbc.setDataSource(H2DB.createDataSource("H2JDBCDeadlockTest"));
114+
jdbc.setAdapter(coordinator.newAdapter());
115+
jdbc.deleteAllMessages();
116+
jdbc.setUseLock(false);
117+
return jdbc;
118+
}
119+
120+
/**
121+
* Verifies that no deadlock occurs when a JDBC exception fires during {@code addMessage}
122+
* while another {@code addMessage} executes concurrently.
123+
*
124+
* <p>Before the fix: test hangs → {@code @Rule Timeout} fails it.
125+
* After the fix: both threads complete normally.
126+
*/
127+
@Test
128+
public void testNoDeadlockOnJDBCException() throws Exception {
129+
final CountDownLatch rollbackHoldsIndexLock = new CountDownLatch(1);
130+
final CountDownLatch threadBHoldsPendingLock = new CountDownLatch(1);
131+
coordinator.arm(rollbackHoldsIndexLock, threadBHoldsPendingLock);
132+
133+
final ExecutorService executor = Executors.newFixedThreadPool(2);
134+
final CountDownLatch allDone = new CountDownLatch(2);
135+
136+
// Thread A: sends the message whose DB write will be injected to fail.
137+
// The failure triggers Queue.rollbackPendingCursorAdditions, which acquires
138+
// indexOrderedCursorUpdates and calls mc.onCompletion (inside that lock before fix).
139+
executor.execute(() -> {
140+
try (final Connection conn = connectionFactory.createConnection();
141+
final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
142+
conn.start();
143+
session.createProducer(session.createQueue(QUEUE_NAME))
144+
.send(session.createTextMessage("will-fail"));
145+
} catch (final Exception ignored) {
146+
// Expected: the broker propagates the injected IOException as JMSException.
147+
} finally {
148+
allDone.countDown();
149+
}
150+
});
151+
152+
// Wait until Thread A's rollback has acquired indexOrderedCursorUpdates.
153+
assertTrue("Thread A rollback should start", rollbackHoldsIndexLock.await(10, TimeUnit.SECONDS));
154+
155+
// Thread B: sends a normal message while Thread A's rollback holds indexOrderedCursorUpdates.
156+
// Thread B enters synchronized(pendingAdditions) in addMessage, then calls
157+
// indexListener.onAdd() which needs indexOrderedCursorUpdates → BLOCKED (before fix).
158+
// Meanwhile Thread A needs pendingAdditions (held by Thread B) → DEADLOCK.
159+
executor.execute(() -> {
160+
try (final Connection conn = connectionFactory.createConnection();
161+
final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
162+
conn.start();
163+
session.createProducer(session.createQueue(QUEUE_NAME))
164+
.send(session.createTextMessage("should-succeed"));
165+
} catch (final Exception ignored) {
166+
} finally {
167+
allDone.countDown();
168+
}
169+
});
170+
171+
assertTrue("Both threads should complete without deadlock",
172+
allDone.await(15, TimeUnit.SECONDS));
173+
executor.shutdown();
174+
}
175+
176+
// -------------------------------------------------------------------------
177+
178+
/**
179+
* Coordinates the precise timing needed to expose the ABBA lock cycle.
180+
*
181+
* <p>Implements {@link IndexListener} to intercept {@code onAdd()} calls:
182+
* <ul>
183+
* <li>For the <em>first</em> message (Thread A's fail message): wraps {@code mc.onCompletion}
184+
* so that when it is called from inside {@code synchronized(indexOrderedCursorUpdates)}
185+
* it signals Thread B and waits for Thread B to hold {@code pendingAdditions} before
186+
* trying to acquire it — completing the deadlock cycle.</li>
187+
* <li>For the <em>second</em> message (Thread B): signals Thread A that
188+
* {@code pendingAdditions} is held, then calls the real {@code onAdd} which tries
189+
* to acquire {@code indexOrderedCursorUpdates} (held by Thread A) — BLOCKED.</li>
190+
* </ul>
191+
*
192+
* <p>Also provides a {@link H2JDBCAdapter} that fails the first {@code doAddMessage}
193+
* call, which is what pushes Thread A into the rollback path.
194+
*/
195+
static class DeadlockCoordinator implements IndexListener {
196+
197+
private IndexListener queueListener;
198+
private CountDownLatch rollbackHoldsIndexLock;
199+
private CountDownLatch threadBHoldsPendingLock;
200+
201+
/** Set to true (on Thread A) in onAdd so the adapter throws on the same thread. */
202+
private final AtomicBoolean failNextWrite = new AtomicBoolean(false);
203+
/** Flips to false after Thread A's onCompletion wrapper is registered. */
204+
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
205+
206+
void setQueueListener(final IndexListener queueListener) {
207+
this.queueListener = queueListener;
208+
}
209+
210+
void arm(final CountDownLatch rollbackHoldsIndexLock,
211+
final CountDownLatch threadBHoldsPendingLock) {
212+
this.rollbackHoldsIndexLock = rollbackHoldsIndexLock;
213+
this.threadBHoldsPendingLock = threadBHoldsPendingLock;
214+
}
215+
216+
H2JDBCAdapter newAdapter() {
217+
return new H2JDBCAdapter() {
218+
@Override
219+
public void doAddMessage(final TransactionContext c, final long sequence,
220+
final MessageId messageID,
221+
final ActiveMQDestination destination,
222+
final byte[] data, final long expiration,
223+
final byte priority, final XATransactionId xid)
224+
throws SQLException, IOException {
225+
if (failNextWrite.compareAndSet(true, false)) {
226+
throw new SQLException(
227+
"Simulated DB failure to reproduce deadlock (GitHub #1731)", "S1000");
228+
}
229+
super.doAddMessage(c, sequence, messageID, destination,
230+
data, expiration, priority, xid);
231+
}
232+
};
233+
}
234+
235+
@Override
236+
public void onAdd(final IndexListener.MessageContext mc) {
237+
if (firstMessage.compareAndSet(true, false)) {
238+
// Thread A — inside synchronized(pendingAdditions) in JDBCMessageStore.addMessage.
239+
// Mark the write to fail, then wrap onCompletion with deadlock-triggering logic.
240+
failNextWrite.set(true);
241+
final Runnable original = mc.onCompletion;
242+
final IndexListener.MessageContext wrapped = new IndexListener.MessageContext(
243+
mc.context, mc.message, () -> {
244+
// Called from Queue.rollbackPendingCursorAdditions while holding
245+
// synchronized(indexOrderedCursorUpdates).
246+
rollbackHoldsIndexLock.countDown(); // signal Thread B to start
247+
try {
248+
threadBHoldsPendingLock.await(10, TimeUnit.SECONDS); // wait for Thread B
249+
} catch (final InterruptedException e) {
250+
Thread.currentThread().interrupt();
251+
}
252+
original.run(); // needs pendingAdditions → DEADLOCK if Thread B holds it
253+
});
254+
queueListener.onAdd(wrapped);
255+
} else {
256+
// Thread B — also inside synchronized(pendingAdditions).
257+
// Signal Thread A that pendingAdditions is now held.
258+
threadBHoldsPendingLock.countDown();
259+
// Needs indexOrderedCursorUpdates (held by Thread A's rollback) → BLOCKED.
260+
queueListener.onAdd(mc);
261+
}
262+
}
263+
}
264+
}

0 commit comments

Comments
 (0)