Skip to content
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c26edbe
Implement rabbitmq-queues leader_health_check command for quorum queues
Ayanda-D Jun 27, 2024
6cc03b0
Tests for rabbitmq-queues leader_health_check command
Ayanda-D Jun 27, 2024
76d66a1
Ensure calling ParentPID in leader health check execution and
Ayanda-D Jun 28, 2024
857e2a7
Extend core leader health check tests and update badrpc error handlin…
Ayanda-D Jun 28, 2024
6cf9339
Refactor leader_health_check command validators and ignore vhost arg
Ayanda-D Jun 28, 2024
96b8bce
Update leader_health_check_command description and banner
Ayanda-D Jul 3, 2024
239a69b
Improve output formatting for healthy leaders and support
Ayanda-D Jul 4, 2024
48ba3e1
Support global flag to run leader health check for
Ayanda-D Jul 11, 2024
7873737
Return immediately for leader health checks on empty vhosts
Ayanda-D Jul 18, 2024
b7dec89
Rename leader health check timeout refs
Ayanda-D Jul 18, 2024
c7da4d5
Update banner message for global leader health check
Ayanda-D Aug 2, 2024
1736845
QQ leader-health-check: check_process_limit_safety before spawning le…
Ayanda-D Nov 11, 2024
1084179
Log leader health check result in broker logs (if any leaderless queues)
Ayanda-D Jan 10, 2025
68739a6
Ensure check_passed result for leader health internal calls
Ayanda-D Jan 13, 2025
5f5e992
Extend CLI format output to process check_passed payload
Ayanda-D Jan 23, 2025
ebffd7d
Format leader healthcheck result log and function exports
Ayanda-D Feb 21, 2025
663fc98
Change leader_health_check command scope from queues to diagnostics
Ayanda-D Feb 26, 2025
df82f12
Update (c) line year
Ayanda-D Feb 26, 2025
b2acbae
Rename command to check_for_quorum_queues_without_an_elected_leader
Ayanda-D Feb 26, 2025
7a8e166
Use rabbit_db_queue for qq leader health check lookups
Ayanda-D Feb 26, 2025
9bdb81f
Update tests: quorum_queue_SUITE and rabbit_db_queue_SUITE
Ayanda-D Feb 26, 2025
6158568
Fix typo (cli test module)
Ayanda-D Feb 27, 2025
ea07938
Small refactor - simpler final leader health check result return on f…
Ayanda-D Feb 28, 2025
a45aa81
Clear dialyzer warning & fix type spec
Ayanda-D Mar 3, 2025
bb43c0b
Ignore result without strict match to avoid dialyzer warning
Ayanda-D Mar 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions deps/rabbit/src/amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
set_immutable/1,
qnode/1,
to_printable/1,
to_printable/2,
macros/0]).

-define(record_version, amqqueue_v2).
Expand Down Expand Up @@ -564,6 +565,14 @@ to_printable(#amqqueue{name = QName = #resource{name = Name},
<<"virtual_host">> => VHost,
<<"type">> => Type}.

%% Builds a printable map for a queue identified only by its resource name and
%% its queue type (no full amqqueue record required).
%%
%% QName is a #resource{} record; its `name' and `virtual_host' fields are copied
%% into the result. Type is stored as-is under the <<"type">> key.
%% Returns a map with the binary keys <<"readable_name">>, <<"name">>,
%% <<"virtual_host">> and <<"type">>.
-spec to_printable(rabbit_types:r(queue), atom() | binary()) -> #{binary() => any()}.
to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) ->
    %% The discovery result is intentionally discarded; only the call itself matters here.
    %% NOTE(review): presumably this fails for unknown queue types - confirm.
    _ = rabbit_queue_type:discover(Type),
    #{<<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(QName)),
      <<"name">> => Name,
      <<"virtual_host">> => VHost,
      <<"type">> => Type}.

% private

macros() ->
Expand Down
78 changes: 78 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
file_handle_other_reservation/0,
file_handle_release_reservation/0]).

%% Quorum queue leader health check functions.
-export([leader_health_check/2,
run_leader_health_check/4]).

-ifdef(TEST).
-export([filter_promotable/2,
ra_machine_config/1]).
Expand Down Expand Up @@ -144,6 +147,8 @@
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384
%% Timeout (ms) for a single leader ping. Kept at 5s: shorter timeouts are known
%% to yield false positives (healthy leaders reported as unhealthy).
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeouts lower than 5s are guaranteed to result in false positives.

%% Timeout (ms) used by wait_for_leader_health_checks/3 while waiting for
%% leader health check replies.
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).

%%----------- QQ policies ---------------------------------------------------

Expand Down Expand Up @@ -2145,3 +2150,76 @@ file_handle_other_reservation() ->
%% No-op: always returns `ok'.
file_handle_release_reservation() -> ok.

%% Runs the quorum queue leader health check for queues whose name matches
%% QueueNameOrRegEx, either in the given virtual host (a binary) or in all
%% virtual hosts (the atom `global').
%%
%% Returns the result of leader_health_check/3: a list of printable maps, one
%% per queue whose leader did not answer the ping as a leader ([] when all
%% matching queues are healthy). Throws
%% {error, leader_health_check_process_limit_exceeded} when spawning the checks
%% would cross the process limit threshold.
leader_health_check(QueueNameOrRegEx, VHost) ->
    %% Set a process limit threshold to 20% of ErlangVM process limit, beyond which
    %% we cannot spawn any new processes for executing QQ leader health checks.
    %% Review feedback: 40% is a lot for a health check; 20% is the most it
    %% should be allowed to consume.
    ProcessLimitThreshold = round(0.2 * erlang:system_info(process_limit)),
    leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold).

%% Runs the leader health check with an explicit process limit threshold.
%%
%% QueueNameOrRegEx      - regular expression handed to re:run/3 and matched
%%                         against each queue name.
%% VHost                 - a virtual host name (binary), or the atom `global'
%%                         to check quorum queues in all virtual hosts.
%% ProcessLimitThreshold - upper bound passed to check_process_limit_safety/2.
%%
%% One process is spawned per matching quorum queue; each reports back to the
%% caller, which collects the replies with wait_for_leader_health_checks/3.
%% Returns the list of printable maps for the unhealthy queues. A non-empty
%% result is also logged from a separate process.
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold) ->
    %% Review feedback: queue lookups go through rabbit_db_queue, the module
    %% for working with schema data stores that is aware of the metadata store
    %% in use; it also provides get_all_by_type/1.
    Qs =
        case VHost of
            global ->
                rabbit_db_queue:get_all_by_type(?MODULE);
            VHost when is_binary(VHost) ->
                rabbit_db_queue:get_all(VHost)
        end,
    check_process_limit_safety(length(Qs), ProcessLimitThreshold),
    ParentPID = self(),
    HealthCheckRef = make_ref(),
    MatchingQs =
        [Q || Q <- Qs,
              amqqueue:get_type(Q) == ?MODULE,
              leader_health_check_name_matches(Q, QueueNameOrRegEx)],
    HealthCheckPids =
        [begin
             {ClusterName, _} = rabbit_amqqueue:pid_of(Q),
             QResource = amqqueue:get_name(Q),
             %% Each checker sends exactly one {ok | error, Ref, QResource}
             %% message back to the parent.
             spawn(fun() ->
                           run_leader_health_check(ClusterName, QResource,
                                                   HealthCheckRef, ParentPID)
                   end)
         end || Q <- MatchingQs],
    Result = wait_for_leader_health_checks(HealthCheckRef, length(HealthCheckPids), []),
    %% Log asynchronously so that the caller gets its result without waiting
    %% for the logger.
    _ = spawn(fun() -> maybe_log_leader_health_check_result(Result) end),
    Result.

%% Returns true when the name of queue Q matches the regular expression.
leader_health_check_name_matches(Q, QueueNameOrRegEx) ->
    #resource{kind = queue, name = QueueName} = amqqueue:get_name(Q),
    re:run(QueueName, QueueNameOrRegEx, [{capture, none}]) =:= match.

%% Pings the leader registered for ClusterName in the ra leaderboard and sends
%% the outcome to From as {ok, HealthCheckRef, QResource} when the ping answers
%% {pong, leader}, or {error, HealthCheckRef, QResource} for any other answer.
%% Always returns `ok'.
run_leader_health_check(ClusterName, QResource, HealthCheckRef, From) ->
    PingResult = ra_server_proc:ping(ra_leaderboard:lookup_leader(ClusterName),
                                     ?LEADER_HEALTH_CHECK_TIMEOUT),
    Status = case PingResult of
                 {pong, leader} -> ok;
                 _ -> error
             end,
    From ! {Status, HealthCheckRef, QResource},
    ok.

%% Collects N health check replies tagged with Ref and returns the printable
%% maps of the queues that reported `error', prepended to UnhealthyAcc.
%%
%% The whole collection is bounded by ?GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT: the
%% deadline is computed once, so the wait no longer restarts after every reply.
%% When the deadline passes, whatever has been accumulated so far is returned.
wait_for_leader_health_checks(Ref, N, UnhealthyAcc) ->
    Deadline = erlang:monotonic_time(millisecond) + ?GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT,
    wait_for_leader_health_checks(Ref, N, UnhealthyAcc, Deadline).

%% Receive loop for wait_for_leader_health_checks/3; Deadline is a monotonic
%% time in milliseconds.
wait_for_leader_health_checks(_Ref, 0, UnhealthyAcc, _Deadline) ->
    UnhealthyAcc;
wait_for_leader_health_checks(Ref, N, UnhealthyAcc, Deadline) ->
    %% Never pass a negative timeout to `after'.
    Remaining = max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {ok, Ref, _QResource} ->
            wait_for_leader_health_checks(Ref, N - 1, UnhealthyAcc, Deadline);
        {error, Ref, QResource} ->
            Unhealthy = amqqueue:to_printable(QResource, ?MODULE),
            wait_for_leader_health_checks(Ref, N - 1, [Unhealthy | UnhealthyAcc], Deadline)
    after
        Remaining ->
            UnhealthyAcc
    end.

%% Guards against spawning too many health check processes.
%%
%% Adds QCount to the current VM process count. Returns `ok' while the sum
%% stays below ProcessLimitThreshold; otherwise logs a warning and throws
%% {error, leader_health_check_process_limit_exceeded}.
check_process_limit_safety(QCount, ProcessLimitThreshold) ->
    ProjectedProcessCount = erlang:system_info(process_count) + QCount,
    if
        ProjectedProcessCount < ProcessLimitThreshold ->
            ok;
        true ->
            rabbit_log:warning("Leader health check not permitted, process limit threshold will be exceeded."),
            throw({error, leader_health_check_process_limit_exceeded})
    end.

%% Logs a warning listing the readable names of the unhealthy queues in Result.
%% Does nothing (returns `ok') for an empty result.
maybe_log_leader_health_check_result([]) -> ok;
maybe_log_leader_health_check_result(Result) ->
    %% Pattern-filtering generator: entries without a <<"readable_name">> key
    %% are skipped, so no {'EXIT', _} tuples can end up in the log line.
    Qs = [ReadableName || #{<<"readable_name">> := ReadableName} <- Result],
    rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]).
131 changes: 130 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ all_tests() ->
priority_queue_2_1_ratio,
requeue_multiple_true,
requeue_multiple_false,
subscribe_from_each
subscribe_from_each,
leader_health_check
].

memory_tests() ->
Expand Down Expand Up @@ -4145,6 +4146,129 @@ amqpl_headers(Config) ->
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}).

%% Verifies rabbit_quorum_queue:leader_health_check/2 for two virtual hosts and
%% for the `global' scope: first with no queues, then with healthy quorum
%% queues, and finally after clearing the ra leaderboard entry of every quorum
%% queue, which makes each of them show up as unhealthy.
leader_health_check(Config) ->
    VHost1 = <<"vhost1">>,
    VHost2 = <<"vhost2">>,

    set_up_vhost(Config, VHost1),
    set_up_vhost(Config, VHost2),

    %% Runs the health check on node 0 for a name pattern and a scope
    %% (a virtual host name or the atom `global').
    HealthCheck =
        fun(Pattern, Scope) ->
                rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                             leader_health_check, [Pattern, Scope])
        end,

    %% check empty vhost
    ?assertEqual([], HealthCheck(<<".*">>, VHost1)),
    ?assertEqual([], HealthCheck(<<".*">>, global)),

    Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1),
    {ok, Ch1} = amqp_connection:open_channel(Conn1),

    Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost2),
    {ok, Ch2} = amqp_connection:open_channel(Conn2),

    Qs1 = [<<"Q.1">>, <<"Q.2">>, <<"Q.3">>],
    Qs2 = [<<"Q.4">>, <<"Q.5">>, <<"Q.6">>],

    %% in vhost1
    [?assertEqual({'queue.declare_ok', Q, 0, 0},
                  declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
     || Q <- Qs1],

    %% in vhost2
    [?assertEqual({'queue.declare_ok', Q, 0, 0},
                  declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
     || Q <- Qs2],

    %% test successful health checks in vhost1, vhost2, global
    ?assertEqual([], HealthCheck(<<".*">>, VHost1)),
    ?assertEqual([], HealthCheck(<<"Q.*">>, VHost1)),
    [?assertEqual([], HealthCheck(Q, VHost1)) || Q <- Qs1],

    ?assertEqual([], HealthCheck(<<".*">>, VHost2)),
    ?assertEqual([], HealthCheck(<<"Q.*">>, VHost2)),
    [?assertEqual([], HealthCheck(Q, VHost2)) || Q <- Qs2],

    ?assertEqual([], HealthCheck(<<".*">>, global)),
    ?assertEqual([], HealthCheck(<<"Q.*">>, global)),

    %% clear leaderboard
    Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),

    %% Exactly the six quorum queues declared above are expected to exist.
    [_, _, _, _, _, _] = QQ_Clusters =
        lists:usort(
          [begin
               {ClusterName, _} = amqqueue:get_pid(Q),
               {ClusterName, amqqueue:get_name(Q)}
           end
           || Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue]),

    [Q1Data, Q2Data, Q3Data, Q4Data, Q5Data, Q6Data] = QQ_Data =
        [begin
             rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q_ClusterName]),
             amqqueue:to_printable(Q_Res, rabbit_quorum_queue)
         end
         || {Q_ClusterName, Q_Res} <- QQ_Clusters],

    %% test failed health checks in vhost1, vhost2, global
    ?assertEqual([Q1Data], HealthCheck(<<"Q.1">>, VHost1)),
    ?assertEqual([Q2Data], HealthCheck(<<"Q.2">>, VHost1)),
    ?assertEqual([Q3Data], HealthCheck(<<"Q.3">>, VHost1)),
    ?assertEqual([Q1Data, Q2Data, Q3Data],
                 lists:usort(HealthCheck(<<".*">>, VHost1))),
    ?assertEqual([Q1Data, Q2Data, Q3Data],
                 lists:usort(HealthCheck(<<"Q.*">>, VHost1))),

    ?assertEqual([Q4Data], HealthCheck(<<"Q.4">>, VHost2)),
    ?assertEqual([Q5Data], HealthCheck(<<"Q.5">>, VHost2)),
    ?assertEqual([Q6Data], HealthCheck(<<"Q.6">>, VHost2)),
    ?assertEqual([Q4Data, Q5Data, Q6Data],
                 lists:usort(HealthCheck(<<".*">>, VHost2))),
    ?assertEqual([Q4Data, Q5Data, Q6Data],
                 lists:usort(HealthCheck(<<"Q.*">>, VHost2))),

    %% Both patterns are covered for the global scope, mirroring the
    %% successful checks above.
    ?assertEqual(QQ_Data,
                 lists:usort(HealthCheck(<<".*">>, global))),
    ?assertEqual(QQ_Data,
                 lists:usort(HealthCheck(<<"Q.*">>, global))),

    %% cleanup: every queue is deleted through the channel of the virtual host
    %% it was declared in.
    [?assertMatch(#'queue.delete_ok'{},
                  amqp_channel:call(Ch1, #'queue.delete'{queue = Q}))
     || Q <- Qs1],
    [?assertMatch(#'queue.delete_ok'{},
                  amqp_channel:call(Ch2, #'queue.delete'{queue = Q}))
     || Q <- Qs2],

    amqp_connection:close(Conn1),
    amqp_connection:close(Conn2).


leader_locator_client_local(Config) ->
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Q = ?config(queue_name, Config),
Expand Down Expand Up @@ -4465,6 +4589,11 @@ declare_passive(Ch, Q, Args) ->
auto_delete = false,
passive = true,
arguments = Args}).

%% Adds VHost to the broker under test and grants the <<"guest">> user full
%% permissions on it. Returns the result of the permissions call.
set_up_vhost(Config, VHost) ->
    User = <<"guest">>,
    rabbit_ct_broker_helpers:add_vhost(Config, VHost),
    rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost).

%% Convenience wrapper around assert_queue_type/4 that uses the <<"/">>
%% virtual host.
assert_queue_type(Server, Q, Expected) ->
    DefaultVHost = <<"/">>,
    assert_queue_type(Server, DefaultVHost, Q, Expected).

Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_cli/lib/rabbitmq/cli/core/output.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ defmodule RabbitMQ.CLI.Core.Output do
:ok
end

# A successful result tagged :check_passed that also carries a payload: the
# payload is run through the formatter and the tag is dropped from the result.
def format_output({:ok, :check_passed, payload}, formatter, options) do
  formatted = formatter.format_output(payload, options)
  {:ok, formatted}
end

# A plain successful result: the payload is run through the formatter.
def format_output({:ok, payload}, formatter, options) do
  formatted = formatter.format_output(payload, options)
  {:ok, formatted}
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 VMware, Inc. or its affiliates. All rights reserved.

# CLI command that asks a node to ping the leaders of the quorum queues whose
# names match a regular expression, either in one virtual host or (with
# --global) in all virtual hosts, and reports the queues with unhealthy leaders.
defmodule RabbitMQ.CLI.Diagnostics.Commands.LeaderHealthCheckCommand do
  alias RabbitMQ.CLI.Core.DocGuide

  @behaviour RabbitMQ.CLI.CommandBehaviour

  import RabbitMQ.CLI.Core.Platform, only: [line_separator: 0]

  def switches(), do: [global: :boolean]

  def scopes(), do: [:diagnostics]

  # Defaults: check a single virtual host ("/") unless --global is given.
  def merge_defaults(args, opts) do
    {args, Map.merge(%{global: false, vhost: "/"}, opts)}
  end

  use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument
  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

  # Returns :ok when no unhealthy leaders were reported, {:error, queues} with
  # the list of unhealthy queues otherwise. RPC failures and errors returned by
  # the node (for example {:error, :leader_health_check_process_limit_exceeded})
  # are passed through unchanged so that they are not mistaken for a list of
  # unhealthy queues.
  def run([pattern] = _args, %{node: node_name, vhost: vhost, global: global_opt}) do
    vhost = if global_opt, do: :global, else: vhost

    case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :leader_health_check, [pattern, vhost]) do
      [] ->
        :ok

      unhealthy_queues when is_list(unhealthy_queues) ->
        {:error, unhealthy_queues}

      {:badrpc, _} = error ->
        error

      {:error, _} = error ->
        error

      other ->
        {:error, other}
    end
  end

  def output(:ok, %{node: node_name, formatter: "json"}) do
    {:ok,
     %{
       "result" => "ok",
       "message" => "Node #{node_name} reported all quorum queue leaders as healthy"
     }}
  end

  def output(:ok, %{silent: true}) do
    {:ok, :check_passed}
  end

  def output(:ok, %{node: node_name}) do
    {:ok, "Node #{node_name} reported all quorum queue leaders as healthy"}
  end

  # NOTE(review): the three clauses below report unhealthy leaders with an
  # {:ok, :check_passed, ...} result, which presumably means a zero exit code
  # even when leaders are unhealthy - confirm this is intended.
  def output({:error, unhealthy_queues}, %{node: node_name, formatter: "json"})
      when is_list(unhealthy_queues) do
    {:ok, :check_passed,
     %{
       "result" => "error",
       "queues" => unhealthy_queues,
       "message" => "Node #{node_name} reported unhealthy quorum queue leaders"
     }}
  end

  def output({:error, unhealthy_queues}, %{silent: true}) when is_list(unhealthy_queues) do
    {:ok, :check_passed}
  end

  def output({:error, unhealthy_queues}, %{vhost: _vhost}) when is_list(unhealthy_queues) do
    lines = queue_lines(unhealthy_queues)

    {:ok, :check_passed, Enum.join(lines, line_separator())}
  end

  # Fallback for every result not handled above (RPC failures, error tuples).
  # It must stay after the command-specific output/2 clauses.
  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable

  def usage() do
    "leader_health_check [--vhost <vhost>] [--global] <pattern>"
  end

  def usage_additional do
    [
      ["<pattern>", "regular expression pattern used to match quorum queues"],
      ["--global", "run leader health check for all queues in all virtual hosts on the node"]
    ]
  end

  def help_section(), do: :observability_and_health_checks

  def usage_doc_guides() do
    [
      DocGuide.quorum_queues()
    ]
  end

  def description(), do: "Checks availability and health status of quorum queue leaders"

  def banner([name], %{global: true}),
    do: "Checking availability and health status of leaders for quorum queues matching #{name} in all vhosts ..."

  def banner([name], %{vhost: vhost}),
    do: "Checking availability and health status of leaders for quorum queues matching #{name} in vhost #{vhost} ..."

  # One human-readable line per unhealthy queue, built from its "readable_name".
  def queue_lines(qs) do
    for q <- qs, do: "Leader for #{q["readable_name"]} is unhealthy and unavailable"
  end
end
Loading
Loading