Skip to content

Commit 5aaf212

Browse files
authored
Merge pull request #6738 from rabbitmq/coordinator-quorum-check
Check if stream coordinator is quorum critial
2 parents 0f96f21 + 44ed4eb commit 5aaf212

File tree

4 files changed

+71
-13
lines changed

4 files changed

+71
-13
lines changed

deps/rabbit/src/rabbit_upgrade_preparation.erl

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,39 @@ await_online_synchronised_mirrors(Timeout) ->
3232
%% Implementation
3333
%%
3434

35+
online_members(Component) ->
36+
lists:filter(fun erlang:is_pid/1,
37+
rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(),
38+
erlang, whereis, [Component])).
39+
40+
endangered_critical_components() ->
41+
CriticalComponents = [rabbit_stream_coordinator],
42+
Nodes = rabbit_nodes:all(),
43+
lists:filter(fun (Component) ->
44+
NumAlive = length(online_members(Component)),
45+
ServerIds = lists:zip(lists:duplicate(length(Nodes), Component), Nodes),
46+
case ra:members(ServerIds) of
47+
{error, _E} ->
48+
%% we've asked all nodes about it; if we didn't get an answer,
49+
%% the component is probably not running at all
50+
%% (eg. rabbit_stream_coordinator is only started when the
51+
%% first straem is declared)
52+
false;
53+
{ok, Members, _Leader} ->
54+
NumAlive =< (length(Members) div 2) + 1
55+
end
56+
end,
57+
CriticalComponents).
58+
3559
do_await_safe_online_quorum(0) ->
3660
false;
3761
do_await_safe_online_quorum(IterationsLeft) ->
3862
EndangeredQueues = lists:append(
3963
rabbit_quorum_queue:list_with_minimum_quorum(),
4064
rabbit_stream_queue:list_with_minimum_quorum()),
41-
case EndangeredQueues of
42-
[] -> true;
43-
List when is_list(List) ->
65+
case EndangeredQueues =:= [] andalso endangered_critical_components() =:= [] of
66+
true -> true;
67+
false ->
4468
timer:sleep(?SAMPLING_INTERVAL),
4569
do_await_safe_online_quorum(IterationsLeft - 1)
4670
end.
@@ -69,4 +93,10 @@ list_with_minimum_quorum_for_cli() ->
6993
<<"virtual_host">> => amqqueue:get_vhost(Q),
7094
<<"type">> => amqqueue:get_type(Q)
7195
}
72-
end || Q <- EndangeredQueues].
96+
end || Q <- EndangeredQueues] ++
97+
[#{
98+
<<"readable_name">> => C,
99+
<<"name">> => C,
100+
<<"virtual_host">> => "-",
101+
<<"type">> => process
102+
} || C <- endangered_critical_components()].

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,9 @@ declare_queue(Config) ->
360360
%% Test declare an existing queue
361361
?assertEqual({'queue.declare_ok', Q, 0, 0},
362362
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
363-
363+
364364
?assertMatch([_], find_queue_info(Config, [])),
365-
365+
366366
%% Test declare an existing queue with different arguments
367367
?assertExit(_, declare(Ch, Q, [])),
368368
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@@ -1075,7 +1075,7 @@ consume_and_nack(Config) ->
10751075
multiple = false,
10761076
requeue = true}),
10771077
%% Nack will throw a not implemented exception. As it is a cast operation,
1078-
%% we'll detect the conneciton/channel closure on the next call.
1078+
%% we'll detect the conneciton/channel closure on the next call.
10791079
%% Let's try to redeclare and see what happens
10801080
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
10811081
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]))
@@ -1299,7 +1299,7 @@ consume_and_reject(Config) ->
12991299
ok = amqp_channel:cast(Ch1, #'basic.reject'{delivery_tag = DeliveryTag,
13001300
requeue = true}),
13011301
%% Reject will throw a not implemented exception. As it is a cast operation,
1302-
%% we'll detect the conneciton/channel closure on the next call.
1302+
%% we'll detect the conneciton/channel closure on the next call.
13031303
%% Let's try to redeclare and see what happens
13041304
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
13051305
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]))
@@ -1710,7 +1710,12 @@ max_length_bytes(Config) ->
17101710

17111711
Payload = << <<"1">> || _ <- lists:seq(1, 100) >>,
17121712

1713-
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 500)]), %% 100 bytes/msg * 500 = 50000 bytes
1713+
%% 100 bytes/msg * 500 = 50000 bytes
1714+
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 100)]),
1715+
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 100)]),
1716+
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 100)]),
1717+
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 100)]),
1718+
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 100)]),
17141719
ensure_retention_applied(Config, Server),
17151720

17161721
%% We don't yet have reliable metrics, as the committed offset doesn't work

deps/rabbit/test/upgrade_preparation_SUITE.erl

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ groups() ->
2626
]},
2727
{stream, [], [
2828
await_quorum_plus_one_stream
29+
]},
30+
{stream_coordinator, [], [
31+
await_quorum_plus_one_stream_coordinator
2932
]}
3033
].
3134

@@ -107,6 +110,23 @@ await_quorum_plus_one_stream(Config) ->
107110
ok = rabbit_ct_broker_helpers:start_node(Config, B),
108111
?assert(await_quorum_plus_one(Config, 0)).
109112

113+
await_quorum_plus_one_stream_coordinator(Config) ->
114+
catch delete_queues(),
115+
[A, B, _C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
116+
Ch = rabbit_ct_client_helpers:open_channel(Config, A),
117+
declare(Ch, <<"st.1">>, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
118+
timer:sleep(100),
119+
?assert(await_quorum_plus_one(Config, 0)),
120+
delete(Ch, <<"st.1">>),
121+
%% no queues/streams beyond this point
122+
123+
ok = rabbit_ct_broker_helpers:stop_node(Config, B),
124+
%% this should fail because the corrdinator has only 2 running nodes
125+
?assertNot(await_quorum_plus_one(Config, 0)),
126+
127+
ok = rabbit_ct_broker_helpers:start_node(Config, B),
128+
?assert(await_quorum_plus_one(Config, 0)).
129+
110130
%%
111131
%% Implementation
112132
%%
@@ -120,6 +140,9 @@ declare(Ch, Q, Args) ->
120140
auto_delete = false,
121141
arguments = Args}).
122142

143+
delete(Ch, Q) ->
144+
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
145+
123146
delete_queues() ->
124147
[rabbit_amqqueue:delete(Q, false, false, <<"tests">>) || Q <- rabbit_amqqueue:list()].
125148

deps/rabbitmq_cli/lib/rabbitmq/cli/upgrade/commands/await_online_quorum_plus_one_command.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ defmodule RabbitMQ.CLI.Upgrade.Commands.AwaitOnlineQuorumPlusOneCommand do
5757

5858
false ->
5959
{:error,
60-
"time is up, no quorum + 1 online replicas came online for at least some quorum queues"}
60+
"time is up, no quorum + 1 online replicas came online for at least some quorum queues or streams"}
6161
end
6262

6363
other ->
@@ -103,11 +103,11 @@ defmodule RabbitMQ.CLI.Upgrade.Commands.AwaitOnlineQuorumPlusOneCommand do
103103
def help_section, do: :upgrade
104104

105105
def description() do
106-
"Waits for all quorum queues to have an above minimum online quorum. " <>
107-
"This makes sure that no queues would lose their quorum if the target node is shut down"
106+
"Waits for all quorum queues and streams to have an above minimum online quorum. " <>
107+
"This makes sure that no queues/streams would lose their quorum if the target node is shut down"
108108
end
109109

110110
def banner([], %{timeout: timeout}) do
111-
"Will wait for a quorum + 1 of nodes to be online for all quorum queues for #{round(timeout / 1000)} seconds..."
111+
"Will wait for a quorum + 1 of nodes to be online for all quorum queues and streams for #{round(timeout / 1000)} seconds..."
112112
end
113113
end

0 commit comments

Comments
 (0)