Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
queue_vm_stats_sups/0,
queue_vm_ets/0]).

-export([force_checkpoint/2, force_checkpoint_on_queue/1]).

%% for backwards compatibility
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0,
Expand Down Expand Up @@ -157,6 +159,7 @@
-define(RPC_TIMEOUT, 1000).
-define(START_CLUSTER_TIMEOUT, 5000).
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
-define(TICK_INTERVAL, 5000). %% the ra server tick time
-define(DELETE_TIMEOUT, 5000).
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
Expand Down Expand Up @@ -2115,6 +2118,40 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
rabbit_log:warning("Shrinking finished"),
ok.

%% @doc Asks every member of the quorum queue `QName' to take a checkpoint.
%%
%% A `force_checkpoint' aux command is cast to the Ra server on each node
%% listed by `amqqueue:get_nodes/1'. The results of the individual casts are
%% discarded, so `ok' only means the commands were issued.
%%
%% Returns `{error, classic_queue_not_supported}' for a classic queue,
%% `{error, not_quorum_queue}' for any other non-quorum queue, and passes
%% through any `{error, _}' returned by `rabbit_db_queue:get_durable/1'.
force_checkpoint_on_queue(QName) ->
    %% Formatted up front so a malformed name fails before the lookup.
    ReadableName = rabbit_misc:rs(QName),
    case rabbit_db_queue:get_durable(QName) of
        {ok, Queue} when ?amqqueue_is_classic(Queue) ->
            {error, classic_queue_not_supported};
        {ok, Queue} when ?amqqueue_is_quorum(Queue) ->
            {ServerName, _} = amqqueue:get_pid(Queue),
            rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [ReadableName]),
            Members = amqqueue:get_nodes(Queue),
            lists:foreach(
              fun(Member) ->
                      ra:cast_aux_command({ServerName, Member}, force_checkpoint)
              end,
              Members),
            ok;
        {ok, _OtherType} ->
            {error, not_quorum_queue};
        {error, _} = NotFound ->
            NotFound
    end.

%% @doc Forces a checkpoint on every durable quorum queue whose virtual host
%% matches `VhostSpec' and whose name matches `QueueSpec'.
%%
%% Both specs are handed to `is_match/2' (presumably regular expressions;
%% the CLI and the test suite pass `<<".*">>' -- confirm against is_match/2).
%%
%% Queues are not filtered by local membership: force_checkpoint_on_queue/1
%% casts the command to every node returned by `amqqueue:get_nodes/1'.
%%
%% Returns one entry per matching queue:
%%   `{QName, {ok}}'           - the command was sent to the queue members
%%   `{QName, {error, Reason}}' - the request failed (also logged as a warning)
force_checkpoint(VhostSpec, QueueSpec) ->
    [begin
         case force_checkpoint_on_queue(QName) of
             ok ->
                 {QName, {ok}};
             {error, Err} ->
                 rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
                                    [rabbit_misc:rs(QName), Err]),
                 {QName, {error, Err}}
         end
     end
     || Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE),
        %% Bind the name once; it is needed by both the filter and the body.
        QName <- [amqqueue:get_name(Q)],
        is_match(amqqueue:get_vhost(Q), VhostSpec)
        andalso is_match(get_resource_name(QName), QueueSpec)].

%% @doc Returns `true' when the members in `Up' are too few to form a
%% majority of the members in `All', i.e. fewer than `length(All) div 2 + 1'.
is_minority(All, Up) ->
    Majority = (length(All) div 2) + 1,
    UpCount = length(Up),
    not (UpCount >= Majority).
Expand Down
93 changes: 93 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbit/src/rabbit_fifo.hrl").

-import(queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
Expand Down Expand Up @@ -98,6 +99,8 @@ groups() ->
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
force_checkpoint_on_queue,
force_checkpoint,
policy_repair,
gh_12635,
replica_states
Expand Down Expand Up @@ -1339,6 +1342,96 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].

%% Checks that rabbit_quorum_queue:force_checkpoint_on_queue/1 makes every
%% member of a three-node quorum queue take a checkpoint: no member reports a
%% checkpoint index beforehand, and all of them report one >= N afterwards.
force_checkpoint_on_queue(Config) ->
    [Server0, _Server1, _Server2] = Servers =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
    QQ = ?config(queue_name, Config),
    RaName = ra_name(QQ),
    QName = rabbit_misc:r(<<"/">>, queue, QQ),

    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

    N = 20_000,
    rabbit_ct_client_helpers:publish(Ch, QQ, N),
    wait_for_messages_ready([Server0], RaName, N),

    %% Fetches the Ra member overview of this queue on the given node.
    MemberOverview =
        fun(Server) ->
                {ok, State, _} = rpc:call(Server, ra, member_overview, [{RaName, Server}]),
                State
        end,

    %% The state before any checkpoints
    lists:foreach(
      fun(Server) ->
              rabbit_ct_helpers:await_condition(
                fun() ->
                        #{log := #{latest_checkpoint_index := LCI}} = MemberOverview(Server),
                        LCI =:= undefined
                end)
      end, Servers),

    State0 = MemberOverview(Server0),
    ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),

    %% wait for longer than ?CHECK_MIN_INTERVAL_MS ms
    timer:sleep(?CHECK_MIN_INTERVAL_MS + 1000),
    %% Assert the result so a rejected request fails here with a clear reason
    %% instead of surfacing later as an await_condition timeout.
    ?assertEqual(ok,
                 rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                              force_checkpoint_on_queue, [QName])),

    %% Wait for the checkpoint on every member and make sure it covers the
    %% published messages. The explicit `undefined' check is required because
    %% in Erlang term order any atom compares greater than any number.
    lists:foreach(
      fun(Server) ->
              rabbit_ct_helpers:await_condition(
                fun() ->
                        State = MemberOverview(Server),
                        ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
                        #{log := #{latest_checkpoint_index := LCI}} = State,
                        (LCI =/= undefined) andalso (LCI >= N)
                end)
      end, Servers).

%% Checks that rabbit_quorum_queue:force_checkpoint/2 called with catch-all
%% patterns reports only the quorum queue, even though a classic queue is
%% declared as well.
force_checkpoint(Config) ->
    [Node0, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Channel = rabbit_ct_client_helpers:open_channel(Config, Node0),
    QuorumQ = ?config(queue_name, Config),
    QuorumQResource = rabbit_misc:r(<<"/">>, queue, QuorumQ),
    ClassicQ = <<"force_checkpoint_cq">>,
    ServerName = ra_name(QuorumQ),

    %% Declare one queue of each type; both must start out empty.
    lists:foreach(
      fun({Name, Type}) ->
              ?assertEqual({'queue.declare_ok', Name, 0, 0},
                           declare(Channel, Name,
                                   [{<<"x-queue-type">>, longstr, Type}]))
      end,
      [{QuorumQ, <<"quorum">>}, {ClassicQ, <<"classic">>}]),

    MsgCount = 3,
    rabbit_ct_client_helpers:publish(Channel, QuorumQ, MsgCount),
    wait_for_messages_ready([Node0], ServerName, MsgCount),

    Actual = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                          force_checkpoint, [<<".*">>, <<".*">>]),

    %% Result should only have quorum queue
    ?assertEqual([{QuorumQResource, {ok}}], Actual).

% Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

# CLI command that asks the target node to force a checkpoint on every quorum
# queue whose virtual host and name match the given patterns. The work is done
# by :rabbit_quorum_queue.force_checkpoint/2 on the target node; this module
# only turns its per-queue results into table rows.
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
  alias RabbitMQ.CLI.Core.{DocGuide}

  @behaviour RabbitMQ.CLI.CommandBehaviour

  # With no switches given, every vhost and queue matches and all results
  # (not just failures) are reported.
  defp default_opts,
    do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}

  def switches(),
    do: [
      vhost_pattern: :string,
      queue_pattern: :string,
      errors_only: :boolean
    ]

  def merge_defaults(args, opts) do
    {args, Map.merge(default_opts(), opts)}
  end

  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments

  # Returns `{:badrpc, reason}` when the node cannot be reached, otherwise one
  # keyword list (vhost, name, result) per matching queue. With `errors_only`
  # only the queues whose result is an error are listed.
  def run([], %{
        node: node_name,
        vhost_pattern: vhost_pat,
        queue_pattern: queue_pat,
        errors_only: errors_only
      }) do
    args = [vhost_pat, queue_pat]

    case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
      {:badrpc, _} = error ->
        error

      results when errors_only ->
        # The broker reports failures as `{QName, {:error, reason}}` (a
        # two-element error tuple); non-matching entries are skipped by the
        # generator pattern.
        for {{:resource, vhost, _kind, name}, {:error, _} = res} <- results,
            do: [
              {:vhost, vhost},
              {:name, name},
              {:result, res}
            ]

      results ->
        for {{:resource, vhost, _kind, name}, res} <- results,
            do: [
              {:vhost, vhost},
              {:name, name},
              {:result, res}
            ]
    end
  end

  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.Table

  def usage,
    do:
      "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--errors-only]"

  def usage_additional do
    [
      ["--queue-pattern <pattern>", "regular expression to match queue names"],
      ["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
      ["--errors-only", "only list queues which reported an error"]
    ]
  end

  def usage_doc_guides() do
    [
      DocGuide.quorum_queues()
    ]
  end

  def help_section, do: :replication

  def description,
    do: "Forces checkpoints for all matching quorum queues"

  def banner([], _) do
    "Forcing checkpoint for all matching quorum queues..."
  end
end
64 changes: 64 additions & 0 deletions deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

# Unit tests for the force_checkpoint CLI command: option defaults, argument
# validation, and the error returned when the target node is unreachable.
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
  use ExUnit.Case, async: false
  import TestHelper

  @command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand

  setup_all do
    RabbitMQ.CLI.Core.Distribution.start()

    :ok
  end

  # Every test gets the options the CLI would pass after merging defaults;
  # a test may shorten the timeout with `@tag test_timeout: ...`.
  setup context do
    timeout = context[:test_timeout] || 30000

    opts = %{
      node: get_rabbit_hostname(),
      timeout: timeout,
      vhost_pattern: ".*",
      queue_pattern: ".*",
      errors_only: false
    }

    {:ok, opts: opts}
  end

  test "merge_defaults: defaults to reporting complete results" do
    expected_opts = %{
      vhost_pattern: ".*",
      queue_pattern: ".*",
      errors_only: false
    }

    assert @command.merge_defaults([], %{}) == {[], expected_opts}
  end

  test "validate: accepts no positional arguments" do
    assert @command.validate([], %{}) == :ok
  end

  test "validate: any positional arguments fail validation" do
    too_many = {:validation_failure, :too_many_args}

    assert @command.validate(["quorum-queue-a"], %{}) == too_many
    assert @command.validate(["quorum-queue-a", "two"], %{}) == too_many
    assert @command.validate(["quorum-queue-a", "two", "three"], %{}) == too_many
  end

  @tag test_timeout: 3000
  test "run: targeting an unreachable node throws a badrpc", context do
    unreachable_opts = Map.merge(context[:opts], %{node: :jake@thedog})

    assert match?({:badrpc, _}, @command.run([], unreachable_opts))
  end
end
Loading