Skip to content

Commit 497d74d

Browse files
Merge pull request #14873 from rabbitmq/shovel10-alarms
Shovel bugfix: handle cluster alarms in AMQP10 and local shovels
2 parents e49ed93 + f038115 commit 497d74d

File tree

4 files changed

+142
-9
lines changed

4 files changed

+142
-9
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
-include_lib("kernel/include/logger.hrl").
4343

4444
-define(LINK_CREDIT_TIMEOUT, 20_000).
45+
-define(AWAIT_SEND_MSG_TIMEOUT, 1_000).
4546

4647
-type state() :: rabbit_shovel_behaviour:state().
4748
-type uri() :: rabbit_shovel_behaviour:uri().
@@ -374,7 +375,11 @@ send_msg(Link, Msg) ->
374375
send_msg(Link, Msg)
375376
after ?LINK_CREDIT_TIMEOUT ->
376377
{stop, credited_timeout}
377-
end
378+
end;
379+
{error, remote_incoming_window_exceeded} ->
380+
%% We could be blocked because of an alarm
381+
timer:sleep(?AWAIT_SEND_MSG_TIMEOUT),
382+
send_msg(Link, Msg)
378383
end.
379384

380385
add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
-export([
2626
boot_step/0,
27+
conserve_resources/3,
2728
parse/2,
2829
connect_source/1,
2930
connect_dest/1,
@@ -76,6 +77,12 @@ boot_step() ->
7677
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
7778
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).
7879

80+
-spec conserve_resources(pid(),
81+
rabbit_alarm:resource_alarm_source(),
82+
rabbit_alarm:resource_alert()) -> ok.
83+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
84+
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).
85+
7986
parse(_Name, {source, Source}) ->
8087
Queue = parse_parameter(queue, fun parse_binary/1,
8188
proplists:get_value(queue, Source)),
@@ -222,14 +229,17 @@ init_dest(#{name := Name,
222229
dest := #{add_forward_headers := AFH} = Dst} = State) ->
223230
rabbit_global_counters:publisher_created(?PROTOCOL),
224231
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
232+
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
233+
Alarms = sets:from_list(Alarms0),
225234
case AFH of
226235
true ->
227236
Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(),
228237
<<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type),
229238
<<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)},
230-
State#{dest => Dst#{cached_forward_headers => Props}};
239+
State#{dest => Dst#{cached_forward_headers => Props,
240+
alarms => Alarms}};
231241
false ->
232-
State
242+
State#{dest => Dst#{alarms => Alarms}}
233243
end.
234244

235245
source_uri(_State) ->
@@ -347,6 +357,19 @@ handle_dest({{'DOWN', #resource{kind = queue,
347357
{eol, QState1, _QRef} ->
348358
State0#{dest => Dest#{current => Current#{queue_states => QState1}}}
349359
end;
360+
handle_dest({conserve_resources, Alarm, Conserve}, #{dest := #{alarms := Alarms0} = Dest} = State0) ->
361+
Alarms = case Conserve of
362+
true -> sets:add_element(Alarm, Alarms0);
363+
false -> sets:del_element(Alarm, Alarms0)
364+
end,
365+
State = State0#{dest => Dest#{alarms => Alarms}},
366+
case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of
367+
{false, true} ->
368+
%% All alarms cleared
369+
forward_pending_delivery(State);
370+
{_, _} ->
371+
State
372+
end;
350373
handle_dest(_Msg, State) ->
351374
State.
352375

@@ -362,7 +385,16 @@ forward(_, _, #{source := #{remaining_unacked := 0}} = State) ->
362385
%% come back. So drop subsequent messages on the floor to be
363386
%% requeued later
364387
State;
365-
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
388+
forward(Tag, Msg, State) ->
389+
case is_blocked(State) of
390+
true ->
391+
PendingEntry = {Tag, Msg},
392+
add_pending_delivery(PendingEntry, State);
393+
false ->
394+
do_forward(Tag, Msg, State)
395+
end.
396+
397+
do_forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
366398
ack_mode := AckMode} = State0) ->
367399
{Options, #{dest := #{current := Current1} = Dest1} = State} =
368400
case AckMode of
@@ -425,10 +457,15 @@ add_routing(Msg0, Dest) ->
425457
RK -> mc:set_annotation(?ANN_ROUTING_KEYS, [RK], Msg)
426458
end.
427459

428-
status(_) ->
429-
running.
460+
status(State) ->
461+
case is_blocked(State) of
462+
true -> blocked;
463+
false -> running
464+
end.
430465

431-
pending_count(_State) ->
466+
pending_count(#{dest := #{pending_delivery := Pending}}) ->
467+
queue:len(Pending);
468+
pending_count(_) ->
432469
0.
433470

434471
%% Internal
@@ -891,3 +928,35 @@ messages_delivered(QName, S0) ->
891928
_ ->
892929
ok
893930
end.
931+
932+
is_blocked(#{dest := #{alarms := Alarms}}) ->
933+
not sets:is_empty(Alarms);
934+
is_blocked(_) ->
935+
false.
936+
937+
add_pending_delivery(Elem, State = #{dest := Dest}) ->
938+
Pending = maps:get(pending_delivery, Dest, queue:new()),
939+
State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}.
940+
941+
pop_pending_delivery(State = #{dest := Dest}) ->
942+
Pending = maps:get(pending_delivery, Dest, queue:new()),
943+
case queue:out(Pending) of
944+
{empty, _} ->
945+
empty;
946+
{{value, Elem}, Pending2} ->
947+
{Elem, State#{dest => Dest#{pending_delivery => Pending2}}}
948+
end.
949+
950+
forward_pending_delivery(State) ->
951+
case pop_pending_delivery(State) of
952+
empty ->
953+
State;
954+
{{Tag, Mc}, S} ->
955+
S2 = do_forward(Tag, Mc, S),
956+
case is_blocked(S2) of
957+
true ->
958+
S2;
959+
false ->
960+
forward_pending_delivery(S2)
961+
end
962+
end.

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ groups() ->
3636
local_to_local_delete_dest_queue,
3737
local_to_local_stream_credit_flow_no_ack,
3838
local_to_local_simple_uri,
39-
local_to_local_counters
39+
local_to_local_counters,
40+
local_to_local_alarms
4041
]}
4142
].
4243

@@ -240,6 +241,41 @@ local_to_local_counters(Config) ->
240241
get_global_counters(Config), 30_000)
241242
end).
242243

244+
local_to_local_alarms(Config) ->
245+
Src = ?config(srcq, Config),
246+
Dest = ?config(destq, Config),
247+
ShovelArgs = [{<<"src-protocol">>, <<"local">>},
248+
{<<"src-queue">>, Src},
249+
{<<"dest-protocol">>, <<"local">>},
250+
{<<"dest-queue">>, Dest}],
251+
with_amqp10_session(
252+
Config,
253+
fun (Sess) ->
254+
amqp10_publish(Sess, Src, <<"hello">>, 1000),
255+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
256+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
257+
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
258+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
259+
amqp10_expect_empty(Sess, Dest),
260+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
261+
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
262+
amqp10_expect_count(Sess, Dest, 1000),
263+
264+
shovel_test_utils:clear_param(Config, ?PARAM),
265+
266+
amqp10_publish(Sess, Src, <<"hello">>, 1000),
267+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
268+
rabbit_ct_broker_helpers:set_alarm(Config, 0, memory),
269+
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
270+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
271+
amqp10_expect_empty(Sess, Dest),
272+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
273+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
274+
amqp10_expect_empty(Sess, Dest),
275+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, memory),
276+
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
277+
amqp10_expect_count(Sess, Dest, 1000)
278+
end).
243279
%%----------------------------------------------------------------------------
244280
declare_queue(Config, VHost, QName) ->
245281
declare_queue(Config, VHost, QName, []).
@@ -296,3 +332,11 @@ get_global_counters(Config) ->
296332
get_global_counters0(Config, Key) ->
297333
Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []),
298334
maps:get(Key, Overview).
335+
336+
get_blocked_status(Config) ->
337+
case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []) of
338+
[{_, _, {Status, PropList}, _, _}] ->
339+
{Status, proplists:get_value(blocked_status, PropList)};
340+
_ ->
341+
empty
342+
end.

deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ tests() ->
108108
application_properties,
109109
delete_src_queue,
110110
shovel_status,
111-
change_definition
111+
change_definition,
112+
disk_alarm
112113
].
113114

114115
%% -------------------------------------------------------------------
@@ -589,6 +590,20 @@ change_definition(Config) ->
589590
amqp10_expect_empty(Sess, Dest2)
590591
end).
591592

593+
disk_alarm(Config) ->
594+
Src = ?config(srcq, Config),
595+
Dest = ?config(destq, Config),
596+
with_amqp10_session(Config,
597+
fun (Sess) ->
598+
ShovelArgs = ?config(shovel_args, Config),
599+
amqp10_publish(Sess, Src, <<"hello">>, 10),
600+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
601+
set_param(Config, ?PARAM, ShovelArgs),
602+
amqp10_expect_empty(Sess, Dest),
603+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
604+
amqp10_expect_count(Sess, Dest, 10)
605+
end).
606+
592607
%%----------------------------------------------------------------------------
593608
maybe_skip_local_protocol(Config) ->
594609
[Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

0 commit comments

Comments
 (0)