From e0f559084d9ca32108bbd23644c935b82a067a09 Mon Sep 17 00:00:00 2001 From: Rakesh Kumar Singh Date: Wed, 6 May 2026 16:01:59 +0530 Subject: [PATCH 1/2] NIFI-15483: Fixed PublishAMQP routing FlowFiles to success when broker cannot deliver message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PublishAMQP uses mandatory=true on basicPublish() so the broker returns messages it cannot route to any queue. However, the return arrives asynchronously via ReturnListener.handleReturn() on the AMQP I/O thread while the publishing thread had already moved on to session.transfer(REL_SUCCESS). The UndeliverableMessageLogger only logged a warning — it never signaled failure back to publish() or onTrigger(), so every unroutable message was silently counted as a success despite never reaching any consumer. Fix: - Enabled Publisher Confirms (channel.confirmSelect()) in the constructor. The broker's basic.return frame for an unroutable message is guaranteed to arrive before the corresponding confirm frame, so waitForConfirms() acts as a synchronization barrier that makes return detection reliable. - Added an AtomicReference field (undeliverableReturnReason) that UndeliverableMessageLogger.handleReturn() populates with exchange/routingKey/ replyCode/replyText when a message is returned. - publish() now: resets the field before each call, calls waitForConfirms(5s) to synchronize with the broker, then checks the field and throws AMQPException if the message was returned — causing onTrigger() to route to REL_FAILURE. - Broker NACKs (e.g., resource alarm) are also now surfaced as AMQPException because waitForConfirms() returns false on NACK. - Added regression tests to verify that AMQPPublisher and PublishAMQP correctly route FlowFiles to REL_FAILURE for all broker-side failure modes: - Added ShutdownSignalException to the catch block in AMQPPublisher.publish() - Converts the channel-close signal into AMQPException so PublishAMQP routes the FlowFile to REL_FAILURE with a descriptive error message - Added ShutdownSignalException import Co-authored-by: Rakesh Kumar Singh --- .../nifi/amqp/processors/AMQPPublisher.java | 84 ++++++++++++++++--- .../amqp/processors/AMQPPublisherTest.java | 57 ++++++++++++- .../nifi/amqp/processors/PublishAMQPTest.java | 40 +++++++++ .../nifi/amqp/processors/TestChannel.java | 42 ++++++++-- .../nifi/amqp/processors/TestConnection.java | 4 + 5 files changed, 208 insertions(+), 19 deletions(-) diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java index 27a64bbb1ffb..47ee1c49e358 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java @@ -20,10 +20,13 @@ import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ReturnListener; +import com.rabbitmq.client.ShutdownSignalException; import org.apache.nifi.logging.ComponentLog; import java.io.IOException; import java.net.SocketException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** * Generic publisher of messages to AMQP-based messaging system. It is based on @@ -33,6 +36,14 @@ final class AMQPPublisher extends AMQPWorker { private final String connectionString; + /** + * Stores the broker's return reason when a message is published with mandatory=true + * but the broker cannot route it to any queue. Written by the AMQP I/O thread via + * {@link UndeliverableMessageLogger} and read by the publishing thread after + * {@link com.rabbitmq.client.Channel#waitForConfirms} synchronizes the two. + */ + private final AtomicReference undeliverableReturnReason = new AtomicReference<>(null); + /** * Creates an instance of this publisher * @@ -43,6 +54,17 @@ final class AMQPPublisher extends AMQPWorker { getChannel().addReturnListener(new UndeliverableMessageLogger()); this.connectionString = connection.toString(); + // Enable Publisher Confirms on this channel so that waitForConfirms() can be used + // after basicPublish() to create a synchronization point. This ensures that any + // basic.return frame sent by the broker (for mandatory messages it cannot route) + // will have been processed by the ReturnListener before waitForConfirms() returns, + // allowing undeliverable messages to be reliably detected and routed to REL_FAILURE. + try { + getChannel().confirmSelect(); + } catch (final IOException e) { + throw new AMQPException("Failed to enable Publisher Confirms on AMQP channel", e); + } + processorLog.info("Successfully connected AMQPPublisher to {}", this.connectionString); } @@ -68,6 +90,9 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String processorLog.debug("Successfully connected AMQPPublisher to {} and '{}' exchange with '{}' as a routing key.", this.connectionString, exchange, routingKey); } + // Reset any stale return reason from a previous publish before sending. + undeliverableReturnReason.set(null); + try { getChannel().basicPublish(exchange, routingKey, true, properties, bytes); } catch (AlreadyClosedException | SocketException e) { @@ -75,6 +100,40 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String } catch (Exception e) { throw new AMQPException("Failed to publish message to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e); } + + // Wait for the broker's publish confirm (ack/nack). Because the broker sends a basic.return + // frame BEFORE the corresponding confirm frame for mandatory messages it cannot route, + // UndeliverableMessageLogger.handleReturn() is guaranteed to have run by the time + // waitForConfirms() returns. This makes undeliverable-message detection reliable. + // + // If the exchange does not exist, the broker closes the channel with a 404 NOT_FOUND + // channel.close frame, which causes waitForConfirms() to throw ShutdownSignalException. + // We catch it here and convert it to AMQPException so the FlowFile routes to REL_FAILURE + // instead of surfacing as an unhandled processor error. + try { + if (!getChannel().waitForConfirms(5_000L)) { + throw new AMQPException("Broker negatively acknowledged (NACK) message published to Exchange '" + + exchange + "' with Routing Key '" + routingKey + "'"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AMQPException("Interrupted while waiting for publish confirmation from broker", e); + } catch (TimeoutException e) { + throw new AMQPException("Timed out waiting for publish confirmation from broker for Exchange '" + + exchange + "' with Routing Key '" + routingKey + "'", e); + } catch (ShutdownSignalException e) { + // Broker closed the channel — most commonly because the exchange does not exist (404) + // or the vhost was deleted. Convert to AMQPException so PublishAMQP routes to REL_FAILURE. + throw new AMQPException("Broker closed channel while waiting for publish confirmation — " + + "Exchange '" + exchange + "' may not exist: " + e.getMessage(), e); + } + + // If the broker returned the message (e.g., no queue bound to the exchange/routing-key), + // surface it as a hard failure so the caller can route to REL_FAILURE instead of REL_SUCCESS. + final String returnReason = undeliverableReturnReason.get(); + if (returnReason != null) { + throw new AMQPException("Message returned as undeliverable by broker — " + returnReason); + } } @Override @@ -83,23 +142,22 @@ public String toString() { } /** - * Listener to listen and WARN-log undeliverable messages which are returned - * back to the sender. Since in the current implementation messages are sent - * with 'mandatory' bit set, such messages must have final destination - * otherwise they are silently dropped which could cause a confusion - * especially during early stages of flow development. This implies that - * bindings between exchange -> routingKey -> queue must exist and are - * typically done by AMQP administrator. This logger simply helps to monitor - * for such conditions by logging such messages as warning. In the future - * this can be extended to provide other type of functionality (e.g., fail - * processor etc.) + * Listens for messages returned by the broker when they cannot be routed to any queue + * (mandatory=true publish with no matching binding). Previously this listener only logged + * a warning, causing PublishAMQP to silently route the FlowFile to REL_SUCCESS even though + * the message was never delivered. (NIFI-15483) + * + * Now it stores the return reason in {@link #undeliverableReturnReason} so that + * {@link #publish} can detect it after {@code waitForConfirms()} synchronizes the two + * threads and throw an {@link AMQPException} to trigger REL_FAILURE routing. */ private final class UndeliverableMessageLogger implements ReturnListener { @Override public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException { - String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey - + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + "."; - processorLog.warn(logMessage); + final String reason = "exchange='" + exchangeName + "' routingKey='" + routingKey + + "' replyCode=" + replyCode + " replyText='" + replyText + "'"; + undeliverableReturnReason.set(reason); + processorLog.warn("Message returned as undeliverable by broker: {}", reason); } } } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java index 84d7c5528f58..b8c92afe8dca 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java @@ -37,7 +37,6 @@ public class AMQPPublisherTest { - @SuppressWarnings("resource") @Test public void failOnNullConnection() { assertThrows(IllegalArgumentException.class, () -> new AMQPPublisher(null, null)); @@ -105,4 +104,60 @@ public void validateSuccessfulPublishingAndUndeliverableRoutingKey() throws Exce connection.close(); } + /** + * Verifies that a {@link com.rabbitmq.client.ShutdownSignalException} thrown by + * {@code waitForConfirms()} (e.g., broker closes channel with 404 NOT_FOUND because the + * exchange does not exist) is converted to {@link AMQPException} so the FlowFile routes + * to REL_FAILURE instead of surfacing as an unhandled processor error. + */ + @Test + public void failPublishWhenBrokerClosesChannelDuringConfirm() { + assertThrows(AMQPException.class, () -> { + TestConnection conn = new TestConnection(null, null); + conn.getTestChannel().setSimulateShutdownOnConfirm(true); + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), null, "foo", ""); + } + }); + } + + /** + * Verifies that a broker NACK (waitForConfirms returns false) throws {@link AMQPException} + * so the FlowFile routes to REL_FAILURE. + */ + @Test + public void failPublishWhenBrokerNacksMessage() { + assertThrows(AMQPException.class, () -> { + TestConnection conn = new TestConnection(null, null); + conn.getTestChannel().setSimulateNackOnConfirm(true); + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), null, "foo", ""); + } + }); + } + + /** + * Verifies that when the broker returns a message as undeliverable (basic.return, e.g., no + * queue bound to the exchange/routing-key), an {@link AMQPException} is thrown so the FlowFile + * routes to REL_FAILURE rather than silently to REL_SUCCESS. + */ + @Test + public void failPublishWhenMessageReturnedAsUndeliverable() { + assertThrows(AMQPException.class, () -> { + Map> routingMap = new HashMap<>(); + routingMap.put("key1", Arrays.asList("queue1")); + Map exchangeToRoutingKeymap = new HashMap<>(); + exchangeToRoutingKeymap.put("myExchange", "key1"); + + TestConnection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); + // Fire return listener synchronously so it is guaranteed to run before waitForConfirms() + conn.getTestChannel().setSimulateSynchronousReturn(true); + + try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""))) { + // Wrong routing key → broker returns the message as undeliverable + sender.publish("hello".getBytes(), null, "wrongKey", "myExchange"); + } + }); + } + } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index e2a2f0697aed..ce933ae25234 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -278,5 +278,45 @@ protected Connection createConnection(ProcessContext context, ExecutorService ex public Connection getConnection() { return connection; } + + public TestChannel getTestChannel() { + return connection.getTestChannel(); + } + } + + /** + * When the broker closes the channel with a 404 (exchange not found), the FlowFile + * must route to REL_FAILURE — not cause an unhandled processor exception. + */ + @Test + public void validateFlowFileRoutedToFailureWhenBrokerClosesChannel() { + final LocalPublishAMQP proc = new LocalPublishAMQP(); + final TestRunner testRunner = TestRunners.newTestRunner(proc); + setConnectionProperties(testRunner); + proc.getTestChannel().setSimulateShutdownOnConfirm(true); + + testRunner.enqueue("Hello Joe".getBytes()); + testRunner.run(); + + assertTrue(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty()); + assertNotNull(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst()); + } + + /** + * When the broker sends a NACK for the published message, the FlowFile must route + * to REL_FAILURE. + */ + @Test + public void validateFlowFileRoutedToFailureOnBrokerNack() { + final LocalPublishAMQP proc = new LocalPublishAMQP(); + final TestRunner testRunner = TestRunners.newTestRunner(proc); + setConnectionProperties(testRunner); + proc.getTestChannel().setSimulateNackOnConfirm(true); + + testRunner.enqueue("Hello Joe".getBytes()); + testRunner.run(); + + assertTrue(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty()); + assertNotNull(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst()); } } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java index ada7f18958fa..20c1ccb77745 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java @@ -78,6 +78,9 @@ class TestChannel implements Channel { private final BitSet acknowledgments = new BitSet(); private final BitSet nacks = new BitSet(); private int prefetchCount = 0; + private boolean simulateShutdownOnConfirm = false; + private boolean simulateNackOnConfirm = false; + private boolean simulateSynchronousReturn = false; public TestChannel(Map exchangeToRoutingKeyMappings, Map> routingKeyToQueueMappings) { @@ -100,6 +103,24 @@ void corruptChannel() { this.corrupted = true; } + /** Causes the next {@link #waitForConfirms(long)} call to throw {@link ShutdownSignalException}, + * simulating the broker closing the channel (e.g., exchange not found, 404 NOT_FOUND). */ + void setSimulateShutdownOnConfirm(boolean simulate) { + this.simulateShutdownOnConfirm = simulate; + } + + /** Causes the next {@link #waitForConfirms(long)} call to return {@code false}, + * simulating the broker sending a NACK for the published message. */ + void setSimulateNackOnConfirm(boolean simulate) { + this.simulateNackOnConfirm = simulate; + } + + /** When {@code true}, return listeners are invoked synchronously inside + * {@link #basicPublish} rather than asynchronously, making tests deterministic. */ + void setSimulateSynchronousReturn(boolean simulate) { + this.simulateSynchronousReturn = simulate; + } + void setConnection(Connection connection) { this.connection = connection; } @@ -283,15 +304,23 @@ public void basicPublish(final String exchange, final String routingKey, boolean private void discard(final String exchange, final String routingKey, boolean mandatory, final BasicProperties props, final byte[] body) { - // NO ROUTE. Invoke return listener async + // NO ROUTE. Invoke return listener — synchronously when simulating for tests, async otherwise. for (final ReturnListener listener : returnListeners) { - this.executorService.execute(() -> { + if (simulateSynchronousReturn) { try { listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body); } catch (Exception e) { throw new IllegalStateException("Failed to send return message", e); } - }); + } else { + this.executorService.execute(() -> { + try { + listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body); + } catch (Exception e) { + throw new IllegalStateException("Failed to send return message", e); + } + }); + } } } @@ -582,7 +611,7 @@ public RollbackOk txRollback() throws IOException { @Override public com.rabbitmq.client.AMQP.Confirm.SelectOk confirmSelect() throws IOException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + return null; // no-op: publisher confirms enabled for testing } @Override @@ -597,7 +626,10 @@ public boolean waitForConfirms() throws InterruptedException { @Override public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + if (simulateShutdownOnConfirm) { + throw new ShutdownSignalException(false, false, null, this); + } + return !simulateNackOnConfirm; } @Override diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java index 996c00dd8ceb..10adb50438a3 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java @@ -122,6 +122,10 @@ public Channel createChannel() throws IOException { return this.channel; } + public TestChannel getTestChannel() { + return this.channel; + } + @Override public Channel createChannel(int channelNumber) throws IOException { throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); From 4c18f20b235fd5192e1636f12bfd4669044cd3cc Mon Sep 17 00:00:00 2001 From: Rakesh Kumar Singh Date: Thu, 21 May 2026 21:15:34 +0530 Subject: [PATCH 2/2] NIFI-15483: Added Delivery Guarantee property to make Publisher Confirms opt-in Added a new Delivery Guarantee property to PublishAMQP with two options: At most once (default): works like the original - sends the message without waiting for a broker reply. If the message cannot be delivered, only a warning is logged and the FlowFile routes to success. Best for high throughput. At least once: turns on RabbitMQ Publisher Confirms. The processor waits for the broker to confirm the message before routing. If the message is returned or the broker sends a NACK, the FlowFile routes to failure instead of success. This prevents silent data loss but can be much slower, especially with remote brokers. --- .../nifi/amqp/processors/AMQPPublisher.java | 91 +++++++++---------- .../nifi/amqp/processors/PublishAMQP.java | 53 ++++++++++- .../amqp/processors/AMQPPublisherTest.java | 44 +++++---- .../nifi/amqp/processors/ConsumeAMQPTest.java | 20 ++-- .../nifi/amqp/processors/PublishAMQPTest.java | 6 +- 5 files changed, 134 insertions(+), 80 deletions(-) diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java index 47ee1c49e358..7044dad62390 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java @@ -35,12 +35,14 @@ final class AMQPPublisher extends AMQPWorker { private final String connectionString; + private final boolean useConfirms; /** * Stores the broker's return reason when a message is published with mandatory=true * but the broker cannot route it to any queue. Written by the AMQP I/O thread via * {@link UndeliverableMessageLogger} and read by the publishing thread after * {@link com.rabbitmq.client.Channel#waitForConfirms} synchronizes the two. + * Only populated when {@link #useConfirms} is true. */ private final AtomicReference undeliverableReturnReason = new AtomicReference<>(null); @@ -48,21 +50,23 @@ final class AMQPPublisher extends AMQPWorker { * Creates an instance of this publisher * * @param connection instance of AMQP {@link Connection} + * @param useConfirms when true, enables RabbitMQ Publisher Confirms so that + * {@link #publish} waits for a broker ack/nack and reliably + * detects undeliverable messages; when false, the original + * fire-and-forget behaviour is used for maximum throughput */ - AMQPPublisher(Connection connection, ComponentLog processorLog) { + AMQPPublisher(Connection connection, ComponentLog processorLog, boolean useConfirms) { super(connection, processorLog); + this.useConfirms = useConfirms; getChannel().addReturnListener(new UndeliverableMessageLogger()); this.connectionString = connection.toString(); - // Enable Publisher Confirms on this channel so that waitForConfirms() can be used - // after basicPublish() to create a synchronization point. This ensures that any - // basic.return frame sent by the broker (for mandatory messages it cannot route) - // will have been processed by the ReturnListener before waitForConfirms() returns, - // allowing undeliverable messages to be reliably detected and routed to REL_FAILURE. - try { - getChannel().confirmSelect(); - } catch (final IOException e) { - throw new AMQPException("Failed to enable Publisher Confirms on AMQP channel", e); + if (useConfirms) { + try { + getChannel().confirmSelect(); + } catch (final IOException e) { + throw new AMQPException("Failed to enable Publisher Confirms on AMQP channel", e); + } } processorLog.info("Successfully connected AMQPPublisher to {}", this.connectionString); @@ -101,38 +105,31 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String throw new AMQPException("Failed to publish message to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e); } - // Wait for the broker's publish confirm (ack/nack). Because the broker sends a basic.return - // frame BEFORE the corresponding confirm frame for mandatory messages it cannot route, - // UndeliverableMessageLogger.handleReturn() is guaranteed to have run by the time - // waitForConfirms() returns. This makes undeliverable-message detection reliable. - // - // If the exchange does not exist, the broker closes the channel with a 404 NOT_FOUND - // channel.close frame, which causes waitForConfirms() to throw ShutdownSignalException. - // We catch it here and convert it to AMQPException so the FlowFile routes to REL_FAILURE - // instead of surfacing as an unhandled processor error. - try { - if (!getChannel().waitForConfirms(5_000L)) { - throw new AMQPException("Broker negatively acknowledged (NACK) message published to Exchange '" - + exchange + "' with Routing Key '" + routingKey + "'"); + if (useConfirms) { + // Wait for the broker's publish confirm (ack/nack). Because the broker sends a basic.return + // frame BEFORE the corresponding confirm frame for mandatory messages it cannot route, + // UndeliverableMessageLogger.handleReturn() is guaranteed to have run by the time + // waitForConfirms() returns. This makes undeliverable-message detection reliable. + try { + if (!getChannel().waitForConfirms(5_000L)) { + throw new AMQPException("Broker negatively acknowledged (NACK) message published to Exchange '" + + exchange + "' with Routing Key '" + routingKey + "'"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AMQPException("Interrupted while waiting for publish confirmation from broker", e); + } catch (TimeoutException e) { + throw new AMQPException("Timed out waiting for publish confirmation from broker for Exchange '" + + exchange + "' with Routing Key '" + routingKey + "'", e); + } catch (ShutdownSignalException e) { + throw new AMQPException("Broker closed channel while waiting for publish confirmation — " + + "Exchange '" + exchange + "' may not exist: " + e.getMessage(), e); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new AMQPException("Interrupted while waiting for publish confirmation from broker", e); - } catch (TimeoutException e) { - throw new AMQPException("Timed out waiting for publish confirmation from broker for Exchange '" - + exchange + "' with Routing Key '" + routingKey + "'", e); - } catch (ShutdownSignalException e) { - // Broker closed the channel — most commonly because the exchange does not exist (404) - // or the vhost was deleted. Convert to AMQPException so PublishAMQP routes to REL_FAILURE. - throw new AMQPException("Broker closed channel while waiting for publish confirmation — " - + "Exchange '" + exchange + "' may not exist: " + e.getMessage(), e); - } - // If the broker returned the message (e.g., no queue bound to the exchange/routing-key), - // surface it as a hard failure so the caller can route to REL_FAILURE instead of REL_SUCCESS. - final String returnReason = undeliverableReturnReason.get(); - if (returnReason != null) { - throw new AMQPException("Message returned as undeliverable by broker — " + returnReason); + final String returnReason = undeliverableReturnReason.get(); + if (returnReason != null) { + throw new AMQPException("Message returned as undeliverable by broker — " + returnReason); + } } } @@ -143,13 +140,15 @@ public String toString() { /** * Listens for messages returned by the broker when they cannot be routed to any queue - * (mandatory=true publish with no matching binding). Previously this listener only logged - * a warning, causing PublishAMQP to silently route the FlowFile to REL_SUCCESS even though - * the message was never delivered. (NIFI-15483) + * (mandatory=true publish with no matching binding). + * + * In {@link PublishAMQP.DeliveryGuarantee#AT_MOST_ONCE} mode (the default), this listener + * only logs a warning — matching the original behaviour. * - * Now it stores the return reason in {@link #undeliverableReturnReason} so that - * {@link #publish} can detect it after {@code waitForConfirms()} synchronizes the two - * threads and throw an {@link AMQPException} to trigger REL_FAILURE routing. + * In {@link PublishAMQP.DeliveryGuarantee#AT_LEAST_ONCE} mode, the return reason is also + * stored in {@link #undeliverableReturnReason} so that {@link #publish} can detect it after + * {@code waitForConfirms()} synchronizes the two threads and throw an {@link AMQPException} + * to trigger REL_FAILURE routing. */ private final class UndeliverableMessageLogger implements ReturnListener { @Override diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java index 97bc07a6f710..20ffe02f1744 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java @@ -95,6 +95,19 @@ public class PublishAMQP extends AbstractAMQPProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() + .name("Delivery Guarantee") + .description("Controls whether the processor waits for a publish confirmation (broker ack/nack) before routing the FlowFile. " + + "\"At least once\" enables RabbitMQ Publisher Confirms: the processor blocks until the broker acknowledges the message, " + + "and undeliverable messages (no matching queue binding) are reliably routed to 'failure'. " + + "This prevents silent data loss at the cost of significantly higher latency, especially with remote brokers. " + + "\"At most once\" uses the original fire-and-forget mode: the message is sent without waiting for confirmation. " + + "Undeliverable messages are only logged as a warning and the FlowFile is still routed to 'success'. " + + "This mode offers maximum throughput but provides no delivery guarantee.") + .required(true) + .allowableValues(DeliveryGuarantee.class) + .defaultValue(DeliveryGuarantee.AT_MOST_ONCE) + .build(); public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder() .name("Headers Source") .description("The source of the headers which will be applied to the published message.") @@ -136,6 +149,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { Stream.of( EXCHANGE, ROUTING_KEY, + DELIVERY_GUARANTEE, HEADERS_SOURCE, HEADERS_PATTERN, HEADER_SEPARATOR @@ -207,7 +221,8 @@ public Set getRelationships() { @Override protected AMQPPublisher createAMQPWorker(final ProcessContext context, final Connection connection) { - return new AMQPPublisher(connection, getLogger()); + final boolean useConfirms = DeliveryGuarantee.AT_LEAST_ONCE == context.getProperty(DELIVERY_GUARANTEE).asAllowableValue(DeliveryGuarantee.class); + return new AMQPPublisher(connection, getLogger(), useConfirms); } @Override @@ -346,6 +361,42 @@ protected Character getHeaderSeparator(ProcessContext context, InputHeaderSource }; } + public enum DeliveryGuarantee implements DescribedValue { + + AT_MOST_ONCE("At most once", + "Fire-and-forget: message is sent without waiting for a broker acknowledgement. " + + "Undeliverable messages (no matching queue binding) are logged as a warning and " + + "the FlowFile is routed to 'success'. Offers maximum throughput."), + AT_LEAST_ONCE("At least once", + "Publisher Confirms are enabled: the processor blocks until the broker acknowledges " + + "the message (ack or nack). Undeliverable messages are reliably detected and routed " + + "to 'failure'. Prevents silent data loss at the cost of higher latency, particularly " + + "with remote brokers."); + + private final String displayName; + private final String description; + + DeliveryGuarantee(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + } + public enum InputHeaderSource implements DescribedValue { FLOWFILE_ATTRIBUTES("FlowFile Attributes", "Select FlowFile Attributes based on regular expression pattern for event headers. Key of the matching attribute will be used as header key"), diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java index b8c92afe8dca..b0d14b9734da 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java @@ -39,14 +39,14 @@ public class AMQPPublisherTest { @Test public void failOnNullConnection() { - assertThrows(IllegalArgumentException.class, () -> new AMQPPublisher(null, null)); + assertThrows(IllegalArgumentException.class, () -> new AMQPPublisher(null, null, false)); } @Test public void failPublishIfChannelClosed() { assertThrows(AMQPRollbackException.class, () -> { Connection conn = new TestConnection(null, null); - try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), false)) { conn.close(); sender.publish("oleg".getBytes(), null, "foo", ""); } @@ -57,7 +57,7 @@ public void failPublishIfChannelClosed() { public void failPublishIfChannelFails() { assertThrows(AMQPException.class, () -> { TestConnection conn = new TestConnection(null, null); - try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), false)) { ((TestChannel) conn.createChannel()).corruptChannel(); sender.publish("oleg".getBytes(), null, "foo", ""); } @@ -73,7 +73,7 @@ public void validateSuccessfulPublishingAndRouting() throws Exception { Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), null, "key1", "myExchange"); } @@ -95,7 +95,7 @@ public void validateSuccessfulPublishingAndUndeliverableRoutingKey() throws Exce ReturnListener retListener = mock(ReturnListener.class); connection.createChannel().addReturnListener(retListener); - try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""), false)) { sender.publish("hello".getBytes(), null, "key1", "myExchange"); } @@ -115,32 +115,23 @@ public void failPublishWhenBrokerClosesChannelDuringConfirm() { assertThrows(AMQPException.class, () -> { TestConnection conn = new TestConnection(null, null); conn.getTestChannel().setSimulateShutdownOnConfirm(true); - try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), true)) { sender.publish("hello".getBytes(), null, "foo", ""); } }); } - /** - * Verifies that a broker NACK (waitForConfirms returns false) throws {@link AMQPException} - * so the FlowFile routes to REL_FAILURE. - */ @Test public void failPublishWhenBrokerNacksMessage() { assertThrows(AMQPException.class, () -> { TestConnection conn = new TestConnection(null, null); conn.getTestChannel().setSimulateNackOnConfirm(true); - try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), true)) { sender.publish("hello".getBytes(), null, "foo", ""); } }); } - /** - * Verifies that when the broker returns a message as undeliverable (basic.return, e.g., no - * queue bound to the exchange/routing-key), an {@link AMQPException} is thrown so the FlowFile - * routes to REL_FAILURE rather than silently to REL_SUCCESS. - */ @Test public void failPublishWhenMessageReturnedAsUndeliverable() { assertThrows(AMQPException.class, () -> { @@ -150,14 +141,29 @@ public void failPublishWhenMessageReturnedAsUndeliverable() { exchangeToRoutingKeymap.put("myExchange", "key1"); TestConnection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); - // Fire return listener synchronously so it is guaranteed to run before waitForConfirms() conn.getTestChannel().setSimulateSynchronousReturn(true); - try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""))) { - // Wrong routing key → broker returns the message as undeliverable + try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""), true)) { sender.publish("hello".getBytes(), null, "wrongKey", "myExchange"); } }); } + @Test + public void succeedsPublishWhenMessageUndeliverableInAtMostOnceMode() throws Exception { + Map> routingMap = new HashMap<>(); + routingMap.put("key1", Arrays.asList("queue1")); + Map exchangeToRoutingKeymap = new HashMap<>(); + exchangeToRoutingKeymap.put("myExchange", "key1"); + + TestConnection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); + conn.getTestChannel().setSimulateSynchronousReturn(true); + + try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""), false)) { + // In AT_MOST_ONCE mode, undeliverable messages only produce a warning — no exception + sender.publish("hello".getBytes(), null, "wrongKey", "myExchange"); + } + conn.close(); + } + } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 6da28a4dbff7..1081c517ccdd 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -56,7 +56,7 @@ public void testMessageAcked() throws TimeoutException, IOException { final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); @@ -87,7 +87,7 @@ public void testBatchSizeAffectsAcks() throws TimeoutException, IOException { final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); @@ -118,7 +118,7 @@ public void testConsumerStopped() throws TimeoutException, IOException { final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); sender.publish("good-bye".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); @@ -153,7 +153,7 @@ public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -184,7 +184,7 @@ public void validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransf final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -219,7 +219,7 @@ public void validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAn final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -251,7 +251,7 @@ public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransfer final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -288,7 +288,7 @@ public void validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSucc final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -318,7 +318,7 @@ public void validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParamet final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -352,7 +352,7 @@ public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() throws E final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index ce933ae25234..3d6d90a7c808 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -293,6 +293,7 @@ public void validateFlowFileRoutedToFailureWhenBrokerClosesChannel() { final LocalPublishAMQP proc = new LocalPublishAMQP(); final TestRunner testRunner = TestRunners.newTestRunner(proc); setConnectionProperties(testRunner); + testRunner.setProperty(PublishAMQP.DELIVERY_GUARANTEE, PublishAMQP.DeliveryGuarantee.AT_LEAST_ONCE); proc.getTestChannel().setSimulateShutdownOnConfirm(true); testRunner.enqueue("Hello Joe".getBytes()); @@ -302,15 +303,12 @@ public void validateFlowFileRoutedToFailureWhenBrokerClosesChannel() { assertNotNull(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst()); } - /** - * When the broker sends a NACK for the published message, the FlowFile must route - * to REL_FAILURE. - */ @Test public void validateFlowFileRoutedToFailureOnBrokerNack() { final LocalPublishAMQP proc = new LocalPublishAMQP(); final TestRunner testRunner = TestRunners.newTestRunner(proc); setConnectionProperties(testRunner); + testRunner.setProperty(PublishAMQP.DELIVERY_GUARANTEE, PublishAMQP.DeliveryGuarantee.AT_LEAST_ONCE); proc.getTestChannel().setSimulateNackOnConfirm(true); testRunner.enqueue("Hello Joe".getBytes());