Skip to content
19 changes: 19 additions & 0 deletions deps/rabbit/src/amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
% exclusive_owner
get_exclusive_owner/1,
get_leader/1,
get_nodes/1,
% name (#resource)
get_name/1,
set_name/2,
Expand Down Expand Up @@ -391,6 +392,24 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->

get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.

-spec get_leader_node(amqqueue_v2()) -> node() | none.

%% Introduced in rabbitmq/rabbitmq-server#13905 for 4.2.0,
%% used in v4.1.x as of rabbitmq/rabbitmq-server#13548. MK.
get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
get_leader_node(#amqqueue{pid = none}) -> none;
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).

-spec get_nodes(amqqueue_v2()) -> [node(),...].

get_nodes(Q) ->
case amqqueue:get_type_state(Q) of
#{nodes := Nodes} ->
Nodes;
_ ->
[get_leader_node(Q)]
end.

% operator_policy

-spec get_operator_policy(amqqueue()) -> binary() | none | undefined.
Expand Down
37 changes: 37 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
force_vhost_queues_shrink_member_to_current_member/1,
force_all_queues_shrink_member_to_current_member/0]).

-export([force_checkpoint/2, force_checkpoint_on_queue/1]).

%% for backwards compatibility
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0,
Expand Down Expand Up @@ -141,6 +143,7 @@
-define(RPC_TIMEOUT, 1000).
-define(START_CLUSTER_TIMEOUT, 5000).
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
-define(TICK_INTERVAL, 5000). %% the ra server tick time
-define(DELETE_TIMEOUT, 5000).
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
Expand Down Expand Up @@ -2105,6 +2108,40 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
rabbit_log:warning("Shrinking finished"),
ok.

force_checkpoint_on_queue(QName) ->
QNameFmt = rabbit_misc:rs(QName),
case rabbit_db_queue:get_durable(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
{RaName, _} = amqqueue:get_pid(Q),
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
Nodes = amqqueue:get_nodes(Q),
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
|| Node <- Nodes],
ok;
{ok, _Q} ->
{error, not_quorum_queue};
{error, _} = E ->
E
end.

force_checkpoint(VhostSpec, QueueSpec) ->
[begin
QName = amqqueue:get_name(Q),
case force_checkpoint_on_queue(QName) of
ok ->
{QName, {ok}};
{error, Err} ->
rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
[rabbit_misc:rs(QName), Err]),
{QName, {error, Err}}
end
end
|| Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE),
is_match(amqqueue:get_vhost(Q), VhostSpec)
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)].

is_minority(All, Up) ->
MinQuorum = length(All) div 2 + 1,
length(Up) < MinQuorum.
Expand Down
93 changes: 93 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbit/src/rabbit_fifo.hrl").

-import(queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
Expand Down Expand Up @@ -98,6 +99,8 @@ groups() ->
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
force_checkpoint_on_queue,
force_checkpoint,
policy_repair,
gh_12635,
replica_states
Expand Down Expand Up @@ -1333,6 +1336,96 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].

force_checkpoint_on_queue(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
RaName = ra_name(QQ),
QName = rabbit_misc:r(<<"/">>, queue, QQ),

?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

N = 20_000,
rabbit_ct_client_helpers:publish(Ch, QQ, N),
wait_for_messages_ready([Server0], RaName, N),

%% The state before any checkpoints
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
#{log := #{latest_checkpoint_index := LCI}} = State,
LCI =:= undefined
end),
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
#{log := #{latest_checkpoint_index := LCI}} = State,
LCI =:= undefined
end),
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
#{log := #{latest_checkpoint_index := LCI}} = State,
LCI =:= undefined
end),

{ok, State0, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),

%% wait for longer than ?CHECK_MIN_INTERVAL_MS ms
timer:sleep(?CHECK_MIN_INTERVAL_MS + 1000),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_checkpoint_on_queue, [QName]),

%% Wait for initial checkpoint and make sure it's not 0
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
#{log := #{latest_checkpoint_index := LCI}} = State,
(LCI =/= undefined) andalso (LCI >= N)
end),
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
#{log := #{latest_checkpoint_index := LCI}} = State,
(LCI =/= undefined) andalso (LCI >= N)
end),
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
#{log := #{latest_checkpoint_index := LCI}} = State,
(LCI =/= undefined) andalso (LCI >= N)
end).

force_checkpoint(Config) ->
[Server0, _Server1, _Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
QQName = rabbit_misc:r(<<"/">>, queue, QQ),
CQ = <<"force_checkpoint_cq">>,
RaName = ra_name(QQ),

?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

?assertEqual({'queue.declare_ok', CQ, 0, 0},
declare(Ch, CQ, [{<<"x-queue-type">>, longstr, <<"classic">>}])),

rabbit_ct_client_helpers:publish(Ch, QQ, 3),
wait_for_messages_ready([Server0], RaName, 3),

ForceCheckpointRes = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_checkpoint, [<<".*">>, <<".*">>]),
ExpectedRes = [{QQName, {ok}}],

% Result should only have quorum queue
?assertEqual(ExpectedRes, ForceCheckpointRes).

% Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
alias RabbitMQ.CLI.Core.{DocGuide}

@behaviour RabbitMQ.CLI.CommandBehaviour

defp default_opts,
do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}

def switches(),
do: [
vhost_pattern: :string,
queue_pattern: :string,
errors_only: :boolean
]

def merge_defaults(args, opts) do
{args, Map.merge(default_opts(), opts)}
end

use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments

def run([], %{
node: node_name,
vhost_pattern: vhost_pat,
queue_pattern: queue_pat,
errors_only: errors_only
}) do
args = [vhost_pat, queue_pat]

case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
{:badrpc, _} = error ->
error

results when errors_only ->
for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results,
do: [
{:vhost, vhost},
{:name, name},
{:result, res}
]

results ->
for {{:resource, vhost, _kind, name}, res} <- results,
do: [
{:vhost, vhost},
{:name, name},
{:result, res}
]
end
end

use RabbitMQ.CLI.DefaultOutput

def formatter(), do: RabbitMQ.CLI.Formatters.Table

def usage,
do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>]"

def usage_additional do
[
["--queue-pattern <pattern>", "regular expression to match queue names"],
["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
["--errors-only", "only list queues which reported an error"]
]
end

def usage_doc_guides() do
[
DocGuide.quorum_queues()
]
end

def help_section, do: :replication

def description,
do: "Forces checkpoints for all matching quorum queues"

def banner([], _) do
"Forcing checkpoint for all matching quorum queues..."
end
end
64 changes: 64 additions & 0 deletions deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
use ExUnit.Case, async: false
import TestHelper

@command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand

setup_all do
RabbitMQ.CLI.Core.Distribution.start()

:ok
end

setup context do
{:ok,
opts: %{
node: get_rabbit_hostname(),
timeout: context[:test_timeout] || 30000,
vhost_pattern: ".*",
queue_pattern: ".*",
errors_only: false
}}
end

test "merge_defaults: defaults to reporting complete results" do
assert @command.merge_defaults([], %{}) ==
{[],
%{
vhost_pattern: ".*",
queue_pattern: ".*",
errors_only: false
}}
end

test "validate: accepts no positional arguments" do
assert @command.validate([], %{}) == :ok
end

test "validate: any positional arguments fail validation" do
assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args}

assert @command.validate(["quorum-queue-a", "two"], %{}) ==
{:validation_failure, :too_many_args}

assert @command.validate(["quorum-queue-a", "two", "three"], %{}) ==
{:validation_failure, :too_many_args}
end

@tag test_timeout: 3000
test "run: targeting an unreachable node throws a badrpc", context do
assert match?(
{:badrpc, _},
@command.run(
[],
Map.merge(context[:opts], %{node: :jake@thedog})
)
)
end
end
Loading