Skip to content

Commit d01e851

Browse files
Copilotzhfeng
andauthored
Document XA transaction limitations with MessageListener and provide correct consumption patterns (#457)
* Initial plan * Add documentation and test for XA transactions with message consumption - Document MessageListener limitations with XA transactions - Add comprehensive guide on correct patterns for XA message consumption - Create integration test demonstrating proper synchronous consumption with XA - Explain why async MessageListener callbacks don't participate in transactions - Provide alternative approaches including JCA, manual transaction management, and polling Co-authored-by: zhfeng <1246139+zhfeng@users.noreply.github.com> * Use assertTrue instead of assertEquals for boolean check Co-authored-by: zhfeng <1246139+zhfeng@users.noreply.github.com> * Address code review feedback - Remove System.out.println from test code - Clarify documentation about message loss with AUTO_ACKNOWLEDGE Co-authored-by: zhfeng <1246139+zhfeng@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: zhfeng <1246139+zhfeng@users.noreply.github.com>
1 parent 8e5559f commit d01e851

File tree

2 files changed

+262
-0
lines changed

2 files changed

+262
-0
lines changed

docs/modules/ROOT/pages/index.adoc

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,140 @@ quarkus.transaction-manager.enable-recovery=true
4444
----
4545
XA support is only available with `quarkus-artemis-jms`.
4646

47+
=== Consuming Messages with XA Transactions
48+
49+
When consuming messages with XA transactions enabled, it's important to understand how transaction boundaries work.
50+
51+
==== Synchronous Message Consumption (Recommended)
52+
53+
For reliable transaction management, use synchronous message consumption within a `@Transactional` method:
54+
55+
[source,java]
56+
----
57+
@Transactional
58+
public void processMessages(ConnectionFactory connectionFactory) {
59+
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
60+
JMSConsumer consumer = context.createConsumer(context.createQueue("myQueue"))) {
61+
62+
Message message = consumer.receive(1000L);
63+
if (message != null) {
64+
// Process message
65+
// If an exception is thrown, the transaction will be rolled back
66+
// and the message will be redelivered
67+
}
68+
}
69+
}
70+
----
71+
72+
[IMPORTANT]
73+
====
74+
When using XA transactions, always create the `JMSContext` with `Session.SESSION_TRANSACTED` mode, not `JMSContext.AUTO_ACKNOWLEDGE`. The `AUTO_ACKNOWLEDGE` mode bypasses transaction control and messages will be acknowledged immediately.
75+
====
76+
77+
==== Asynchronous Message Consumption (Advanced)
78+
79+
Using `MessageListener` with `setMessageListener()` for asynchronous message consumption requires special consideration:
80+
81+
[source,java]
82+
----
83+
// This pattern does NOT automatically participate in XA transactions!
84+
@ApplicationScoped
85+
public class MyListener implements MessageListener {
86+
87+
@Inject
88+
public MyListener(ConnectionFactory connectionFactory) {
89+
JMSContext context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
90+
JMSConsumer consumer = context.createConsumer(context.createQueue("myQueue"));
91+
consumer.setMessageListener(this);
92+
}
93+
94+
@Override
95+
public void onMessage(Message message) {
96+
// This callback runs on a JMS provider thread
97+
// There is NO automatic transaction boundary here
98+
// Exceptions thrown here will NOT roll back XA transactions
99+
throw new RuntimeException("Message not processed");
100+
// With AUTO_ACKNOWLEDGE, the message is already acknowledged and will be lost
101+
}
102+
}
103+
----
104+
105+
[WARNING]
106+
====
107+
The `MessageListener.onMessage()` callback is invoked on an internal JMS provider thread and **does not run within a transaction boundary** by default. When an exception is thrown:
108+
109+
* The message will be acknowledged (if using AUTO_ACKNOWLEDGE)
110+
* No XA transaction rollback will occur
111+
* The message will NOT be redelivered or sent to the Dead Letter Queue (DLQ)
112+
* The message is effectively lost
113+
114+
This is a fundamental limitation of the JMS specification when using MessageListener directly.
115+
====
116+
117+
If you need asynchronous message consumption with XA transaction support, consider these alternatives:
118+
119+
1. **Use Quarkus Messaging with Artemis JCA**: The `quarkus-artemis-jms-ra` extension provides proper Message-Driven Bean support with automatic transaction management around message listeners.
120+
121+
2. **Manual Transaction Management**: Manually manage transactions within the listener:
122+
+
123+
[source,java]
124+
----
125+
@ApplicationScoped
126+
public class MyListener implements MessageListener {
127+
128+
@Inject
129+
TransactionManager transactionManager;
130+
131+
@Override
132+
public void onMessage(Message message) {
133+
try {
134+
transactionManager.begin();
135+
// Process message
136+
transactionManager.commit();
137+
} catch (Exception e) {
138+
try {
139+
transactionManager.rollback();
140+
} catch (Exception rbEx) {
141+
// Handle rollback exception
142+
}
143+
// Note: Manual session management may still be required
144+
// for proper message redelivery
145+
}
146+
}
147+
}
148+
----
149+
+
150+
[NOTE]
151+
====
152+
Even with manual transaction management, proper message redelivery requires careful session acknowledgement handling. For production use, consider using a JCA-based solution like `quarkus-artemis-jms-ra`.
153+
====
154+
155+
3. **Polling Pattern**: Use a scheduled method to poll for messages synchronously within a transaction:
156+
+
157+
[source,java]
158+
----
159+
@ApplicationScoped
160+
public class MessagePoller {
161+
162+
@Inject
163+
ConnectionFactory connectionFactory;
164+
165+
@Scheduled(every = "1s")
166+
@Transactional
167+
public void pollMessages() {
168+
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
169+
JMSConsumer consumer = context.createConsumer(context.createQueue("myQueue"))) {
170+
171+
Message message = consumer.receiveNoWait();
172+
if (message != null) {
173+
// Process message
174+
// Transaction will be rolled back on exception
175+
}
176+
}
177+
}
178+
}
179+
----
180+
47181
== Custom ConnectionFactory
48182
For those messaging drivers which do not have quarkus extension, such as `ibmmq-client`. You need to create a custom `ConnectionFactory` and wrap it by yourself. Here is an example:
49183
[source,java]
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package io.quarkiverse.messaginghub.pooled.jms.it;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNull;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
7+
import java.util.UUID;
8+
9+
import jakarta.inject.Inject;
10+
import jakarta.jms.ConnectionFactory;
11+
import jakarta.jms.JMSConsumer;
12+
import jakarta.jms.JMSContext;
13+
import jakarta.jms.JMSException;
14+
import jakarta.jms.Message;
15+
import jakarta.jms.Session;
16+
import jakarta.transaction.Transactional;
17+
18+
import org.junit.jupiter.api.Test;
19+
20+
import io.quarkus.test.junit.QuarkusTest;
21+
import io.quarkus.test.junit.TestProfile;
22+
23+
/**
24+
* Test demonstrating the issue with MessageListener and XA transactions,
25+
* and showing the correct approach for consuming messages with XA.
26+
*/
27+
@QuarkusTest
28+
@TestProfile(JmsXATestProfile.class)
29+
public class PooledXAMessageListenerTest {
30+
31+
@Inject
32+
ConnectionFactory connectionFactory;
33+
34+
private static final String TEST_QUEUE = "xa-listener-test-queue";
35+
36+
/**
37+
* Test demonstrating the correct way to consume messages with XA transactions:
38+
* Use synchronous receive() within a @Transactional method.
39+
* When an exception is thrown, the transaction is rolled back and the message is redelivered.
40+
*/
41+
@Test
42+
public void testSynchronousConsumptionWithXA() throws Exception {
43+
String messageBody = "test-message-" + UUID.randomUUID();
44+
45+
// Send a message
46+
sendMessage(messageBody);
47+
48+
// Try to receive and process with exception - should rollback
49+
boolean exceptionThrown = false;
50+
try {
51+
receiveAndProcessWithException();
52+
} catch (RuntimeException e) {
53+
exceptionThrown = true;
54+
}
55+
56+
assertTrue(exceptionThrown, "Expected exception to be thrown");
57+
58+
// Message should still be available due to rollback
59+
String receivedBody = receiveMessage();
60+
assertEquals(messageBody, receivedBody, "Message should be redelivered after rollback");
61+
62+
// Successfully process the message
63+
receiveAndProcess();
64+
65+
// Message should now be consumed
66+
String noMessage = receiveMessage();
67+
assertNull(noMessage, "Message should be consumed after successful processing");
68+
}
69+
70+
/**
71+
* Sends a message to the test queue within a transaction.
72+
*/
73+
@Transactional
74+
void sendMessage(String body) {
75+
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED)) {
76+
context.createProducer().send(context.createQueue(TEST_QUEUE), body);
77+
}
78+
}
79+
80+
/**
81+
* Receives and processes a message within a transaction.
82+
* This is the CORRECT pattern for XA transaction support.
83+
*/
84+
@Transactional
85+
void receiveAndProcess() throws JMSException {
86+
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
87+
JMSConsumer consumer = context.createConsumer(context.createQueue(TEST_QUEUE))) {
88+
89+
Message message = consumer.receive(5000L);
90+
if (message != null) {
91+
String body = message.getBody(String.class);
92+
// Process message successfully
93+
}
94+
}
95+
}
96+
97+
/**
98+
* Receives a message and throws an exception within a transaction.
99+
* The transaction will be rolled back and the message will be redelivered.
100+
*/
101+
@Transactional
102+
void receiveAndProcessWithException() throws JMSException {
103+
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
104+
JMSConsumer consumer = context.createConsumer(context.createQueue(TEST_QUEUE))) {
105+
106+
Message message = consumer.receive(5000L);
107+
if (message != null) {
108+
String body = message.getBody(String.class);
109+
throw new RuntimeException("Simulated processing failure");
110+
}
111+
}
112+
}
113+
114+
/**
115+
* Receives a message without transaction (for verification).
116+
*/
117+
String receiveMessage() throws JMSException {
118+
try (JMSContext context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);
119+
JMSConsumer consumer = context.createConsumer(context.createQueue(TEST_QUEUE))) {
120+
121+
Message message = consumer.receive(1000L);
122+
if (message != null) {
123+
return message.getBody(String.class);
124+
}
125+
return null;
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)