From 6e2e19591ac9ecdf4615cb4805a837f84453f31d Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 28 Aug 2025 12:29:20 +0200 Subject: [PATCH 1/3] Local shovels: fix handling of acks/nacks --- .../src/rabbit_local_shovel.erl | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 8ba238675d69..f0b673ac0ad0 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -591,22 +591,20 @@ get_user_vhost_from_amqp_param(Uri) -> settle(Op, DeliveryTag, Multiple, #{source := #{queue_r := QRef, current := Current = #{consumer_tag := CTag, - unacked_message_q := UAMQ0} + unacked_message_q := UAMQ0, + queue_states := QState0} } = Src} = State0) -> {MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple), - State = State0#{source => Src#{current => Current#{unacked_message_q => UAMQ}}}, - lists:foldl( - fun(MsgId, #{source := Src0 = #{current := Current0 = #{queue_states := QState0}}} = St0) -> - case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of - {ok, QState1, Actions} -> - St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}}, - handle_queue_actions(Actions, St); - {'protocol_error', Type, Reason, Args} -> - ?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp", - [Op, Type, io_lib:format(Reason, Args)]), - exit({shutdown, {ack_failed, Reason}}) - end - end, State, MsgIds). + case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of + {ok, QState1, Actions} -> + State = State0#{source => Src#{current => Current#{queue_states => QState1, + unacked_message_q => UAMQ}}}, + handle_queue_actions(Actions, State); + {'protocol_error', Type, Reason, Args} -> + ?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp", + [Op, Type, io_lib:format(Reason, Args)]), + exit({shutdown, {ack_failed, Reason}}) + end. %% From rabbit_channel %% Records a client-sent acknowledgement. Handles both single delivery acks From 8a116500e07781152e5b26e8d907bbf2cc9fca5e Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Fri, 29 Aug 2025 17:00:52 +0200 Subject: [PATCH 2/3] Local shovels: generate multiple ack/nack as the channel does Avoids overflowing source queue with individual ack/nacks --- .../src/rabbit_local_shovel.erl | 174 +++++++++++++----- 1 file changed, 130 insertions(+), 44 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index f0b673ac0ad0..d001702466cb 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -128,11 +128,19 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs}, State#{dest => Dest#{current => #{queue_states => QState, delivery_id => 1, vhost => VHost}, - unacked => #{}}}; + unconfirmed => rabbit_confirms:init(), + rejected => [], + rejected_count => 0, + confirmed => [], + confirmed_count => 0}}; _ -> State#{dest => Dest#{current => #{queue_states => QState, vhost => VHost}, - unacked => #{}}} + unconfirmed => rabbit_confirms:init(), + confirmed => [], + confirmed_count => 0, + rejected => [], + rejected_count => 0}} end). maybe_add_dest_queue(State = #{dest := Dest = #{queue := QName, @@ -206,6 +214,7 @@ init_source(State = #{source := #{queue_r := QName, init_dest(#{name := Name, shovel_type := Type, dest := #{add_forward_headers := AFH} = Dst} = State) -> + _TRef = erlang:send_after(1000, self(), send_confirms_and_nacks), case AFH of true -> Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(), @@ -291,6 +300,9 @@ handle_source({queue_event, #resource{name = Queue, {protocol_error, _Type, Reason, ReasonArgs} -> {stop, list_to_binary(io_lib:format(Reason, ReasonArgs))} end; +handle_source(send_confirms_and_nacks, State) -> + _TRef = erlang:send_after(1000, self(), send_confirms_and_nacks), + send_confirms_and_nacks(State); handle_source({{'DOWN', #resource{name = Queue, kind = queue, virtual_host = VHost}}, _, _, _, _} , @@ -305,9 +317,9 @@ handle_dest({queue_event, QRef, Evt}, case rabbit_queue_type:handle_event(QRef, Evt, QueueStates0) of {ok, QState1, Actions} -> State = State0#{dest => Dest#{current => Current#{queue_states => QState1}}}, - handle_dest_queue_actions(Actions, State); + send_confirms_and_nacks(handle_dest_queue_actions(Actions, State)); {eol, Actions} -> - _ = handle_dest_queue_actions(Actions, State0), + _ = send_confirms_and_nacks(handle_dest_queue_actions(Actions, State0)), {stop, {outbound_link_or_channel_closure, queue_deleted}}; {protocol_error, _Type, Reason, ReasonArgs} -> {stop, list_to_binary(io_lib:format(Reason, ReasonArgs))} @@ -337,8 +349,7 @@ ack(DeliveryTag, Multiple, State) -> nack(DeliveryTag, Multiple, State) -> maybe_grant_credit(settle(requeue, DeliveryTag, Multiple, State)). -forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current, - unacked := Unacked} = Dest, +forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest, ack_mode := AckMode} = State0) -> {Options, #{dest := #{current := Current1} = Dest1} = State} = case AckMode of @@ -355,23 +366,21 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current, case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of {ok, QState1, Actions} -> State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}}, - #{dest := Dst1} = State2 = rabbit_shovel_behaviour:incr_forwarded(State1), - State4 = rabbit_shovel_behaviour:decr_remaining_unacked( + State2 = rabbit_shovel_behaviour:incr_forwarded(State1), + State3 = rabbit_shovel_behaviour:decr_remaining_unacked( case AckMode of - no_ack -> - rabbit_shovel_behaviour:decr_remaining(1, State2); on_confirm when length(Queues) > 0 -> - Correlation = maps:get(correlation, Options), - State2#{dest => Dst1#{unacked => Unacked#{Correlation => Tag}}}; - on_confirm -> - %% Drop the messages as 0.9.1, no destination available - State3 = rabbit_shovel_behaviour:ack(Tag, false, State2), - rabbit_shovel_behaviour:decr_remaining(1, State3); + State2; on_publish -> - State3 = rabbit_shovel_behaviour:ack(Tag, false, State2), - rabbit_shovel_behaviour:decr_remaining(1, State3) + rabbit_shovel_behaviour:decr_remaining( + 1, + record_confirms([{Tag, none}], State2)); + _ -> + rabbit_shovel_behaviour:decr_remaining(1, State2) end), - handle_dest_queue_actions(Actions, State4); + MsgSeqNo = maps:get(correlation, Options, undefined), + State4 = process_routing_confirm(MsgSeqNo, QNames, State3), + send_confirms_and_nacks(handle_dest_queue_actions(Actions, State4)); {error, Reason} -> exit({shutdown, Reason}) end. @@ -455,14 +464,21 @@ increase_next_tag(#{source := Source = #{current := Current = #{next_tag := Deli handle_dest_queue_actions(Actions, State) -> lists:foldl( - fun({settled, _QName, MsgSeqNos}, S0) -> - confirm_to_inbound(fun(Tag, StateX) -> - rabbit_shovel_behaviour:ack(Tag, false, StateX) - end, MsgSeqNos, S0); - ({rejected, _QName, MsgSeqNos}, S0) -> - confirm_to_inbound(fun(Tag, StateX) -> - rabbit_shovel_behaviour:nack(Tag, false, StateX) - end, MsgSeqNos, S0); + fun({settled, QName, MsgSeqNos}, S0) -> + confirm(MsgSeqNos, QName, S0); + ({rejected, _QName, MsgSeqNos}, #{dest := Dst = #{unconfirmed := U0}} = S0) -> + {U, Rej} = + lists:foldr( + fun(SeqNo, {U1, Acc}) -> + case rabbit_confirms:reject(SeqNo, U1) of + {ok, MX, U2} -> + {U2, [MX | Acc]}; + {error, not_found} -> + {U1, Acc} + end + end, {U0, []}, MsgSeqNos), + S = S0#{dest => Dst#{unconfirmed => U}}, + record_rejects(Rej, S); %% TODO handle {block, QName} (_Action, S0) -> S0 @@ -647,23 +663,6 @@ route(Msg, #{current := #{vhost := VHost}}) -> Exchange = rabbit_exchange:lookup_or_die(ExchangeName), rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}). -confirm_to_inbound(ConfirmFun, SeqNos, State) - when is_list(SeqNos) -> - lists:foldl(fun(Seq, State0) -> - confirm_to_inbound(ConfirmFun, Seq, State0) - end, State, SeqNos); -confirm_to_inbound(ConfirmFun, Seq, - State0 = #{dest := #{unacked := Unacked} = Dst}) -> - case Unacked of - #{Seq := InTag} -> - Unacked1 = maps:remove(Seq, Unacked), - State = rabbit_shovel_behaviour:decr_remaining( - 1, State0#{dest => Dst#{unacked => Unacked1}}), - ConfirmFun(InTag, State); - _ -> - State0 - end. - sent_delivery(#{source := #{delivery_count := DeliveryCount0, credit := Credit0} = Src } = State0, NumMsgs) -> @@ -736,3 +735,90 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra State0#{source => Src#{credit => Credit, at_least_one_credit_req_in_flight => false}} end. + +process_routing_confirm(undefined, _, State) -> + State; +process_routing_confirm(MsgSeqNo, [], State) + when is_integer(MsgSeqNo) -> + record_confirms([{MsgSeqNo, none}], State); +process_routing_confirm(MsgSeqNo, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) -> + XName = rabbit_misc:r(<<"/">>, exchange, <<>>), + State#{dest => Dst#{unconfirmed => + rabbit_confirms:insert(MsgSeqNo, QRefs, XName, Unconfirmed)}}. + +record_confirms([], State) -> + State; +record_confirms(MXs, State = #{dest := Dst = #{confirmed := C, + confirmed_count := CC}}) -> + Num = length(MXs), + rabbit_shovel_behaviour:decr_remaining( + Num, State#{dest => Dst#{confirmed => [MXs | C], + confirmed_count => CC + Num}}). + +record_rejects([], State) -> + State; +record_rejects(MXs, State = #{dest := Dst = #{rejected := R, + rejected_count := RC}}) -> + Num = length(MXs), + rabbit_shovel_behaviour:decr_remaining( + Num, State#{dest => Dst#{rejected => [MXs | R], + rejected_count => RC + Num}}). + +confirm(MsgSeqNos, QRef, State = #{dest := Dst = #{unconfirmed := UC}}) -> + {ConfirmMXs, UC1} = rabbit_confirms:confirm(MsgSeqNos, QRef, UC), + record_confirms(ConfirmMXs, State#{dest => Dst#{unconfirmed => UC1}}). + +send_nacks([], _, State) -> + State; +send_nacks(Rs, Cs, State) -> + coalesce_and_send(Rs, Cs, + fun(MsgSeqNo, Multiple, StateX) -> + rabbit_shovel_behaviour:nack(MsgSeqNo, Multiple, StateX) + end, State). + +send_confirms([], _, State) -> + State; +send_confirms([MsgSeqNo], _, State) -> + rabbit_shovel_behaviour:ack(MsgSeqNo, false, State); +send_confirms(Cs, Rs, State) -> + coalesce_and_send(Cs, Rs, + fun(MsgSeqNo, Multiple, StateX) -> + rabbit_shovel_behaviour:ack(MsgSeqNo, Multiple, StateX) + end, State). + +coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, + State = #{dest := #{unconfirmed := UC}}) -> + SMsgSeqNos = lists:usort(MsgSeqNos), + UnconfirmedCutoff = case rabbit_confirms:is_empty(UC) of + true -> lists:last(SMsgSeqNos) + 1; + false -> rabbit_confirms:smallest(UC) + end, + Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]), + {Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos), + State1 = case Ms of + [] -> State; + _ -> MkMsgFun(lists:last(Ms), true, State) + end, + lists:foldl(fun(SeqNo, S) -> + MkMsgFun(SeqNo, false, S) + end, State1, Ss). + +%% Todo remove XName from confirm/unconfirm as we don't need it for local +send_confirms_and_nacks(State = #{dest := #{confirmed := [], + rejected := []}}) -> + State; +send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C, + rejected := R}}) -> + Confirms = lists:append(C), + ConfirmMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Confirms], + Rejects = lists:append(R), + RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects], + State1 = #{dest := Dst2} + = send_confirms(ConfirmMsgSeqNos, + RejectMsgSeqNos, + State#{dest => Dst#{confirmed => [], + confirmed_count => 0}}), + send_nacks(RejectMsgSeqNos, + ConfirmMsgSeqNos, + State1#{dest => Dst2#{rejected => [], + rejected_count => 0}}). From 41d52835bf22e2789b0f0a2c2748f5f5b69ccd2f Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Tue, 2 Sep 2025 12:01:10 +0200 Subject: [PATCH 3/3] Local shovels: fix handling of acks/nacks from multiple queues --- .../src/rabbit_local_shovel.erl | 46 +++--- .../src/rabbit_shovel_confirms.erl | 148 ++++++++++++++++++ .../test/local_dynamic_cluster_SUITE.erl | 60 +++++-- 3 files changed, 223 insertions(+), 31 deletions(-) create mode 100644 deps/rabbitmq_shovel/src/rabbit_shovel_confirms.erl diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index d001702466cb..66c6dbdd38bc 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -128,7 +128,7 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs}, State#{dest => Dest#{current => #{queue_states => QState, delivery_id => 1, vhost => VHost}, - unconfirmed => rabbit_confirms:init(), + unconfirmed => rabbit_shovel_confirms:init(), rejected => [], rejected_count => 0, confirmed => [], @@ -136,7 +136,7 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs}, _ -> State#{dest => Dest#{current => #{queue_states => QState, vhost => VHost}, - unconfirmed => rabbit_confirms:init(), + unconfirmed => rabbit_shovel_confirms:init(), confirmed => [], confirmed_count => 0, rejected => [], @@ -361,8 +361,8 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = {#{}, State0} end, Msg = set_annotations(Msg0, Dest), - QNames = route(Msg, Dest), - Queues = rabbit_amqqueue:lookup_many(QNames), + RoutedQNames = route(Msg, Dest), + Queues = rabbit_amqqueue:lookup_many(RoutedQNames), case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of {ok, QState1, Actions} -> State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}}, @@ -374,12 +374,15 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = on_publish -> rabbit_shovel_behaviour:decr_remaining( 1, - record_confirms([{Tag, none}], State2)); + record_confirms([{Tag, Tag}], State2)); _ -> rabbit_shovel_behaviour:decr_remaining(1, State2) end), MsgSeqNo = maps:get(correlation, Options, undefined), - State4 = process_routing_confirm(MsgSeqNo, QNames, State3), + QNames = lists:map(fun({QName, _}) -> QName; + (QName) -> QName + end, RoutedQNames), + State4 = process_routing_confirm(MsgSeqNo, Tag, QNames, State3), send_confirms_and_nacks(handle_dest_queue_actions(Actions, State4)); {error, Reason} -> exit({shutdown, Reason}) @@ -470,7 +473,7 @@ handle_dest_queue_actions(Actions, State) -> {U, Rej} = lists:foldr( fun(SeqNo, {U1, Acc}) -> - case rabbit_confirms:reject(SeqNo, U1) of + case rabbit_shovel_confirms:reject(SeqNo, U1) of {ok, MX, U2} -> {U2, [MX | Acc]}; {error, not_found} -> @@ -736,15 +739,14 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra at_least_one_credit_req_in_flight => false}} end. -process_routing_confirm(undefined, _, State) -> +process_routing_confirm(undefined, _, _, State) -> State; -process_routing_confirm(MsgSeqNo, [], State) +process_routing_confirm(MsgSeqNo, Tag, [], State) when is_integer(MsgSeqNo) -> - record_confirms([{MsgSeqNo, none}], State); -process_routing_confirm(MsgSeqNo, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) -> - XName = rabbit_misc:r(<<"/">>, exchange, <<>>), + record_confirms([{MsgSeqNo, Tag}], State); +process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) -> State#{dest => Dst#{unconfirmed => - rabbit_confirms:insert(MsgSeqNo, QRefs, XName, Unconfirmed)}}. + rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}. record_confirms([], State) -> State; @@ -765,7 +767,7 @@ record_rejects(MXs, State = #{dest := Dst = #{rejected := R, rejected_count => RC + Num}}). confirm(MsgSeqNos, QRef, State = #{dest := Dst = #{unconfirmed := UC}}) -> - {ConfirmMXs, UC1} = rabbit_confirms:confirm(MsgSeqNos, QRef, UC), + {ConfirmMXs, UC1} = rabbit_shovel_confirms:confirm(MsgSeqNos, QRef, UC), record_confirms(ConfirmMXs, State#{dest => Dst#{unconfirmed => UC1}}). send_nacks([], _, State) -> @@ -789,9 +791,9 @@ send_confirms(Cs, Rs, State) -> coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, State = #{dest := #{unconfirmed := UC}}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - UnconfirmedCutoff = case rabbit_confirms:is_empty(UC) of + UnconfirmedCutoff = case rabbit_shovel_confirms:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; - false -> rabbit_confirms:smallest(UC) + false -> rabbit_shovel_confirms:smallest(UC) end, Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]), {Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos), @@ -810,15 +812,15 @@ send_confirms_and_nacks(State = #{dest := #{confirmed := [], send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C, rejected := R}}) -> Confirms = lists:append(C), - ConfirmMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Confirms], + ConfirmTags = [Tag || {_, Tag} <- Confirms], Rejects = lists:append(R), - RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects], + RejectTags = [Tag || {_, Tag} <- Rejects], State1 = #{dest := Dst2} - = send_confirms(ConfirmMsgSeqNos, - RejectMsgSeqNos, + = send_confirms(ConfirmTags, + RejectTags, State#{dest => Dst#{confirmed => [], confirmed_count => 0}}), - send_nacks(RejectMsgSeqNos, - ConfirmMsgSeqNos, + send_nacks(RejectTags, + ConfirmTags, State1#{dest => Dst2#{rejected => [], rejected_count => 0}}). diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_confirms.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_confirms.erl new file mode 100644 index 000000000000..1592d24bba2a --- /dev/null +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_confirms.erl @@ -0,0 +1,148 @@ +-module(rabbit_shovel_confirms). + +-compile({no_auto_import, [size/1]}). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([init/0, + insert/4, + confirm/3, + reject/2, + + remove_queue/2, + + smallest/1, + size/1, + is_empty/1]). + +-type seq_no() :: non_neg_integer(). +-type delivery_tag() :: non_neg_integer(). +-type queue_name() :: rabbit_amqqueue:name(). + +-record(?MODULE, {smallest :: undefined | seq_no(), + unconfirmed = #{} :: #{seq_no() => + {delivery_tag(), + #{queue_name() => ok}}} + }). + +-type mx() :: {seq_no(), delivery_tag()}. + +-opaque state() :: #?MODULE{}. + +-export_type([ + state/0 + ]). + +-spec init() -> state(). +init() -> + #?MODULE{}. + +-spec insert(seq_no(), [queue_name()], delivery_tag(), state()) -> + state(). +insert(SeqNo, QNames, Tag, + #?MODULE{smallest = S0, + unconfirmed = U0} = State) + when is_integer(SeqNo) + andalso is_list(QNames) + andalso not is_map_key(SeqNo, U0) -> + U = U0#{SeqNo => {Tag, maps:from_keys(QNames, ok)}}, + S = case S0 of + undefined -> SeqNo; + _ -> S0 + end, + State#?MODULE{smallest = S, + unconfirmed = U}. + +-spec confirm([seq_no()], queue_name(), state()) -> + {[mx()], state()}. +confirm(SeqNos, QName, #?MODULE{smallest = Smallest0, + unconfirmed = U0} = State) + when is_list(SeqNos) -> + {Confirmed, ConfirmedSmallest, U} = + lists:foldl( + fun (SeqNo, Acc) -> + confirm_one(SeqNo, QName, Smallest0, Acc) + end, {[], false, U0}, SeqNos), + Smallest = case ConfirmedSmallest of + true -> + %% work out new smallest + next_smallest(Smallest0, U); + false -> + Smallest0 + end, + {Confirmed, State#?MODULE{smallest = Smallest, + unconfirmed = U}}. + +-spec reject(seq_no(), state()) -> + {ok, mx(), state()} | {error, not_found}. +reject(SeqNo, #?MODULE{smallest = Smallest0, + unconfirmed = U0} = State) + when is_integer(SeqNo) -> + case maps:take(SeqNo, U0) of + {{Tag, _QS}, U} -> + Smallest = case SeqNo of + Smallest0 -> + %% need to scan as the smallest was removed + next_smallest(Smallest0, U); + _ -> + Smallest0 + end, + {ok, {SeqNo, Tag}, State#?MODULE{unconfirmed = U, + smallest = Smallest}}; + error -> + {error, not_found} + end. + +%% idempotent +-spec remove_queue(queue_name(), state()) -> + {[mx()], state()}. +remove_queue(QName, #?MODULE{unconfirmed = U} = State) -> + SeqNos = maps:fold( + fun (SeqNo, {_Tag, QS0}, Acc) -> + case maps:is_key(QName, QS0) of + true -> + [SeqNo | Acc]; + false -> + Acc + end + end, [], U), + confirm(lists:sort(SeqNos), QName,State). + +-spec smallest(state()) -> seq_no() | undefined. +smallest(#?MODULE{smallest = Smallest}) -> + Smallest. + +-spec size(state()) -> non_neg_integer(). +size(#?MODULE{unconfirmed = U}) -> + maps:size(U). + +-spec is_empty(state()) -> boolean(). +is_empty(State) -> + size(State) == 0. + +%% INTERNAL + +confirm_one(SeqNo, QName, Smallest, {Acc, ConfirmedSmallest0, U0}) -> + case maps:take(SeqNo, U0) of + {{Tag, QS}, U1} + when is_map_key(QName, QS) + andalso map_size(QS) == 1 -> + %% last queue confirm + ConfirmedSmallest = case SeqNo of + Smallest -> true; + _ -> ConfirmedSmallest0 + end, + {[{SeqNo, Tag} | Acc], ConfirmedSmallest, U1}; + {{Tag, QS}, U1} -> + {Acc, ConfirmedSmallest0, U1#{SeqNo => {Tag, maps:remove(QName, QS)}}}; + error -> + {Acc, ConfirmedSmallest0, U0} + end. + +next_smallest(_S, U) when map_size(U) == 0 -> + undefined; +next_smallest(S, U) when is_map_key(S, U) -> + S; +next_smallest(S, U) -> + %% TODO: this is potentially infinitely recursive if called incorrectly + next_smallest(S+1, U). diff --git a/deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl index 6d18a2986bfc..4aadb927981c 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl @@ -27,7 +27,8 @@ groups() -> [ {tests, [], [ local_to_local_dest_down, - local_to_local_multiple_dest_down, + local_to_local_multiple_all_dest_down, + local_to_local_multiple_some_dest_down, local_to_local_no_destination ]} ]. @@ -120,7 +121,7 @@ local_to_local_dest_down(Config) -> expect_many(Sess, Dest, 10) end). -local_to_local_multiple_dest_down(Config) -> +local_to_local_multiple_all_dest_down(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), Dest2 = ?config(destq2, Config), @@ -139,16 +140,57 @@ local_to_local_multiple_dest_down(Config) -> ]), ok = rabbit_ct_broker_helpers:stop_node(Config, 1), publish_many(Sess, Src, Dest, <<"tag1">>, 10), - ?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, 0, 0, 0], - [<<"local_to_local_multiple_dest_down_dest2">>, 0, 0, 0], - [<<"local_to_local_multiple_dest_down_src">>, 10, _, _]], + ?awaitMatch([[<<"local_to_local_multiple_all_dest_down_dest">>, 0, 0, 0], + [<<"local_to_local_multiple_all_dest_down_dest2">>, 0, 0, 0], + [<<"local_to_local_multiple_all_dest_down_src">>, 10, _, _]], list_queue_messages(Config), 30000), ok = rabbit_ct_broker_helpers:start_node(Config, 1), - ?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, N, N, 0], - [<<"local_to_local_multiple_dest_down_dest2">>, M, M, 0], - [<<"local_to_local_multiple_dest_down_src">>, 0, 0, 0]] - when ((N >= 10) and (M >= 10)), + ?awaitMatch([[<<"local_to_local_multiple_all_dest_down_dest">>, 10, 10, 0], + [<<"local_to_local_multiple_all_dest_down_dest2">>, 10, 10, 0], + [<<"local_to_local_multiple_all_dest_down_src">>, 0, 0, 0]], + list_queue_messages(Config), + 30000), + expect_many(Sess, Dest, 10) + end). + +local_to_local_multiple_some_dest_down(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + Dest2 = ?config(destq2, Config), + declare_queue(Config, 0, <<"/">>, Src), + %% Declare each destination queue in a different node. Just one of + %% them will be down, but this still means the message can't be confirmed + %% and should be requeued. + declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest), + declare_and_bind_queue(Config, 2, <<"/">>, <<"amq.fanout">>, Dest2, Dest2), + with_session( + Config, + fun (Sess) -> + shovel_test_utils:set_param(Config, ?PARAM, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-exchange">>, <<"amq.fanout">>}, + {<<"dest-exchange-key">>, <<"">>} + ]), + ok = rabbit_ct_broker_helpers:stop_node(Config, 1), + publish_many(Sess, Src, Dest, <<"tag1">>, 10), + %% Messages won't be confirmed to source until all destination + %% queues are able to confirm them, until them we keep retrying + %% This generates multiple duplicates, but that's how publishing + %% works. + ?awaitMatch([[<<"local_to_local_multiple_some_dest_down_dest">>, 0, 0, 0], + [<<"local_to_local_multiple_some_dest_down_dest2">>, M, M, 0], + [<<"local_to_local_multiple_some_dest_down_src">>, 10, _, _]] + when (M > 10), + list_queue_messages(Config), + 30000), + ok = rabbit_ct_broker_helpers:start_node(Config, 1), + ?awaitMatch([[<<"local_to_local_multiple_some_dest_down_dest">>, N, N, 0], + [<<"local_to_local_multiple_some_dest_down_dest2">>, M, M, 0], + [<<"local_to_local_multiple_some_dest_down_src">>, 0, 0, 0]] + when ((N == 10) and (M >= 10)), list_queue_messages(Config), 30000), expect_many(Sess, Dest, 10)