Skip to content

Commit 0174f59

Browse files
committed
Local shovel: handle unacked messages in queue down
It affects messages routed through exchanges, when there is no target queue configured in the shovel (it could be many queues as destination or none).
1 parent bbb9aea commit 0174f59

File tree

2 files changed

+267
-1
lines changed

2 files changed

+267
-1
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,14 +317,25 @@ handle_dest({{'DOWN', #resource{name = Queue,
317317
virtual_host = VHost}}, _, _, _, _} ,
318318
#{dest := #{queue := Queue, current := #{vhost := VHost}}}) ->
319319
{stop, {outbound_link_or_channel_closure, dest_queue_down}};
320+
handle_dest({{'DOWN', #resource{kind = queue,
321+
virtual_host = VHost} = QName}, _MRef, process, QPid, Reason},
322+
#{dest := Dest = #{current := Current = #{vhost := VHost,
323+
queue_states := QStates0}}} = State0) ->
324+
case rabbit_queue_type:handle_down(QPid, QName, Reason, QStates0) of
325+
{ok, QState1, Actions} ->
326+
State1 = State0#{dest => Dest#{current => Current#{queue_states => QState1}}},
327+
handle_dest_queue_actions(Actions, State1);
328+
{eol, QState1, QRef} ->
329+
State0#{dest => Dest#{current => Current#{queue_states => QState1}}}
330+
end;
320331
handle_dest(_Msg, State) ->
321332
State.
322333

323334
ack(DeliveryTag, Multiple, State) ->
324335
maybe_grant_credit(settle(complete, DeliveryTag, Multiple, State)).
325336

326337
nack(DeliveryTag, Multiple, State) ->
327-
maybe_grant_credit(settle(discard, DeliveryTag, Multiple, State)).
338+
maybe_grant_credit(settle(requeue, DeliveryTag, Multiple, State)).
328339

329340
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
330341
unacked := Unacked} = Dest,
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(local_dynamic_cluster_SUITE).
9+
10+
-include_lib("amqp_client/include/amqp_client.hrl").
11+
-include_lib("common_test/include/ct.hrl").
12+
-include_lib("eunit/include/eunit.hrl").
13+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
14+
15+
-compile(export_all).
16+
17+
-import(shovel_test_utils, [await_amqp10_event/3, await_credit/1]).
18+
19+
-define(PARAM, <<"test">>).
20+
21+
all() ->
22+
[
23+
{group, tests}
24+
].
25+
26+
groups() ->
27+
[
28+
{tests, [], [
29+
local_to_local_dest_down
30+
]}
31+
].
32+
33+
%% -------------------------------------------------------------------
34+
%% Testsuite setup/teardown.
35+
%% -------------------------------------------------------------------
36+
37+
init_per_suite(Config0) ->
38+
{ok, _} = application:ensure_all_started(amqp10_client),
39+
rabbit_ct_helpers:log_environment(),
40+
Config1 = rabbit_ct_helpers:set_config(Config0, [
41+
{rmq_nodename_suffix, ?MODULE},
42+
{rmq_nodes_count, 2},
43+
{rmq_nodes_clustered, true},
44+
{ignored_crashes, [
45+
"server_initiated_close,404",
46+
"writer,send_failed,closed",
47+
"source_queue_down",
48+
"dest_queue_down"
49+
]}
50+
]),
51+
rabbit_ct_helpers:run_setup_steps(Config1,
52+
rabbit_ct_broker_helpers:setup_steps() ++
53+
rabbit_ct_client_helpers:setup_steps()).
54+
55+
end_per_suite(Config) ->
56+
application:stop(amqp10_client),
57+
rabbit_ct_helpers:run_teardown_steps(Config,
58+
rabbit_ct_client_helpers:teardown_steps() ++
59+
rabbit_ct_broker_helpers:teardown_steps()).
60+
61+
init_per_group(_, Config) ->
62+
[Node | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
63+
ok = rabbit_ct_broker_helpers:enable_feature_flag(
64+
Config, [Node], 'rabbitmq_4.0.0'),
65+
Config.
66+
67+
end_per_group(_, Config) ->
68+
Config.
69+
70+
init_per_testcase(Testcase, Config0) ->
71+
SrcQ = list_to_binary(atom_to_list(Testcase) ++ "_src"),
72+
DestQ = list_to_binary(atom_to_list(Testcase) ++ "_dest"),
73+
DestQ2 = list_to_binary(atom_to_list(Testcase) ++ "_dest2"),
74+
VHost = list_to_binary(atom_to_list(Testcase) ++ "_vhost"),
75+
Config = [{srcq, SrcQ}, {destq, DestQ}, {destq2, DestQ2},
76+
{alt_vhost, VHost} | Config0],
77+
78+
rabbit_ct_helpers:testcase_started(Config, Testcase).
79+
80+
end_per_testcase(Testcase, Config) ->
81+
shovel_test_utils:clear_param(Config, ?PARAM),
82+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []),
83+
_ = rabbit_ct_broker_helpers:delete_vhost(Config, ?config(alt_vhost, Config)),
84+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
85+
86+
%% -------------------------------------------------------------------
87+
%% Testcases.
88+
%% -------------------------------------------------------------------
89+
90+
local_to_local_dest_down(Config) ->
91+
Src = ?config(srcq, Config),
92+
Dest = ?config(destq, Config),
93+
declare_queue(Config, 0, <<"/">>, Src),
94+
declare_queue(Config, 1, <<"/">>, Dest),
95+
with_session(
96+
Config,
97+
fun (Sess) ->
98+
shovel_test_utils:set_param(Config, ?PARAM,
99+
[{<<"src-protocol">>, <<"local">>},
100+
{<<"src-queue">>, Src},
101+
{<<"dest-protocol">>, <<"local">>},
102+
{<<"dest-exchange">>, <<>>},
103+
{<<"dest-exchange-key">>, Dest}
104+
]),
105+
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
106+
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
107+
?awaitMatch([[<<"local_to_local_dest_down_dest">>, <<>>, <<>>, <<>>],
108+
[<<"local_to_local_dest_down_src">>, <<"10">>, <<"0">>, <<"10">>]],
109+
list_queue_messages(Config),
110+
30000),
111+
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
112+
?awaitMatch([[<<"local_to_local_dest_down_dest">>, <<"10">>, <<"10">>, <<"0">>],
113+
[<<"local_to_local_dest_down_src">>, <<"0">>, <<"0">>, <<"0">>]],
114+
list_queue_messages(Config),
115+
30000),
116+
expect_many(Sess, Dest, 10)
117+
end).
118+
119+
%%----------------------------------------------------------------------------
120+
list_queue_messages(Config) ->
121+
lists:sort(
122+
rabbit_ct_broker_helpers:rabbitmqctl_list(
123+
Config, 0,
124+
["list_queues", "name", "messages", "messages_ready", "messages_unacknowledged", "--no-table-headers"])).
125+
126+
with_session(Config, Fun) ->
127+
with_session(Config, <<"/">>, Fun).
128+
129+
with_session(Config, VHost, Fun) ->
130+
Hostname = ?config(rmq_hostname, Config),
131+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
132+
Cfg = #{address => Hostname,
133+
port => Port,
134+
sasl => {plain, <<"guest">>, <<"guest">>},
135+
hostname => <<"vhost:", VHost/binary>>},
136+
{ok, Conn} = amqp10_client:open_connection(Cfg),
137+
{ok, Sess} = amqp10_client:begin_session(Conn),
138+
Fun(Sess),
139+
amqp10_client:close_connection(Conn),
140+
ok.
141+
142+
publish(Sender, Tag, Payload) when is_binary(Payload) ->
143+
Headers = #{durable => true},
144+
Msg = amqp10_msg:set_headers(Headers,
145+
amqp10_msg:new(Tag, Payload, false)),
146+
%% N.B.: this function does not attach a link and does not
147+
%% need to use await_credit/1
148+
ok = amqp10_client:send_msg(Sender, Msg),
149+
receive
150+
{amqp10_disposition, {accepted, Tag}} -> ok
151+
after 3000 ->
152+
exit(publish_disposition_not_received)
153+
end.
154+
155+
publish(Session, Source, Dest, Tag, Payloads) ->
156+
LinkName = <<"dynamic-sender-", Dest/binary>>,
157+
{ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source,
158+
unsettled, unsettled_state),
159+
ok = await_amqp10_event(link, Sender, attached),
160+
ok = await_credit(Sender),
161+
case is_list(Payloads) of
162+
true ->
163+
[publish(Sender, Tag, Payload) || Payload <- Payloads];
164+
false ->
165+
publish(Sender, Tag, Payloads)
166+
end,
167+
amqp10_client:detach_link(Sender).
168+
169+
publish_many(Session, Source, Dest, Tag, N) ->
170+
Payloads = [integer_to_binary(Payload) || Payload <- lists:seq(1, N)],
171+
publish(Session, Source, Dest, Tag, Payloads).
172+
173+
expect_many(Session, Dest, N) ->
174+
LinkName = <<"dynamic-receiver-", Dest/binary>>,
175+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
176+
Dest, settled,
177+
unsettled_state),
178+
ok = amqp10_client:flow_link_credit(Receiver, 10, 1),
179+
Msgs = expect(Receiver, N, []),
180+
amqp10_client:detach_link(Receiver),
181+
Msgs.
182+
183+
expect(_, 0, Acc) ->
184+
Acc;
185+
expect(Receiver, N, Acc) ->
186+
receive
187+
{amqp10_msg, Receiver, InMsg} ->
188+
expect(Receiver, N - 1, [amqp10_msg:body(InMsg) | Acc])
189+
after 4000 ->
190+
throw({timeout_in_expect_waiting_for_delivery, N, Acc})
191+
end.
192+
193+
expect(Receiver) ->
194+
receive
195+
{amqp10_msg, Receiver, InMsg} ->
196+
InMsg
197+
after 4000 ->
198+
throw(timeout_in_expect_waiting_for_delivery)
199+
end.
200+
201+
declare_queue(Config, Node, VHost, QName) ->
202+
declare_queue(Config, Node, VHost, QName, []).
203+
204+
declare_queue(Config, Node, VHost, QName, Args) ->
205+
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, VHost),
206+
{ok, Ch} = amqp_connection:open_channel(Conn),
207+
?assertEqual(
208+
{'queue.declare_ok', QName, 0, 0},
209+
amqp_channel:call(
210+
Ch, #'queue.declare'{queue = QName, durable = true, arguments = Args})),
211+
rabbit_ct_client_helpers:close_channel(Ch),
212+
rabbit_ct_client_helpers:close_connection(Conn).
213+
214+
declare_and_bind_queue(Config, VHost, Exchange, QName, RoutingKey) ->
215+
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
216+
{ok, Ch} = amqp_connection:open_channel(Conn),
217+
?assertEqual(
218+
{'queue.declare_ok', QName, 0, 0},
219+
amqp_channel:call(
220+
Ch, #'queue.declare'{queue = QName, durable = true,
221+
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]})),
222+
?assertMatch(
223+
#'queue.bind_ok'{},
224+
amqp_channel:call(Ch, #'queue.bind'{
225+
queue = QName,
226+
exchange = Exchange,
227+
routing_key = RoutingKey
228+
})),
229+
rabbit_ct_client_helpers:close_channel(Ch),
230+
rabbit_ct_client_helpers:close_connection(Conn).
231+
232+
declare_exchange(Config, VHost, Exchange) ->
233+
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
234+
{ok, Ch} = amqp_connection:open_channel(Conn),
235+
?assertMatch(
236+
#'exchange.declare_ok'{},
237+
amqp_channel:call(Ch, #'exchange.declare'{exchange = Exchange})),
238+
rabbit_ct_client_helpers:close_channel(Ch),
239+
rabbit_ct_client_helpers:close_connection(Conn).
240+
241+
delete_all_queues() ->
242+
Queues = rabbit_amqqueue:list(),
243+
lists:foreach(
244+
fun(Q) ->
245+
{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
246+
end, Queues).
247+
248+
delete_queue(Name, VHost) ->
249+
QName = rabbit_misc:r(VHost, queue, Name),
250+
case rabbit_amqqueue:lookup(QName) of
251+
{ok, Q} ->
252+
{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>);
253+
_ ->
254+
ok
255+
end.

0 commit comments

Comments
 (0)