Skip to content

Commit b219ccf

Browse files
dcorbachomergify[bot]
authored andcommitted
Local shovels: fix handling of acks/nacks from multiple queues
(cherry picked from commit 41d5283)
1 parent fdfe2ba commit b219ccf

File tree

3 files changed

+223
-31
lines changed

3 files changed

+223
-31
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
128128
State#{dest => Dest#{current => #{queue_states => QState,
129129
delivery_id => 1,
130130
vhost => VHost},
131-
unconfirmed => rabbit_confirms:init(),
131+
unconfirmed => rabbit_shovel_confirms:init(),
132132
rejected => [],
133133
rejected_count => 0,
134134
confirmed => [],
135135
confirmed_count => 0}};
136136
_ ->
137137
State#{dest => Dest#{current => #{queue_states => QState,
138138
vhost => VHost},
139-
unconfirmed => rabbit_confirms:init(),
139+
unconfirmed => rabbit_shovel_confirms:init(),
140140
confirmed => [],
141141
confirmed_count => 0,
142142
rejected => [],
@@ -361,8 +361,8 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
361361
{#{}, State0}
362362
end,
363363
Msg = set_annotations(Msg0, Dest),
364-
QNames = route(Msg, Dest),
365-
Queues = rabbit_amqqueue:lookup_many(QNames),
364+
RoutedQNames = route(Msg, Dest),
365+
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
366366
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
367367
{ok, QState1, Actions} ->
368368
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
@@ -374,12 +374,15 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
374374
on_publish ->
375375
rabbit_shovel_behaviour:decr_remaining(
376376
1,
377-
record_confirms([{Tag, none}], State2));
377+
record_confirms([{Tag, Tag}], State2));
378378
_ ->
379379
rabbit_shovel_behaviour:decr_remaining(1, State2)
380380
end),
381381
MsgSeqNo = maps:get(correlation, Options, undefined),
382-
State4 = process_routing_confirm(MsgSeqNo, QNames, State3),
382+
QNames = lists:map(fun({QName, _}) -> QName;
383+
(QName) -> QName
384+
end, RoutedQNames),
385+
State4 = process_routing_confirm(MsgSeqNo, Tag, QNames, State3),
383386
send_confirms_and_nacks(handle_dest_queue_actions(Actions, State4));
384387
{error, Reason} ->
385388
exit({shutdown, Reason})
@@ -470,7 +473,7 @@ handle_dest_queue_actions(Actions, State) ->
470473
{U, Rej} =
471474
lists:foldr(
472475
fun(SeqNo, {U1, Acc}) ->
473-
case rabbit_confirms:reject(SeqNo, U1) of
476+
case rabbit_shovel_confirms:reject(SeqNo, U1) of
474477
{ok, MX, U2} ->
475478
{U2, [MX | Acc]};
476479
{error, not_found} ->
@@ -736,15 +739,14 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
736739
at_least_one_credit_req_in_flight => false}}
737740
end.
738741

739-
process_routing_confirm(undefined, _, State) ->
742+
process_routing_confirm(undefined, _, _, State) ->
740743
State;
741-
process_routing_confirm(MsgSeqNo, [], State)
744+
process_routing_confirm(MsgSeqNo, Tag, [], State)
742745
when is_integer(MsgSeqNo) ->
743-
record_confirms([{MsgSeqNo, none}], State);
744-
process_routing_confirm(MsgSeqNo, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
745-
XName = rabbit_misc:r(<<"/">>, exchange, <<>>),
746+
record_confirms([{MsgSeqNo, Tag}], State);
747+
process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
746748
State#{dest => Dst#{unconfirmed =>
747-
rabbit_confirms:insert(MsgSeqNo, QRefs, XName, Unconfirmed)}}.
749+
rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}.
748750

749751
record_confirms([], State) ->
750752
State;
@@ -765,7 +767,7 @@ record_rejects(MXs, State = #{dest := Dst = #{rejected := R,
765767
rejected_count => RC + Num}}).
766768

767769
confirm(MsgSeqNos, QRef, State = #{dest := Dst = #{unconfirmed := UC}}) ->
768-
{ConfirmMXs, UC1} = rabbit_confirms:confirm(MsgSeqNos, QRef, UC),
770+
{ConfirmMXs, UC1} = rabbit_shovel_confirms:confirm(MsgSeqNos, QRef, UC),
769771
record_confirms(ConfirmMXs, State#{dest => Dst#{unconfirmed => UC1}}).
770772

771773
send_nacks([], _, State) ->
@@ -789,9 +791,9 @@ send_confirms(Cs, Rs, State) ->
789791
coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun,
790792
State = #{dest := #{unconfirmed := UC}}) ->
791793
SMsgSeqNos = lists:usort(MsgSeqNos),
792-
UnconfirmedCutoff = case rabbit_confirms:is_empty(UC) of
794+
UnconfirmedCutoff = case rabbit_shovel_confirms:is_empty(UC) of
793795
true -> lists:last(SMsgSeqNos) + 1;
794-
false -> rabbit_confirms:smallest(UC)
796+
false -> rabbit_shovel_confirms:smallest(UC)
795797
end,
796798
Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]),
797799
{Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos),
@@ -810,15 +812,15 @@ send_confirms_and_nacks(State = #{dest := #{confirmed := [],
810812
send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C,
811813
rejected := R}}) ->
812814
Confirms = lists:append(C),
813-
ConfirmMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Confirms],
815+
ConfirmTags = [Tag || {_, Tag} <- Confirms],
814816
Rejects = lists:append(R),
815-
RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects],
817+
RejectTags = [Tag || {_, Tag} <- Rejects],
816818
State1 = #{dest := Dst2}
817-
= send_confirms(ConfirmMsgSeqNos,
818-
RejectMsgSeqNos,
819+
= send_confirms(ConfirmTags,
820+
RejectTags,
819821
State#{dest => Dst#{confirmed => [],
820822
confirmed_count => 0}}),
821-
send_nacks(RejectMsgSeqNos,
822-
ConfirmMsgSeqNos,
823+
send_nacks(RejectTags,
824+
ConfirmTags,
823825
State1#{dest => Dst2#{rejected => [],
824826
rejected_count => 0}}).
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
-module(rabbit_shovel_confirms).
2+
3+
-compile({no_auto_import, [size/1]}).
4+
5+
-include_lib("rabbit_common/include/rabbit.hrl").
6+
7+
-export([init/0,
8+
insert/4,
9+
confirm/3,
10+
reject/2,
11+
12+
remove_queue/2,
13+
14+
smallest/1,
15+
size/1,
16+
is_empty/1]).
17+
18+
-type seq_no() :: non_neg_integer().
19+
-type delivery_tag() :: non_neg_integer().
20+
-type queue_name() :: rabbit_amqqueue:name().
21+
22+
-record(?MODULE, {smallest :: undefined | seq_no(),
23+
unconfirmed = #{} :: #{seq_no() =>
24+
{delivery_tag(),
25+
#{queue_name() => ok}}}
26+
}).
27+
28+
-type mx() :: {seq_no(), delivery_tag()}.
29+
30+
-opaque state() :: #?MODULE{}.
31+
32+
-export_type([
33+
state/0
34+
]).
35+
36+
-spec init() -> state().
37+
init() ->
38+
#?MODULE{}.
39+
40+
-spec insert(seq_no(), [queue_name()], delivery_tag(), state()) ->
41+
state().
42+
insert(SeqNo, QNames, Tag,
43+
#?MODULE{smallest = S0,
44+
unconfirmed = U0} = State)
45+
when is_integer(SeqNo)
46+
andalso is_list(QNames)
47+
andalso not is_map_key(SeqNo, U0) ->
48+
U = U0#{SeqNo => {Tag, maps:from_keys(QNames, ok)}},
49+
S = case S0 of
50+
undefined -> SeqNo;
51+
_ -> S0
52+
end,
53+
State#?MODULE{smallest = S,
54+
unconfirmed = U}.
55+
56+
-spec confirm([seq_no()], queue_name(), state()) ->
57+
{[mx()], state()}.
58+
confirm(SeqNos, QName, #?MODULE{smallest = Smallest0,
59+
unconfirmed = U0} = State)
60+
when is_list(SeqNos) ->
61+
{Confirmed, ConfirmedSmallest, U} =
62+
lists:foldl(
63+
fun (SeqNo, Acc) ->
64+
confirm_one(SeqNo, QName, Smallest0, Acc)
65+
end, {[], false, U0}, SeqNos),
66+
Smallest = case ConfirmedSmallest of
67+
true ->
68+
%% work out new smallest
69+
next_smallest(Smallest0, U);
70+
false ->
71+
Smallest0
72+
end,
73+
{Confirmed, State#?MODULE{smallest = Smallest,
74+
unconfirmed = U}}.
75+
76+
-spec reject(seq_no(), state()) ->
77+
{ok, mx(), state()} | {error, not_found}.
78+
reject(SeqNo, #?MODULE{smallest = Smallest0,
79+
unconfirmed = U0} = State)
80+
when is_integer(SeqNo) ->
81+
case maps:take(SeqNo, U0) of
82+
{{Tag, _QS}, U} ->
83+
Smallest = case SeqNo of
84+
Smallest0 ->
85+
%% need to scan as the smallest was removed
86+
next_smallest(Smallest0, U);
87+
_ ->
88+
Smallest0
89+
end,
90+
{ok, {SeqNo, Tag}, State#?MODULE{unconfirmed = U,
91+
smallest = Smallest}};
92+
error ->
93+
{error, not_found}
94+
end.
95+
96+
%% idempotent
97+
-spec remove_queue(queue_name(), state()) ->
98+
{[mx()], state()}.
99+
remove_queue(QName, #?MODULE{unconfirmed = U} = State) ->
100+
SeqNos = maps:fold(
101+
fun (SeqNo, {_Tag, QS0}, Acc) ->
102+
case maps:is_key(QName, QS0) of
103+
true ->
104+
[SeqNo | Acc];
105+
false ->
106+
Acc
107+
end
108+
end, [], U),
109+
confirm(lists:sort(SeqNos), QName,State).
110+
111+
-spec smallest(state()) -> seq_no() | undefined.
112+
smallest(#?MODULE{smallest = Smallest}) ->
113+
Smallest.
114+
115+
-spec size(state()) -> non_neg_integer().
116+
size(#?MODULE{unconfirmed = U}) ->
117+
maps:size(U).
118+
119+
-spec is_empty(state()) -> boolean().
120+
is_empty(State) ->
121+
size(State) == 0.
122+
123+
%% INTERNAL
124+
125+
confirm_one(SeqNo, QName, Smallest, {Acc, ConfirmedSmallest0, U0}) ->
126+
case maps:take(SeqNo, U0) of
127+
{{Tag, QS}, U1}
128+
when is_map_key(QName, QS)
129+
andalso map_size(QS) == 1 ->
130+
%% last queue confirm
131+
ConfirmedSmallest = case SeqNo of
132+
Smallest -> true;
133+
_ -> ConfirmedSmallest0
134+
end,
135+
{[{SeqNo, Tag} | Acc], ConfirmedSmallest, U1};
136+
{{Tag, QS}, U1} ->
137+
{Acc, ConfirmedSmallest0, U1#{SeqNo => {Tag, maps:remove(QName, QS)}}};
138+
error ->
139+
{Acc, ConfirmedSmallest0, U0}
140+
end.
141+
142+
next_smallest(_S, U) when map_size(U) == 0 ->
143+
undefined;
144+
next_smallest(S, U) when is_map_key(S, U) ->
145+
S;
146+
next_smallest(S, U) ->
147+
%% TODO: this is potentially infinitely recursive if called incorrectly
148+
next_smallest(S+1, U).

deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ groups() ->
2727
[
2828
{tests, [], [
2929
local_to_local_dest_down,
30-
local_to_local_multiple_dest_down,
30+
local_to_local_multiple_all_dest_down,
31+
local_to_local_multiple_some_dest_down,
3132
local_to_local_no_destination
3233
]}
3334
].
@@ -120,7 +121,7 @@ local_to_local_dest_down(Config) ->
120121
expect_many(Sess, Dest, 10)
121122
end).
122123

123-
local_to_local_multiple_dest_down(Config) ->
124+
local_to_local_multiple_all_dest_down(Config) ->
124125
Src = ?config(srcq, Config),
125126
Dest = ?config(destq, Config),
126127
Dest2 = ?config(destq2, Config),
@@ -139,16 +140,57 @@ local_to_local_multiple_dest_down(Config) ->
139140
]),
140141
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
141142
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
142-
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, 0, 0, 0],
143-
[<<"local_to_local_multiple_dest_down_dest2">>, 0, 0, 0],
144-
[<<"local_to_local_multiple_dest_down_src">>, 10, _, _]],
143+
?awaitMatch([[<<"local_to_local_multiple_all_dest_down_dest">>, 0, 0, 0],
144+
[<<"local_to_local_multiple_all_dest_down_dest2">>, 0, 0, 0],
145+
[<<"local_to_local_multiple_all_dest_down_src">>, 10, _, _]],
145146
list_queue_messages(Config),
146147
30000),
147148
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
148-
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, N, N, 0],
149-
[<<"local_to_local_multiple_dest_down_dest2">>, M, M, 0],
150-
[<<"local_to_local_multiple_dest_down_src">>, 0, 0, 0]]
151-
when ((N >= 10) and (M >= 10)),
149+
?awaitMatch([[<<"local_to_local_multiple_all_dest_down_dest">>, 10, 10, 0],
150+
[<<"local_to_local_multiple_all_dest_down_dest2">>, 10, 10, 0],
151+
[<<"local_to_local_multiple_all_dest_down_src">>, 0, 0, 0]],
152+
list_queue_messages(Config),
153+
30000),
154+
expect_many(Sess, Dest, 10)
155+
end).
156+
157+
local_to_local_multiple_some_dest_down(Config) ->
158+
Src = ?config(srcq, Config),
159+
Dest = ?config(destq, Config),
160+
Dest2 = ?config(destq2, Config),
161+
declare_queue(Config, 0, <<"/">>, Src),
162+
%% Declare each destination queue in a different node. Just one of
163+
%% them will be down, but this still means the message can't be confirmed
164+
%% and should be requeued.
165+
declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest),
166+
declare_and_bind_queue(Config, 2, <<"/">>, <<"amq.fanout">>, Dest2, Dest2),
167+
with_session(
168+
Config,
169+
fun (Sess) ->
170+
shovel_test_utils:set_param(Config, ?PARAM,
171+
[{<<"src-protocol">>, <<"local">>},
172+
{<<"src-queue">>, Src},
173+
{<<"dest-protocol">>, <<"local">>},
174+
{<<"dest-exchange">>, <<"amq.fanout">>},
175+
{<<"dest-exchange-key">>, <<"">>}
176+
]),
177+
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
178+
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
179+
%% Messages won't be confirmed to source until all destination
180+
%% queues are able to confirm them, until them we keep retrying
181+
%% This generates multiple duplicates, but that's how publishing
182+
%% works.
183+
?awaitMatch([[<<"local_to_local_multiple_some_dest_down_dest">>, 0, 0, 0],
184+
[<<"local_to_local_multiple_some_dest_down_dest2">>, M, M, 0],
185+
[<<"local_to_local_multiple_some_dest_down_src">>, 10, _, _]]
186+
when (M > 10),
187+
list_queue_messages(Config),
188+
30000),
189+
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
190+
?awaitMatch([[<<"local_to_local_multiple_some_dest_down_dest">>, N, N, 0],
191+
[<<"local_to_local_multiple_some_dest_down_dest2">>, M, M, 0],
192+
[<<"local_to_local_multiple_some_dest_down_src">>, 0, 0, 0]]
193+
when ((N == 10) and (M >= 10)),
152194
list_queue_messages(Config),
153195
30000),
154196
expect_many(Sess, Dest, 10)

0 commit comments

Comments
 (0)