Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,140 @@ quarkus.transaction-manager.enable-recovery=true
----
XA support is only available with `quarkus-artemis-jms`.

=== Consuming Messages with XA Transactions

When consuming messages with XA transactions enabled, it's important to understand how transaction boundaries work.

==== Synchronous Message Consumption (Recommended)

For reliable transaction management, use synchronous message consumption within a `@Transactional` method:

[source,java]
----
@Transactional
public void processMessages(ConnectionFactory connectionFactory) {
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
JMSConsumer consumer = context.createConsumer(context.createQueue("myQueue"))) {

Message message = consumer.receive(1000L);
if (message != null) {
// Process message
// If an exception is thrown, the transaction will be rolled back
// and the message will be redelivered
}
}
}
----

[IMPORTANT]
====
When using XA transactions, always create the `JMSContext` with `Session.SESSION_TRANSACTED` mode, not `JMSContext.AUTO_ACKNOWLEDGE`. The `AUTO_ACKNOWLEDGE` mode bypasses transaction control and messages will be acknowledged immediately.
====

==== Asynchronous Message Consumption (Advanced)

Using `MessageListener` with `setMessageListener()` for asynchronous message consumption requires special consideration:

[source,java]
----
// This pattern does NOT automatically participate in XA transactions!
@ApplicationScoped
public class MyListener implements MessageListener {

@Inject
public MyListener(ConnectionFactory connectionFactory) {
JMSContext context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
JMSConsumer consumer = context.createConsumer(context.createQueue("myQueue"));
consumer.setMessageListener(this);
}

@Override
public void onMessage(Message message) {
// This callback runs on a JMS provider thread
// There is NO automatic transaction boundary here
// Exceptions thrown here will NOT roll back XA transactions
throw new RuntimeException("Message not processed");
// With AUTO_ACKNOWLEDGE, the message is already acknowledged and will be lost
}
}
----

[WARNING]
====
The `MessageListener.onMessage()` callback is invoked on an internal JMS provider thread and **does not run within a transaction boundary** by default. When an exception is thrown:

* The message will be acknowledged (if using AUTO_ACKNOWLEDGE)
* No XA transaction rollback will occur
* The message will NOT be redelivered or sent to the Dead Letter Queue (DLQ)
* The message is effectively lost

This is a fundamental limitation of the JMS specification when using MessageListener directly.
====

If you need asynchronous message consumption with XA transaction support, consider these alternatives:

1. **Use Quarkus Messaging with Artemis JCA**: The `quarkus-artemis-jms-ra` extension provides proper Message-Driven Bean support with automatic transaction management around message listeners.

2. **Manual Transaction Management**: Manually manage transactions within the listener:
+
[source,java]
----
@ApplicationScoped
public class MyListener implements MessageListener {

@Inject
TransactionManager transactionManager;

@Override
public void onMessage(Message message) {
try {
transactionManager.begin();
// Process message
transactionManager.commit();
} catch (Exception e) {
try {
transactionManager.rollback();
} catch (Exception rbEx) {
// Handle rollback exception
}
// Note: Manual session management may still be required
// for proper message redelivery
}
}
}
----
+
[NOTE]
====
Even with manual transaction management, proper message redelivery requires careful session acknowledgement handling. For production use, consider using a JCA-based solution like `quarkus-artemis-jms-ra`.
====

3. **Polling Pattern**: Use a scheduled method to poll for messages synchronously within a transaction:
+
[source,java]
----
@ApplicationScoped
public class MessagePoller {

@Inject
ConnectionFactory connectionFactory;

@Scheduled(every = "1s")
@Transactional
public void pollMessages() {
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
JMSConsumer consumer = context.createConsumer(context.createQueue("myQueue"))) {

Message message = consumer.receiveNoWait();
if (message != null) {
// Process message
// Transaction will be rolled back on exception
}
}
}
}
----

== Custom ConnectionFactory
For those messaging drivers which do not have quarkus extension, such as `ibmmq-client`. You need to create a custom `ConnectionFactory` and wrap it by yourself. Here is an example:
[source,java]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package io.quarkiverse.messaginghub.pooled.jms.it;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.UUID;

import jakarta.inject.Inject;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;
import jakarta.transaction.Transactional;

import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

/**
* Test demonstrating the issue with MessageListener and XA transactions,
* and showing the correct approach for consuming messages with XA.
*/
@QuarkusTest
@TestProfile(JmsXATestProfile.class)
public class PooledXAMessageListenerTest {

@Inject
ConnectionFactory connectionFactory;

private static final String TEST_QUEUE = "xa-listener-test-queue";

/**
* Test demonstrating the correct way to consume messages with XA transactions:
* Use synchronous receive() within a @Transactional method.
* When an exception is thrown, the transaction is rolled back and the message is redelivered.
*/
@Test
public void testSynchronousConsumptionWithXA() throws Exception {
String messageBody = "test-message-" + UUID.randomUUID();

// Send a message
sendMessage(messageBody);

// Try to receive and process with exception - should rollback
boolean exceptionThrown = false;
try {
receiveAndProcessWithException();
} catch (RuntimeException e) {
exceptionThrown = true;
}

assertTrue(exceptionThrown, "Expected exception to be thrown");

// Message should still be available due to rollback
String receivedBody = receiveMessage();
assertEquals(messageBody, receivedBody, "Message should be redelivered after rollback");

// Successfully process the message
receiveAndProcess();

// Message should now be consumed
String noMessage = receiveMessage();
assertNull(noMessage, "Message should be consumed after successful processing");
}

/**
* Sends a message to the test queue within a transaction.
*/
@Transactional
void sendMessage(String body) {
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED)) {
context.createProducer().send(context.createQueue(TEST_QUEUE), body);
}
}

/**
* Receives and processes a message within a transaction.
* This is the CORRECT pattern for XA transaction support.
*/
@Transactional
void receiveAndProcess() throws JMSException {
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
JMSConsumer consumer = context.createConsumer(context.createQueue(TEST_QUEUE))) {

Message message = consumer.receive(5000L);
if (message != null) {
String body = message.getBody(String.class);
// Process message successfully
}
}
}

/**
* Receives a message and throws an exception within a transaction.
* The transaction will be rolled back and the message will be redelivered.
*/
@Transactional
void receiveAndProcessWithException() throws JMSException {
try (JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
JMSConsumer consumer = context.createConsumer(context.createQueue(TEST_QUEUE))) {

Message message = consumer.receive(5000L);
if (message != null) {
String body = message.getBody(String.class);
throw new RuntimeException("Simulated processing failure");
}
}
}

/**
* Receives a message without transaction (for verification).
*/
String receiveMessage() throws JMSException {
try (JMSContext context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);
JMSConsumer consumer = context.createConsumer(context.createQueue(TEST_QUEUE))) {

Message message = consumer.receive(1000L);
if (message != null) {
return message.getBody(String.class);
}
return null;
}
}
}