Skip to content

Commit 1c72316

Browse files
committed
Local shovels: Handle multiple rejects on node down
1 parent 0174f59 commit 1c72316

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ handle_dest({{'DOWN', #resource{kind = queue,
325325
{ok, QState1, Actions} ->
326326
State1 = State0#{dest => Dest#{current => Current#{queue_states => QState1}}},
327327
handle_dest_queue_actions(Actions, State1);
328-
{eol, QState1, QRef} ->
328+
{eol, QState1, _QRef} ->
329329
State0#{dest => Dest#{current => Current#{queue_states => QState1}}}
330330
end;
331331
handle_dest(_Msg, State) ->
@@ -652,11 +652,15 @@ confirm_to_inbound(ConfirmFun, SeqNos, State)
652652
end, State, SeqNos);
653653
confirm_to_inbound(ConfirmFun, Seq,
654654
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
655-
#{Seq := InTag} = Unacked,
656-
Unacked1 = maps:remove(Seq, Unacked),
657-
State = rabbit_shovel_behaviour:decr_remaining(
658-
1, State0#{dest => Dst#{unacked => Unacked1}}),
659-
ConfirmFun(InTag, State).
655+
case Unacked of
656+
#{Seq := InTag} ->
657+
Unacked1 = maps:remove(Seq, Unacked),
658+
State = rabbit_shovel_behaviour:decr_remaining(
659+
1, State0#{dest => Dst#{unacked => Unacked1}}),
660+
ConfirmFun(InTag, State);
661+
_ ->
662+
State0
663+
end.
660664

661665
sent_delivery(#{source := #{delivery_count := DeliveryCount0,
662666
credit := Credit0} = Src

deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ all() ->
2626
groups() ->
2727
[
2828
{tests, [], [
29-
local_to_local_dest_down
29+
local_to_local_dest_down,
30+
local_to_local_multiple_dest_down
3031
]}
3132
].
3233

@@ -105,7 +106,7 @@ local_to_local_dest_down(Config) ->
105106
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
106107
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
107108
?awaitMatch([[<<"local_to_local_dest_down_dest">>, <<>>, <<>>, <<>>],
108-
[<<"local_to_local_dest_down_src">>, <<"10">>, <<"0">>, <<"10">>]],
109+
[<<"local_to_local_dest_down_src">>, <<"10">>, _, _]],
109110
list_queue_messages(Config),
110111
30000),
111112
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
@@ -116,6 +117,39 @@ local_to_local_dest_down(Config) ->
116117
expect_many(Sess, Dest, 10)
117118
end).
118119

120+
local_to_local_multiple_dest_down(Config) ->
121+
Src = ?config(srcq, Config),
122+
Dest = ?config(destq, Config),
123+
Dest2 = ?config(destq2, Config),
124+
declare_queue(Config, 0, <<"/">>, Src),
125+
declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest),
126+
declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest2, Dest2),
127+
with_session(
128+
Config,
129+
fun (Sess) ->
130+
shovel_test_utils:set_param(Config, ?PARAM,
131+
[{<<"src-protocol">>, <<"local">>},
132+
{<<"src-queue">>, Src},
133+
{<<"dest-protocol">>, <<"local">>},
134+
{<<"dest-exchange">>, <<"amq.fanout">>},
135+
{<<"dest-exchange-key">>, Dest}
136+
]),
137+
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
138+
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
139+
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, <<>>, <<>>, <<>>],
140+
[<<"local_to_local_multiple_dest_down_dest2">>, <<>>, <<>>, <<>>],
141+
[<<"local_to_local_multiple_dest_down_src">>, <<"10">>, _, _]],
142+
list_queue_messages(Config),
143+
30000),
144+
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
145+
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, <<"10">>, <<"10">>, <<"0">>],
146+
[<<"local_to_local_multiple_dest_down_dest2">>, <<"10">>, <<"10">>, <<"0">>],
147+
[<<"local_to_local_multiple_dest_down_src">>, <<"0">>, <<"0">>, <<"0">>]],
148+
list_queue_messages(Config),
149+
30000),
150+
expect_many(Sess, Dest, 10)
151+
end).
152+
119153
%%----------------------------------------------------------------------------
120154
list_queue_messages(Config) ->
121155
lists:sort(
@@ -211,8 +245,8 @@ declare_queue(Config, Node, VHost, QName, Args) ->
211245
rabbit_ct_client_helpers:close_channel(Ch),
212246
rabbit_ct_client_helpers:close_connection(Conn).
213247

214-
declare_and_bind_queue(Config, VHost, Exchange, QName, RoutingKey) ->
215-
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
248+
declare_and_bind_queue(Config, Node, VHost, Exchange, QName, RoutingKey) ->
249+
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, VHost),
216250
{ok, Ch} = amqp_connection:open_channel(Conn),
217251
?assertEqual(
218252
{'queue.declare_ok', QName, 0, 0},

0 commit comments

Comments
 (0)