Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
force_vhost_queues_shrink_member_to_current_member/1,
force_all_queues_shrink_member_to_current_member/0]).

-export([force_checkpoint/2, force_checkpoint_on_queue/1]).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put each export on a new line.


%% for backwards compatibility
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0,
Expand Down Expand Up @@ -2084,6 +2086,39 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
rabbit_log:warning("Shrinking finished"),
ok.

force_checkpoint_on_queue(QName) ->
Node = node(),
QNameFmt = rabbit_misc:rs(QName),
case rabbit_amqqueue:lookup(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
{RaName, _} = amqqueue:get_pid(Q),
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a call with an implicit timeout (most likely of 5s). Timeouts lower than 15s are very likely to cause false positives sooner or later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will look to see how to invoke the aux command with a specified timeout of 15s

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not able to find a way to increase the implicit timeout, would appreciate any pointers here :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also rabbit_misc:rpc_call/5 but I doubt it is very relevant on Erlang 26+.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, by "implicit timeout" I misunderstood and thought you were mentioning a timeout to do within ra and not rpc:call/5.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I was referring to the typical OTP timeout when it comes to rpc calls.

rabbit_log:debug("Sent command to force checkpoint ~ts", [QNameFmt]);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"to force ~ts to take a checkpoint "

{ok, _Q} ->
{error, not_quorum_queue};
{error, _} = E ->
E
end.

force_checkpoint(VhostSpec, QueueSpec) ->
[begin
QName = amqqueue:get_name(Q),
case force_checkpoint_on_queue(QName) of
ok ->
{QName, {ok}};
{error, Err} ->
rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
[rabbit_misc:rs(QName), Err]),
{QName, {error, Err}}
end
end
|| Q <- rabbit_amqqueue:list(),
amqqueue:get_type(Q) == ?MODULE,
is_match(amqqueue:get_vhost(Q), VhostSpec)
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)].

is_minority(All, Up) ->
MinQuorum = length(All) div 2 + 1,
length(Up) < MinQuorum.
Expand Down
76 changes: 76 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ groups() ->
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
force_checkpoint_on_queue,
force_checkpoint,
policy_repair,
gh_12635,
replica_states
Expand Down Expand Up @@ -1308,6 +1310,80 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].

force_checkpoint_on_queue(Config) ->
[Server0, _Server1, _Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
RaName = ra_name(QQ),
QName = rabbit_misc:r(<<"/">>, queue, QQ),

?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

rabbit_ct_client_helpers:publish(Ch, QQ, 3),
wait_for_messages_ready([Server0], RaName, 3),

% Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet.
rabbit_ct_helpers:await_condition(
fun() ->
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
case Index of
0 -> true;
_ -> false
end
end),

rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_checkpoint_on_queue, [QName]),

% Wait for initial checkpoint and make sure it's not 0
rabbit_ct_helpers:await_condition(
fun() ->
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
case Index of
0 -> false;
_ -> true
end
end).

force_checkpoint(Config) ->
[Server0, _Server1, _Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
CQ = <<"force_checkpoint_cq">>,
RaName = ra_name(QQ),

?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

?assertEqual({'queue.declare_ok', CQ, 0, 0},
declare(Ch, CQ, [{<<"x-queue-type">>, longstr, <<"classic">>}])),

rabbit_ct_client_helpers:publish(Ch, QQ, 3),
wait_for_messages_ready([Server0], RaName, 3),

meck:expect(rabbit_quorum_queue, force_checkpoint_on_queue, fun(Q) -> ok end),

rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_checkpoint, [<<".*">>, <<".*">>]),

% Waiting here to make sure checkpoint has been forced
rabbit_ct_helpers:await_condition(
fun() ->
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
case Index of
0 -> false;
_ -> true
end
end),

% Make sure force_checkpoint_on_queue was only called for the quorun queue
?assertEqual(1, meck:num_calls(rabbit_quorum_queue, force_checkpoint_on_queue, '_')).

% Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
alias RabbitMQ.CLI.Core.{DocGuide}

@behaviour RabbitMQ.CLI.CommandBehaviour

defp default_opts,
do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}

def switches(),
do: [
vhost_pattern: :string,
queue_pattern: :string,
errors_only: :boolean
]

def merge_defaults(args, opts) do
{args, Map.merge(default_opts(), opts)}
end

use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments

def run([], %{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned value can easily be formatted as JSON but formatter: "json" is not supported in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems formatter: "json" is supported (presumably via RabbitMQ.CLI.DefaultOutput
?)

aaronseo: ~/workplace/rabbitMq/rabbitmq-server/sbin $ ./rabbitmq-queues -n force_checkpoint2 force_checkpoint --formatter json
[
{"vhost":"/","name":"qq2","result":"ok"}
,{"vhost":"/","name":"qwe","result":"ok"}
,{"vhost":"/","name":"testqq","result":"ok"}
...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For tabular-looking data (so, lists of maps), yes, you can at least try DefaultOutput and see if there may be any reasons to override what it defines.

node: node_name,
vhost_pattern: vhost_pat,
queue_pattern: queue_pat,
errors_only: errors_only
}) do
args = [vhost_pat, queue_pat]

case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
{:error, _} = error ->
error

{:badrpc, _} = error ->
error

results when errors_only ->
for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results,
do: [
{:vhost, vhost},
{:name, name},
{:result, format_result(res)}
]

results ->
for {{:resource, vhost, _kind, name}, res} <- results,
do: [
{:vhost, vhost},
{:name, name},
{:result, format_result(res)}
]
end
end

use RabbitMQ.CLI.DefaultOutput

def formatter(), do: RabbitMQ.CLI.Formatters.Table

def usage,
do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>]"

def usage_additional do
[
["--queue-pattern <pattern>", "regular expression to match queue names"],
["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
["--errors-only", "only list queues which reported an error"]
]
end

def usage_doc_guides() do
[
DocGuide.quorum_queues()
]
end

def help_section, do: :replication

def description,
do: "Forces checkpoints for all matching quorum queues"

def banner([], _) do
"Forcing checkpoint for all matching quorum queues..."
end

#
# Implementation
#

defp format_result({:ok}) do
Copy link
Collaborator

@michaelklishin michaelklishin Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reinvent the output interface used extensively by all existing commands.

:ok, {:error, :timeout}, {:error, _} are all handled by RabbitMQ.CLI.DefaultOutput, including JSON formatting of certain common returned values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The custom format_result does have a cosmetic effect, adopted from a couple other commands (namely grow and shrink). For example

With just RabbitMQ.CLI.DefaultOutput:

aaronseo: ~/workplace/rabbitMq/rabbitmq-server/sbin $ ./rabbitmq-queues -n force_checkpoint2 force_checkpoint
Forcing checkpoint for all matching quorum queues...
vhost   name    result
/       qq2     {ok}
/       qwe     {ok}
/       testqq  {ok}
/       qq1     {ok}
test    qwe     {ok}
test    qq1     {ok}
test    qq2     {ok}

...
aaronseo: ~/workplace/rabbitMq/rabbitmq-server/sbin $ ./rabbitmq-queues -n force_checkpoint2 force_checkpoint --formatter json
[
{"vhost":"/","name":"qq2","result":["ok"]}
,{"vhost":"/","name":"qwe","result":["ok"]}
,{"vhost":"/","name":"testqq","result":["ok"]}
...

With the custom format_result:

Forcing checkpoint for all matching quorum queues...
vhost   name    result
/       qq2     ok
/       qwe     ok
/       testqq  ok
/       qq1     ok
test    qwe     ok
test    qq1     ok
test    qq2     ok

...
aaronseo: ~/workplace/rabbitMq/rabbitmq-server/sbin $ ./rabbitmq-queues -n force_checkpoint2 force_checkpoint --formatter json
[
{"vhost":"/","name":"qq2","result":"ok"}
,{"vhost":"/","name":"qwe","result":"ok"}
,{"vhost":"/","name":"testqq","result":"ok"}
...

The cosmetic changes are, I think, more beneficial with the error messages, too.

However, I'm also open to just going to the RabbitMQ.CLI.DefaultOutput formatting, if still found to be preferred.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can override output/2 before the line where RabbitMQ.CLI.DefaultOutput is included, and rely on RabbitMQ.CLI.DefaultOutput as a catch-all for, say, rpc:call/4 error reporting.

Copy link
Contributor Author

@aaron-seo aaron-seo Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to forego with the custom format_result, I think the default output is a fine alternative, and allows better structuring of the error messages. Please lmk what you think

"ok"
end

defp format_result({:error, :timeout}) do
"error: the operation timed out and may not have been completed"
end

defp format_result({:error, err}) do
to_string(:io_lib.format("error: ~W", [err, 10]))
end
end
64 changes: 64 additions & 0 deletions deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
use ExUnit.Case, async: false
import TestHelper

@command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand

setup_all do
RabbitMQ.CLI.Core.Distribution.start()

:ok
end

setup context do
{:ok,
opts: %{
node: get_rabbit_hostname(),
timeout: context[:test_timeout] || 30000,
vhost_pattern: ".*",
queue_pattern: ".*",
errors_only: false
}}
end

test "merge_defaults: defaults to reporting complete results" do
assert @command.merge_defaults([], %{}) ==
{[],
%{
vhost_pattern: ".*",
queue_pattern: ".*",
errors_only: false
}}
end

test "validate: accepts no positional arguments" do
assert @command.validate([], %{}) == :ok
end

test "validate: any positional arguments fail validation" do
assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args}

assert @command.validate(["quorum-queue-a", "two"], %{}) ==
{:validation_failure, :too_many_args}

assert @command.validate(["quorum-queue-a", "two", "three"], %{}) ==
{:validation_failure, :too_many_args}
end

@tag test_timeout: 3000
test "run: targeting an unreachable node throws a badrpc", context do
assert match?(
{:badrpc, _},
@command.run(
[],
Map.merge(context[:opts], %{node: :jake@thedog})
)
)
end
end
Loading