Skip to content

Commit 24e2575

Browse files
committed
rabbitmq_shovel: Split 0-9-1 alarms tests into a new suite
This change also refactors them to use two unclustered nodes. This is a prerequisite for the child change which will remove the workaround of using a direct connection to be able to publish while a node is in alarm.
1 parent 1d1faff commit 24e2575

File tree

3 files changed

+273
-141
lines changed

3 files changed

+273
-141
lines changed
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(amqp091_alarm_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("amqp_client/include/amqp_client.hrl").
13+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
14+
15+
-compile([export_all, nowarn_export_all]).
16+
17+
all() ->
18+
[
19+
{group, all_tests}
20+
].
21+
22+
groups() ->
23+
[
24+
{all_tests, [], all_tests()}
25+
].
26+
27+
all_tests() ->
28+
[
29+
dest_resource_alarm_on_confirm,
30+
dest_resource_alarm_on_publish,
31+
dest_resource_alarm_no_ack
32+
].
33+
34+
%% -------------------------------------------------------------------
35+
%% Testsuite setup/teardown.
36+
%% -------------------------------------------------------------------
37+
38+
init_per_suite(Config) ->
39+
rabbit_ct_helpers:log_environment(),
40+
Config1 = rabbit_ct_helpers:set_config(Config, [
41+
{rmq_nodename_suffix, ?MODULE},
42+
{rmq_nodes_count, 2},
43+
{rmq_nodes_clustered, false}
44+
]),
45+
rabbit_ct_helpers:run_setup_steps(Config1,
46+
rabbit_ct_broker_helpers:setup_steps() ++
47+
rabbit_ct_client_helpers:setup_steps()).
48+
49+
end_per_suite(Config) ->
50+
rabbit_ct_helpers:run_teardown_steps(Config,
51+
rabbit_ct_client_helpers:teardown_steps() ++
52+
rabbit_ct_broker_helpers:teardown_steps()).
53+
54+
init_per_group(_, Config) ->
55+
rabbit_ct_helpers:set_config(
56+
Config,
57+
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
58+
{shovel_source_idx, 1},
59+
{shovel_dest_uri, shovel_test_utils:make_uri(Config, 0)},
60+
{shovel_dest_idx, 0}
61+
]).
62+
63+
end_per_group(_, Config) ->
64+
Config.
65+
66+
init_per_testcase(Testcase, Config) ->
67+
rabbit_ct_helpers:testcase_started(Config, Testcase).
68+
69+
end_per_testcase(Testcase, Config) ->
70+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
71+
72+
%% -------------------------------------------------------------------
73+
%% Testcases.
74+
%% -------------------------------------------------------------------
75+
76+
dest_resource_alarm_on_confirm(Config) ->
77+
dest_resource_alarm(<<"on-confirm">>, Config).
78+
79+
dest_resource_alarm_on_publish(Config) ->
80+
dest_resource_alarm(<<"on-publish">>, Config).
81+
82+
dest_resource_alarm_no_ack(Config) ->
83+
dest_resource_alarm(<<"no-ack">>, Config).
84+
85+
dest_resource_alarm(AckMode, Config) ->
86+
SourceUri = ?config(shovel_source_uri, Config),
87+
SourceIdx = ?config(shovel_source_idx, Config),
88+
DestUri = ?config(shovel_dest_uri, Config),
89+
DestIdx = ?config(shovel_dest_idx, Config),
90+
91+
{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(
92+
Config, SourceIdx),
93+
amqp_channel:call(Ch1, #'confirm.select'{}),
94+
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"src">>}),
95+
publish(Ch1, <<>>, <<"src">>, <<"hello">>),
96+
true = amqp_channel:wait_for_confirms(Ch1),
97+
#{messages := 1} = message_count(Config, SourceIdx, <<"src">>),
98+
while_blocked(Config, DestIdx,
99+
fun() ->
100+
ok = rabbit_ct_broker_helpers:rpc(
101+
Config, DestIdx,
102+
rabbit_runtime_parameters, set,
103+
[
104+
<<"/">>, <<"shovel">>, <<"test">>,
105+
[{<<"src-uri">>, SourceUri},
106+
{<<"dest-uri">>, [DestUri]},
107+
{<<"src-queue">>, <<"src">>},
108+
{<<"dest-queue">>, <<"dest">>},
109+
{<<"src-prefetch-count">>, 50},
110+
{<<"ack-mode">>, AckMode},
111+
{<<"src-delete-after">>, <<"never">>}], none]),
112+
%% The destination is blocked, so the shovel is blocked.
113+
?awaitMatch(
114+
blocked,
115+
shovel_test_utils:get_shovel_status(Config, DestIdx,
116+
<<"test">>),
117+
3_000),
118+
119+
%% The shoveled message triggered a
120+
%% connection.blocked notification, but hasn't
121+
%% reached the dest queue because of the resource
122+
%% alarm
123+
InitialMsgCnt =
124+
case AckMode of
125+
<<"on-confirm">> -> 1;
126+
_ -> 0
127+
end,
128+
#{messages := InitialMsgCnt,
129+
messages_unacknowledged := InitialMsgCnt
130+
} = message_count(Config, SourceIdx, <<"src">>),
131+
#{messages := 0} = message_count(Config, DestIdx, <<"dest">>),
132+
133+
%% Now publish more messages to "src" queue.
134+
publish_count(Ch1, <<>>, <<"src">>, <<"hello">>, 1000),
135+
true = amqp_channel:wait_for_confirms(Ch1),
136+
137+
%% No messages reached the dest queue
138+
#{messages := 0} = message_count(Config, DestIdx, <<"dest">>),
139+
140+
%% When the shovel sets a prefetch_count
141+
%% (on-confirm/on-publish mode), all messages are in
142+
%% the source queue, prefrech count are
143+
%% unacknowledged and buffered in the shovel
144+
MsgCnts =
145+
case AckMode of
146+
<<"on-confirm">> ->
147+
#{messages => 1001,
148+
messages_unacknowledged => 50};
149+
<<"on-publish">> ->
150+
#{messages => 1000,
151+
messages_unacknowledged => 50};
152+
<<"no-ack">> ->
153+
%% no prefetch limit, all messages are
154+
%% buffered in the shovel
155+
#{messages => 0,
156+
messages_unacknowledged => 0}
157+
end,
158+
159+
MsgCnts = message_count(Config, SourceIdx, <<"src">>),
160+
161+
%% There should be no process with a message buildup
162+
?awaitMatch(
163+
0,
164+
begin
165+
Top = [{_, P, _}] = rabbit_ct_broker_helpers:rpc(
166+
Config, 0, recon, proc_count, [message_queue_len, 1]),
167+
ct:pal("Top process by message queue length: ~p", [Top]),
168+
P
169+
end, 5_000),
170+
171+
ok
172+
end),
173+
174+
%% After the alarm clears, all messages should arrive in the dest queue.
175+
?awaitMatch(
176+
#{messages := 1001},
177+
message_count(Config, DestIdx, <<"dest">>),
178+
5_000),
179+
#{messages := 0} = message_count(Config, SourceIdx, <<"src">>),
180+
running = shovel_test_utils:get_shovel_status(Config, DestIdx, <<"test">>),
181+
182+
rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Ch1),
183+
cleanup(Config),
184+
ok.
185+
186+
%%----------------------------------------------------------------------------
187+
188+
conserve_resources(Pid, Source, {_, Conserve, AlarmedNode}) ->
189+
case Conserve of
190+
true ->
191+
ct:log("node ~w set alarm for resource ~ts",
192+
[AlarmedNode, Source]),
193+
Pid ! {block, Source};
194+
false ->
195+
ct:log("node ~w cleared alarm for resource ~ts",
196+
[AlarmedNode, Source]),
197+
Pid ! {unblock, Source}
198+
end,
199+
ok.
200+
201+
while_blocked(Config, Node, Fun) when is_function(Fun, 0) ->
202+
OrigLimit = rabbit_ct_broker_helpers:rpc(Config, Node,
203+
vm_memory_monitor,
204+
get_vm_memory_high_watermark, []),
205+
206+
ok = rabbit_ct_broker_helpers:add_code_path_to_node(
207+
rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename),
208+
?MODULE),
209+
[] = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_alarm, register,
210+
[self(),
211+
{?MODULE, conserve_resources, []}]),
212+
ok = rabbit_ct_broker_helpers:rpc(Config, Node, vm_memory_monitor,
213+
set_vm_memory_high_watermark, [0]),
214+
Source = receive
215+
{block, S} ->
216+
S
217+
after
218+
15_000 ->
219+
ct:fail(alarm_set_timeout)
220+
end,
221+
try
222+
Fun()
223+
after
224+
ok = rabbit_ct_broker_helpers:rpc(Config, Node, vm_memory_monitor,
225+
set_vm_memory_high_watermark,
226+
[OrigLimit]),
227+
receive
228+
{unblock, Source} ->
229+
ok
230+
after
231+
10_000 ->
232+
ct:fail(alarm_clear_timeout)
233+
end
234+
end.
235+
236+
publish(Ch, X, Key, Payload) when is_binary(Payload) ->
237+
publish(Ch, X, Key, #amqp_msg{payload = Payload});
238+
239+
publish(Ch, X, Key, Msg = #amqp_msg{}) ->
240+
amqp_channel:cast(Ch, #'basic.publish'{exchange = X,
241+
routing_key = Key}, Msg).
242+
243+
publish_count(Ch, X, Key, M, Count) ->
244+
[begin
245+
246+
publish(Ch, X, Key, M)
247+
end || _ <- lists:seq(1, Count)].
248+
249+
message_count(Config, Node, QueueName) ->
250+
Resource = rabbit_misc:r(<<"/">>, queue, QueueName),
251+
{ok, Q} = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue,
252+
lookup, [Resource]),
253+
maps:from_list(
254+
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, info,
255+
[Q, [messages, messages_unacknowledged]])).
256+
257+
cleanup(Config) ->
258+
rabbit_ct_broker_helpers:rpc_all(Config, ?MODULE, cleanup1, []).
259+
260+
cleanup1() ->
261+
[rabbit_runtime_parameters:clear(rabbit_misc:pget(vhost, P),
262+
rabbit_misc:pget(component, P),
263+
rabbit_misc:pget(name, P),
264+
<<"acting-user">>) ||
265+
P <- rabbit_runtime_parameters:list()],
266+
[rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>)
267+
|| Q <- rabbit_amqqueue:list()].

0 commit comments

Comments
 (0)