Skip to content

Commit 1c4f9b2

Browse files
committed
Local shovels: fix handling of acks/nacks from multiple queues
1 parent 911c553 commit 1c4f9b2

File tree

3 files changed

+235
-37
lines changed

3 files changed

+235
-37
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
128128
State#{dest => Dest#{current => #{queue_states => QState,
129129
delivery_id => 1,
130130
vhost => VHost},
131-
unconfirmed => rabbit_confirms:init(),
131+
unconfirmed => rabbit_shovel_confirms:init(),
132132
rejected => [],
133133
rejected_count => 0,
134134
confirmed => [],
135135
confirmed_count => 0}};
136136
_ ->
137137
State#{dest => Dest#{current => #{queue_states => QState,
138138
vhost => VHost},
139-
unconfirmed => rabbit_confirms:init(),
139+
unconfirmed => rabbit_shovel_confirms:init(),
140140
confirmed => [],
141141
confirmed_count => 0,
142142
rejected => [],
@@ -361,8 +361,8 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
361361
{#{}, State0}
362362
end,
363363
Msg = set_annotations(Msg0, Dest),
364-
QNames = route(Msg, Dest),
365-
Queues = rabbit_amqqueue:lookup_many(QNames),
364+
RoutedQNames = route(Msg, Dest),
365+
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
366366
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
367367
{ok, QState1, Actions} ->
368368
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
@@ -374,12 +374,15 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
374374
on_publish ->
375375
rabbit_shovel_behaviour:decr_remaining(
376376
1,
377-
record_confirms([{Tag, none}], State2));
377+
record_confirms([{Tag, Tag}], State2));
378378
_ ->
379379
rabbit_shovel_behaviour:decr_remaining(1, State2)
380380
end),
381381
MsgSeqNo = maps:get(correlation, Options, undefined),
382-
State4 = process_routing_confirm(MsgSeqNo, QNames, State3),
382+
QNames = lists:map(fun({QName, _}) -> QName;
383+
(QName) -> QName
384+
end, RoutedQNames),
385+
State4 = process_routing_confirm(MsgSeqNo, Tag, QNames, State3),
383386
send_confirms_and_nacks(handle_dest_queue_actions(Actions, State4));
384387
{error, Reason} ->
385388
exit({shutdown, Reason})
@@ -470,7 +473,7 @@ handle_dest_queue_actions(Actions, State) ->
470473
{U, Rej} =
471474
lists:foldr(
472475
fun(SeqNo, {U1, Acc}) ->
473-
case rabbit_confirms:reject(SeqNo, U1) of
476+
case rabbit_shovel_confirms:reject(SeqNo, U1) of
474477
{ok, MX, U2} ->
475478
{U2, [MX | Acc]};
476479
{error, not_found} ->
@@ -736,15 +739,14 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
736739
at_least_one_credit_req_in_flight => false}}
737740
end.
738741

739-
process_routing_confirm(undefined, _, State) ->
742+
process_routing_confirm(undefined, _, _, State) ->
740743
State;
741-
process_routing_confirm(MsgSeqNo, [], State)
744+
process_routing_confirm(MsgSeqNo, Tag, [], State)
742745
when is_integer(MsgSeqNo) ->
743-
record_confirms([{MsgSeqNo, none}], State);
744-
process_routing_confirm(MsgSeqNo, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
745-
XName = rabbit_misc:r(<<"/">>, exchange, <<>>),
746+
record_confirms([{MsgSeqNo, Tag}], State);
747+
process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
746748
State#{dest => Dst#{unconfirmed =>
747-
rabbit_confirms:insert(MsgSeqNo, QRefs, XName, Unconfirmed)}}.
749+
rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}.
748750

749751
record_confirms([], State) ->
750752
State;
@@ -765,7 +767,7 @@ record_rejects(MXs, State = #{dest := Dst = #{rejected := R,
765767
rejected_count => RC + Num}}).
766768

767769
confirm(MsgSeqNos, QRef, State = #{dest := Dst = #{unconfirmed := UC}}) ->
768-
{ConfirmMXs, UC1} = rabbit_confirms:confirm(MsgSeqNos, QRef, UC),
770+
{ConfirmMXs, UC1} = rabbit_shovel_confirms:confirm(MsgSeqNos, QRef, UC),
769771
record_confirms(ConfirmMXs, State#{dest => Dst#{unconfirmed => UC1}}).
770772

771773
send_nacks([], _, State) ->
@@ -789,9 +791,9 @@ send_confirms(Cs, Rs, State) ->
789791
coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun,
790792
State = #{dest := #{unconfirmed := UC}}) ->
791793
SMsgSeqNos = lists:usort(MsgSeqNos),
792-
UnconfirmedCutoff = case rabbit_confirms:is_empty(UC) of
794+
UnconfirmedCutoff = case rabbit_shovel_confirms:is_empty(UC) of
793795
true -> lists:last(SMsgSeqNos) + 1;
794-
false -> rabbit_confirms:smallest(UC)
796+
false -> rabbit_shovel_confirms:smallest(UC)
795797
end,
796798
Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]),
797799
{Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos),
@@ -810,15 +812,15 @@ send_confirms_and_nacks(State = #{dest := #{confirmed := [],
810812
send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C,
811813
rejected := R}}) ->
812814
Confirms = lists:append(C),
813-
ConfirmMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Confirms],
815+
ConfirmTags = [Tag || {_, Tag} <- Confirms],
814816
Rejects = lists:append(R),
815-
RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects],
817+
RejectTags = [Tag || {_, Tag} <- Rejects],
816818
State1 = #{dest := Dst2}
817-
= send_confirms(ConfirmMsgSeqNos,
818-
RejectMsgSeqNos,
819+
= send_confirms(ConfirmTags,
820+
RejectTags,
819821
State#{dest => Dst#{confirmed => [],
820822
confirmed_count => 0}}),
821-
send_nacks(RejectMsgSeqNos,
822-
ConfirmMsgSeqNos,
823+
send_nacks(RejectTags,
824+
ConfirmTags,
823825
State1#{dest => Dst2#{rejected => [],
824826
rejected_count => 0}}).
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
-module(rabbit_shovel_confirms).
2+
3+
-compile({no_auto_import, [size/1]}).
4+
5+
-include_lib("rabbit_common/include/rabbit.hrl").
6+
7+
-export([init/0,
8+
insert/4,
9+
confirm/3,
10+
reject/2,
11+
12+
remove_queue/2,
13+
14+
smallest/1,
15+
size/1,
16+
is_empty/1]).
17+
18+
-type seq_no() :: non_neg_integer().
19+
-type delivery_tag() :: non_neg_integer().
20+
-type queue_name() :: rabbit_amqqueue:name().
21+
22+
-record(?MODULE, {smallest :: undefined | seq_no(),
23+
unconfirmed = #{} :: #{seq_no() =>
24+
{delivery_tag(),
25+
#{queue_name() => ok}}}
26+
}).
27+
28+
-type mx() :: {seq_no(), delivery_tag()}.
29+
30+
-opaque state() :: #?MODULE{}.
31+
32+
-export_type([
33+
state/0
34+
]).
35+
36+
-spec init() -> state().
37+
init() ->
38+
#?MODULE{}.
39+
40+
-spec insert(seq_no(), [queue_name()], delivery_tag(), state()) ->
41+
state().
42+
insert(SeqNo, QNames, Tag,
43+
#?MODULE{smallest = S0,
44+
unconfirmed = U0} = State)
45+
when is_integer(SeqNo)
46+
andalso is_list(QNames)
47+
andalso not is_map_key(SeqNo, U0) ->
48+
U = U0#{SeqNo => {Tag, maps:from_keys(QNames, ok)}},
49+
S = case S0 of
50+
undefined -> SeqNo;
51+
_ -> S0
52+
end,
53+
State#?MODULE{smallest = S,
54+
unconfirmed = U}.
55+
56+
-spec confirm([seq_no()], queue_name(), state()) ->
57+
{[mx()], state()}.
58+
confirm(SeqNos, QName, #?MODULE{smallest = Smallest0,
59+
unconfirmed = U0} = State)
60+
when is_list(SeqNos) ->
61+
{Confirmed, ConfirmedSmallest, U} =
62+
lists:foldl(
63+
fun (SeqNo, Acc) ->
64+
confirm_one(SeqNo, QName, Smallest0, Acc)
65+
end, {[], false, U0}, SeqNos),
66+
Smallest = case ConfirmedSmallest of
67+
true ->
68+
%% work out new smallest
69+
next_smallest(Smallest0, U);
70+
false ->
71+
Smallest0
72+
end,
73+
{Confirmed, State#?MODULE{smallest = Smallest,
74+
unconfirmed = U}}.
75+
76+
-spec reject(seq_no(), state()) ->
77+
{ok, mx(), state()} | {error, not_found}.
78+
reject(SeqNo, #?MODULE{smallest = Smallest0,
79+
unconfirmed = U0} = State)
80+
when is_integer(SeqNo) ->
81+
case maps:take(SeqNo, U0) of
82+
{{Tag, _QS}, U} ->
83+
Smallest = case SeqNo of
84+
Smallest0 ->
85+
%% need to scan as the smallest was removed
86+
next_smallest(Smallest0, U);
87+
_ ->
88+
Smallest0
89+
end,
90+
{ok, {SeqNo, Tag}, State#?MODULE{unconfirmed = U,
91+
smallest = Smallest}};
92+
error ->
93+
{error, not_found}
94+
end.
95+
96+
%% idempotent
97+
-spec remove_queue(queue_name(), state()) ->
98+
{[mx()], state()}.
99+
remove_queue(QName, #?MODULE{unconfirmed = U} = State) ->
100+
SeqNos = maps:fold(
101+
fun (SeqNo, {_Tag, QS0}, Acc) ->
102+
case maps:is_key(QName, QS0) of
103+
true ->
104+
[SeqNo | Acc];
105+
false ->
106+
Acc
107+
end
108+
end, [], U),
109+
confirm(lists:sort(SeqNos), QName,State).
110+
111+
-spec smallest(state()) -> seq_no() | undefined.
112+
smallest(#?MODULE{smallest = Smallest}) ->
113+
Smallest.
114+
115+
-spec size(state()) -> non_neg_integer().
116+
size(#?MODULE{unconfirmed = U}) ->
117+
maps:size(U).
118+
119+
-spec is_empty(state()) -> boolean().
120+
is_empty(State) ->
121+
size(State) == 0.
122+
123+
%% INTERNAL
124+
125+
confirm_one(SeqNo, QName, Smallest, {Acc, ConfirmedSmallest0, U0}) ->
126+
case maps:take(SeqNo, U0) of
127+
{{Tag, QS}, U1}
128+
when is_map_key(QName, QS)
129+
andalso map_size(QS) == 1 ->
130+
%% last queue confirm
131+
ConfirmedSmallest = case SeqNo of
132+
Smallest -> true;
133+
_ -> ConfirmedSmallest0
134+
end,
135+
{[{SeqNo, Tag} | Acc], ConfirmedSmallest, U1};
136+
{{Tag, QS}, U1} ->
137+
{Acc, ConfirmedSmallest0, U1#{SeqNo => {Tag, maps:remove(QName, QS)}}};
138+
error ->
139+
{Acc, ConfirmedSmallest0, U0}
140+
end.
141+
142+
next_smallest(_S, U) when map_size(U) == 0 ->
143+
undefined;
144+
next_smallest(S, U) when is_map_key(S, U) ->
145+
S;
146+
next_smallest(S, U) ->
147+
%% TODO: this is potentially infinitely recursive if called incorrectly
148+
next_smallest(S+1, U).

deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ groups() ->
2727
[
2828
{tests, [], [
2929
local_to_local_dest_down,
30-
local_to_local_multiple_dest_down,
30+
local_to_local_multiple_all_dest_down,
31+
local_to_local_multiple_some_dest_down,
3132
local_to_local_no_destination
3233
]}
3334
].
@@ -50,9 +51,18 @@ init_per_suite(Config0) ->
5051
"dest_queue_down"
5152
]}
5253
]),
53-
rabbit_ct_helpers:run_setup_steps(Config1,
54-
rabbit_ct_broker_helpers:setup_steps() ++
55-
rabbit_ct_client_helpers:setup_steps()).
54+
Config2 = rabbit_ct_helpers:run_setup_steps(
55+
Config1,
56+
rabbit_ct_broker_helpers:setup_steps() ++
57+
rabbit_ct_client_helpers:setup_steps()),
58+
[Node | _] = rabbit_ct_broker_helpers:get_node_configs(Config2, nodename),
59+
case rabbit_ct_broker_helpers:enable_feature_flag(
60+
Config2, [Node], 'rabbitmq_4.0.0') of
61+
ok ->
62+
Config2;
63+
_ ->
64+
{skip, "This suite requires rabbitmq_4.0.0 feature flag"}
65+
end.
5666

5767
end_per_suite(Config) ->
5868
application:stop(amqp10_client),
@@ -61,9 +71,6 @@ end_per_suite(Config) ->
6171
rabbit_ct_broker_helpers:teardown_steps()).
6272

6373
init_per_group(_, Config) ->
64-
[Node | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
65-
ok = rabbit_ct_broker_helpers:enable_feature_flag(
66-
Config, [Node], 'rabbitmq_4.0.0'),
6774
Config.
6875

6976
end_per_group(_, Config) ->
@@ -118,7 +125,7 @@ local_to_local_dest_down(Config) ->
118125
expect_many(Sess, Dest, 10)
119126
end).
120127

121-
local_to_local_multiple_dest_down(Config) ->
128+
local_to_local_multiple_all_dest_down(Config) ->
122129
Src = ?config(srcq, Config),
123130
Dest = ?config(destq, Config),
124131
Dest2 = ?config(destq2, Config),
@@ -137,16 +144,57 @@ local_to_local_multiple_dest_down(Config) ->
137144
]),
138145
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
139146
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
140-
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, 0, 0, 0],
141-
[<<"local_to_local_multiple_dest_down_dest2">>, 0, 0, 0],
142-
[<<"local_to_local_multiple_dest_down_src">>, 10, _, _]],
147+
?awaitMatch([[<<"local_to_local_multiple_all_dest_down_dest">>, 0, 0, 0],
148+
[<<"local_to_local_multiple_all_dest_down_dest2">>, 0, 0, 0],
149+
[<<"local_to_local_multiple_all_dest_down_src">>, 10, _, _]],
150+
list_queue_messages(Config),
151+
30000),
152+
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
153+
?awaitMatch([[<<"local_to_local_multiple_all_dest_down_dest">>, 10, 10, 0],
154+
[<<"local_to_local_multiple_all_dest_down_dest2">>, 10, 10, 0],
155+
[<<"local_to_local_multiple_all_dest_down_src">>, 0, 0, 0]],
156+
list_queue_messages(Config),
157+
30000),
158+
expect_many(Sess, Dest, 10)
159+
end).
160+
161+
local_to_local_multiple_some_dest_down(Config) ->
162+
Src = ?config(srcq, Config),
163+
Dest = ?config(destq, Config),
164+
Dest2 = ?config(destq2, Config),
165+
declare_queue(Config, 0, <<"/">>, Src),
166+
%% Declare each destination queue in a different node. Just one of
167+
%% them will be down, but this still means the message can't be confirmed
168+
%% and should be requeued.
169+
declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest),
170+
declare_and_bind_queue(Config, 2, <<"/">>, <<"amq.fanout">>, Dest2, Dest2),
171+
with_session(
172+
Config,
173+
fun (Sess) ->
174+
shovel_test_utils:set_param(Config, ?PARAM,
175+
[{<<"src-protocol">>, <<"local">>},
176+
{<<"src-queue">>, Src},
177+
{<<"dest-protocol">>, <<"local">>},
178+
{<<"dest-exchange">>, <<"amq.fanout">>},
179+
{<<"dest-exchange-key">>, <<"">>}
180+
]),
181+
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
182+
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
183+
%% Messages won't be confirmed to source until all destination
184+
%% queues are able to confirm them, until them we keep retrying
185+
%% This generates multiple duplicates, but that's how publishing
186+
%% works.
187+
?awaitMatch([[<<"local_to_local_multiple_some_dest_down_dest">>, 0, 0, 0],
188+
[<<"local_to_local_multiple_some_dest_down_dest2">>, M, M, 0],
189+
[<<"local_to_local_multiple_some_dest_down_src">>, 10, _, _]]
190+
when (M > 10),
143191
list_queue_messages(Config),
144192
30000),
145193
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
146-
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, N, N, 0],
147-
[<<"local_to_local_multiple_dest_down_dest2">>, M, M, 0],
148-
[<<"local_to_local_multiple_dest_down_src">>, 0, 0, 0]]
149-
when ((N >= 10) and (M >= 10)),
194+
?awaitMatch([[<<"local_to_local_multiple_some_dest_down_dest">>, N, N, 0],
195+
[<<"local_to_local_multiple_some_dest_down_dest2">>, M, M, 0],
196+
[<<"local_to_local_multiple_some_dest_down_src">>, 0, 0, 0]]
197+
when ((N == 10) and (M >= 10)),
150198
list_queue_messages(Config),
151199
30000),
152200
expect_many(Sess, Dest, 10)

0 commit comments

Comments
 (0)