From 258b5db5faded59f80852bf1cbc28b57fc468c9f Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 30 Oct 2025 10:26:36 +0100 Subject: [PATCH 1/3] Shovel tests: test disk alarm (cherry picked from commit c0b118ac288fbec1f7e1cfd974d313fcb0e323d8) --- .../test/shovel_dynamic_SUITE.erl | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl index 26bc968e242..5b78b8603fc 100644 --- a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl @@ -108,7 +108,8 @@ tests() -> application_properties, delete_src_queue, shovel_status, - change_definition + change_definition, + disk_alarm ]. %% ------------------------------------------------------------------- @@ -594,6 +595,20 @@ change_definition(Config) -> amqp10_expect_empty(Sess, Dest2) end). +disk_alarm(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session(Config, + fun (Sess) -> + ShovelArgs = ?config(shovel_args, Config), + amqp10_publish(Sess, Src, <<"hello">>, 10), + rabbit_ct_broker_helpers:set_alarm(Config, 0, disk), + set_param(Config, ?PARAM, ShovelArgs), + amqp10_expect_empty(Sess, Dest), + rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk), + amqp10_expect_count(Sess, Dest, 10) + end). + %%---------------------------------------------------------------------------- maybe_skip_local_protocol(Config) -> [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), From 31d60f88030ee1145628df30185107aeaea5b229 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 30 Oct 2025 10:26:53 +0100 Subject: [PATCH 2/3] Shovel AMQP10 bugfix: handle errors when alarms are set When an alarm is set in the destination cluster, the AMQP10 shovel receives a remote_incoming_window_exceeded error. It must handle it and retry the send later on, until the alarm is cleared (cherry picked from commit 822f8003dd8f6a9c44795967cc580218a1956fea) --- deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index f673251b6ab..c084a9b83ac 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -42,6 +42,7 @@ -include_lib("kernel/include/logger.hrl"). -define(LINK_CREDIT_TIMEOUT, 20_000). +-define(AWAIT_SEND_MSG_TIMEOUT, 1_000). -type state() :: rabbit_shovel_behaviour:state(). -type uri() :: rabbit_shovel_behaviour:uri(). @@ -374,7 +375,11 @@ send_msg(Link, Msg) -> send_msg(Link, Msg) after ?LINK_CREDIT_TIMEOUT -> {stop, credited_timeout} - end + end; + {error, remote_incoming_window_exceeded} -> + %% We could be blocked because of an alarm + timer:sleep(?AWAIT_SEND_MSG_TIMEOUT), + send_msg(Link, Msg) end. add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) -> From 143a2eae23f39c08c3987b34de6744d8fe0f98aa Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 30 Oct 2025 12:34:46 +0100 Subject: [PATCH 3/3] Shovel local bugfix: handle cluster alarms Local shovels must stop publishing when alarms are set in the destination cluster. Messages are stored in memory and sent when the alarm clears, the same way it is done for AMQP091 (cherry picked from commit f0381159e7339812639371e42ff5fa1df72deb0a) --- .../src/rabbit_local_shovel.erl | 81 +++++++++++++++++-- .../test/local_dynamic_SUITE.erl | 46 ++++++++++- 2 files changed, 120 insertions(+), 7 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 2689028ee18..265c3f871c8 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -24,6 +24,7 @@ -export([ boot_step/0, + conserve_resources/3, parse/2, connect_source/1, connect_dest/1, @@ -76,6 +77,12 @@ boot_step() -> rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}), rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}). +-spec conserve_resources(pid(), + rabbit_alarm:resource_alarm_source(), + rabbit_alarm:resource_alert()) -> ok. +conserve_resources(Pid, Source, {_, Conserve, _}) -> + gen_server:cast(Pid, {conserve_resources, Source, Conserve}). + parse(_Name, {source, Source}) -> Queue = parse_parameter(queue, fun parse_binary/1, proplists:get_value(queue, Source)), @@ -234,14 +241,17 @@ init_dest(#{name := Name, dest := #{add_forward_headers := AFH} = Dst} = State) -> rabbit_global_counters:publisher_created(?PROTOCOL), _TRef = erlang:send_after(1000, self(), send_confirms_and_nacks), + Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + Alarms = sets:from_list(Alarms0), case AFH of true -> Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(), <<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type), <<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)}, - State#{dest => Dst#{cached_forward_headers => Props}}; + State#{dest => Dst#{cached_forward_headers => Props, + alarms => Alarms}}; false -> - State + State#{dest => Dst#{alarms => Alarms}} end. source_uri(_State) -> @@ -359,6 +369,19 @@ handle_dest({{'DOWN', #resource{kind = queue, {eol, QState1, _QRef} -> State0#{dest => Dest#{current => Current#{queue_states => QState1}}} end; +handle_dest({conserve_resources, Alarm, Conserve}, #{dest := #{alarms := Alarms0} = Dest} = State0) -> + Alarms = case Conserve of + true -> sets:add_element(Alarm, Alarms0); + false -> sets:del_element(Alarm, Alarms0) + end, + State = State0#{dest => Dest#{alarms => Alarms}}, + case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of + {false, true} -> + %% All alarms cleared + forward_pending_delivery(State); + {_, _} -> + State + end; handle_dest(_Msg, State) -> State. @@ -374,7 +397,16 @@ forward(_, _, #{source := #{remaining_unacked := 0}} = State) -> %% come back. So drop subsequent messages on the floor to be %% requeued later State; -forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest, +forward(Tag, Msg, State) -> + case is_blocked(State) of + true -> + PendingEntry = {Tag, Msg}, + add_pending_delivery(PendingEntry, State); + false -> + do_forward(Tag, Msg, State) + end. + +do_forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest, ack_mode := AckMode} = State0) -> {Options, #{dest := #{current := Current1} = Dest1} = State} = case AckMode of @@ -437,10 +469,15 @@ add_routing(Msg0, Dest) -> RK -> mc:set_annotation(?ANN_ROUTING_KEYS, [RK], Msg) end. -status(_) -> - running. +status(State) -> + case is_blocked(State) of + true -> blocked; + false -> running + end. -pending_count(_State) -> +pending_count(#{dest := #{pending_delivery := Pending}}) -> + queue:len(Pending); +pending_count(_) -> 0. %% Internal @@ -903,3 +940,35 @@ messages_delivered(QName, S0) -> _ -> ok end. + +is_blocked(#{dest := #{alarms := Alarms}}) -> + not sets:is_empty(Alarms); +is_blocked(_) -> + false. + +add_pending_delivery(Elem, State = #{dest := Dest}) -> + Pending = maps:get(pending_delivery, Dest, queue:new()), + State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}. + +pop_pending_delivery(State = #{dest := Dest}) -> + Pending = maps:get(pending_delivery, Dest, queue:new()), + case queue:out(Pending) of + {empty, _} -> + empty; + {{value, Elem}, Pending2} -> + {Elem, State#{dest => Dest#{pending_delivery => Pending2}}} + end. + +forward_pending_delivery(State) -> + case pop_pending_delivery(State) of + empty -> + State; + {{Tag, Mc}, S} -> + S2 = do_forward(Tag, Mc, S), + case is_blocked(S2) of + true -> + S2; + false -> + forward_pending_delivery(S2) + end + end. diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index 5c1e5abd3a5..dfb7f744e9e 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -36,7 +36,8 @@ groups() -> local_to_local_delete_dest_queue, local_to_local_stream_credit_flow_no_ack, local_to_local_simple_uri, - local_to_local_counters + local_to_local_counters, + local_to_local_alarms ]} ]. @@ -247,6 +248,41 @@ local_to_local_counters(Config) -> get_global_counters(Config), 30_000) end). +local_to_local_alarms(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + ShovelArgs = [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-queue">>, Dest}], + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_publish(Sess, Src, <<"hello">>, 1000), + rabbit_ct_broker_helpers:set_alarm(Config, 0, disk), + rabbit_ct_broker_helpers:set_alarm(Config, 0, disk), + shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs), + ?awaitMatch({running, blocked}, get_blocked_status(Config), 30000), + amqp10_expect_empty(Sess, Dest), + rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk), + ?awaitMatch({running, running}, get_blocked_status(Config), 30000), + amqp10_expect_count(Sess, Dest, 1000), + + shovel_test_utils:clear_param(Config, ?PARAM), + + amqp10_publish(Sess, Src, <<"hello">>, 1000), + rabbit_ct_broker_helpers:set_alarm(Config, 0, disk), + rabbit_ct_broker_helpers:set_alarm(Config, 0, memory), + shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs), + ?awaitMatch({running, blocked}, get_blocked_status(Config), 30000), + amqp10_expect_empty(Sess, Dest), + rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk), + ?awaitMatch({running, blocked}, get_blocked_status(Config), 30000), + amqp10_expect_empty(Sess, Dest), + rabbit_ct_broker_helpers:clear_alarm(Config, 0, memory), + ?awaitMatch({running, running}, get_blocked_status(Config), 30000), + amqp10_expect_count(Sess, Dest, 1000) + end). %%---------------------------------------------------------------------------- declare_queue(Config, VHost, QName) -> declare_queue(Config, VHost, QName, []). @@ -303,3 +339,11 @@ get_global_counters(Config) -> get_global_counters0(Config, Key) -> Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []), maps:get(Key, Overview). + +get_blocked_status(Config) -> + case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []) of + [{_, _, {Status, PropList}, _, _}] -> + {Status, proplists:get_value(blocked_status, PropList)}; + _ -> + empty + end.