From f6ad5ff058e084bbdbe5253092acbb5b930963de Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 20 Oct 2025 08:46:31 +0200 Subject: [PATCH 1/4] Revert "Shovel bugfix: requeue rejected messages with AMQP1.0" This reverts commit 1471f23ee23198a057e507b7eafe3e724c4f3f6c. --- deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index b9a5f339f485..b365f35138a7 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -306,12 +306,12 @@ ack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> -spec nack(Tag :: tag(), Multi :: boolean(), state()) -> state(). nack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> % the tag is the same as the deliveryid - ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, released), + ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, rejected), State#{source => Src#{last_nacked_tag => Tag}}; nack(Tag, true, State = #{source := #{current := #{link := LinkRef}, last_nacked_tag := LastTag} = Src}) -> First = LastTag + 1, - ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, released), + ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, rejected), State#{source => Src#{last_nacked_tag => Tag}}. status(#{dest := #{current := #{link_state := attached}}}) -> From f1a7020ad1f9e6f0ac7f44ac96bd4c22ff96e1b7 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 20 Oct 2025 09:14:17 +0200 Subject: [PATCH 2/4] Shovels: skip tests cases until rejection behaviour is unified --- deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl index dab7271025e0..172ad027b8ab 100644 --- a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl @@ -71,8 +71,12 @@ tests() -> autodelete_quorum_on_confirm_no_transfer, autodelete_classic_on_publish_no_transfer, autodelete_quorum_on_publish_no_transfer, - autodelete_classic_on_confirm_with_rejections, - autodelete_quorum_on_confirm_with_rejections, + %% AMQP091 and local shovels requeue messages on reject + %% AMQP10 discards messages on reject + %% These two tests will remain commented out until the + %% behaviour is unified. + %% autodelete_classic_on_confirm_with_rejections, + %% autodelete_quorum_on_confirm_with_rejections, autodelete_classic_on_publish_with_rejections, autodelete_quorum_on_publish_with_rejections ]. From 0c4ec95f3b8863e230fc903bb16017dcf6787c32 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 20 Oct 2025 14:08:20 +0200 Subject: [PATCH 3/4] Shovel bugfix: use 'last_acked_tag' to keep track of acked/nacked messages --- deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index b365f35138a7..f673251b6ab9 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -307,12 +307,12 @@ ack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> nack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> % the tag is the same as the deliveryid ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, rejected), - State#{source => Src#{last_nacked_tag => Tag}}; + State#{source => Src#{last_acked_tag => Tag}}; nack(Tag, true, State = #{source := #{current := #{link := LinkRef}, - last_nacked_tag := LastTag} = Src}) -> + last_acked_tag := LastTag} = Src}) -> First = LastTag + 1, ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, rejected), - State#{source => Src#{last_nacked_tag => Tag}}. + State#{source => Src#{last_acked_tag => Tag}}. status(#{dest := #{current := #{link_state := attached}}}) -> flow; From 9908b252a0abe0d525aeac2cf161c49fc105271e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Mon, 20 Oct 2025 14:02:56 +0200 Subject: [PATCH 4/4] amqp10_client: Accept the `{shutdown, _}` exit reason is `close_sent/3` [Why] The clause above the new one in `close_sent/3` was already handling the fact that the remote process could exit during the close "handshake". But the remote process could terminate with `{shutdown, Reason}` in addition to `shutdown`, where `Reason` is `delete` or `autodelete` for instance. [How] We handle `{shutdown, _}` exactly as `shutdown`. This was detected by frequent failures in CI. --- deps/amqp10_client/src/amqp10_client_connection.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 0ca030fafa20..1c57a2c73239 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -352,6 +352,9 @@ close_sent(_EvtType, heartbeat, _Data) -> close_sent(_EvtType, {'EXIT', _Pid, shutdown}, _Data) -> %% monitored processes may exit during closure keep_state_and_data; +close_sent(_EvtType, {'EXIT', _Pid, {shutdown, _}}, _Data) -> + %% monitored processes may exit during closure + keep_state_and_data; close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _Reason}, #state{reader = ReaderPid}) -> %% if the reader exits we probably won't receive a close frame