Skip to content

Commit b54ab1d

Browse files
committed
Add force checkpoint functions for quorum queues and command line tool
1 parent 8d9aa3e commit b54ab1d

File tree

4 files changed

+282
-0
lines changed

4 files changed

+282
-0
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@
7777
force_vhost_queues_shrink_member_to_current_member/1,
7878
force_all_queues_shrink_member_to_current_member/0]).
7979

80+
-export([force_checkpoint/2, force_checkpoint_on_queue/1]).
81+
8082
%% for backwards compatibility
8183
-export([file_handle_leader_reservation/1,
8284
file_handle_other_reservation/0,
@@ -2084,6 +2086,39 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
20842086
rabbit_log:warning("Shrinking finished"),
20852087
ok.
20862088

2089+
force_checkpoint_on_queue(QName) ->
2090+
Node = node(),
2091+
QNameFmt = rabbit_misc:rs(QName),
2092+
case rabbit_amqqueue:lookup(QName) of
2093+
{ok, Q} when ?amqqueue_is_classic(Q) ->
2094+
{error, classic_queue_not_supported};
2095+
{ok, Q} when ?amqqueue_is_quorum(Q) ->
2096+
{RaName, _} = amqqueue:get_pid(Q),
2097+
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]),
2098+
rabbit_log:debug("Sent command to force checkpoint ~ts", [QNameFmt]);
2099+
{ok, _Q} ->
2100+
{error, not_quorum_queue};
2101+
{error, _} = E ->
2102+
E
2103+
end.
2104+
2105+
force_checkpoint(VhostSpec, QueueSpec) ->
2106+
[begin
2107+
QName = amqqueue:get_name(Q),
2108+
case force_checkpoint_on_queue(QName) of
2109+
ok ->
2110+
{QName, {ok}};
2111+
{error, Err} ->
2112+
rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
2113+
[rabbit_misc:rs(QName), Err]),
2114+
{QName, {error, Err}}
2115+
end
2116+
end
2117+
|| Q <- rabbit_amqqueue:list(),
2118+
amqqueue:get_type(Q) == ?MODULE,
2119+
is_match(amqqueue:get_vhost(Q), VhostSpec)
2120+
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)].
2121+
20872122
is_minority(All, Up) ->
20882123
MinQuorum = length(All) div 2 + 1,
20892124
length(Up) < MinQuorum.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ groups() ->
9797
force_shrink_member_to_current_member,
9898
force_all_queues_shrink_member_to_current_member,
9999
force_vhost_queues_shrink_member_to_current_member,
100+
force_checkpoint_on_queue,
101+
force_checkpoint,
100102
policy_repair,
101103
gh_12635,
102104
replica_states
@@ -1308,6 +1310,80 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
13081310
?assertEqual(3, length(Nodes0))
13091311
end || Q <- QQs, VHost <- VHosts].
13101312

1313+
force_checkpoint_on_queue(Config) ->
1314+
[Server0, _Server1, _Server2] =
1315+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1316+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
1317+
QQ = ?config(queue_name, Config),
1318+
RaName = ra_name(QQ),
1319+
QName = rabbit_misc:r(<<"/">>, queue, QQ),
1320+
1321+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1322+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1323+
1324+
rabbit_ct_client_helpers:publish(Ch, QQ, 3),
1325+
wait_for_messages_ready([Server0], RaName, 3),
1326+
1327+
% Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet.
1328+
rabbit_ct_helpers:await_condition(
1329+
fun() ->
1330+
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
1331+
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
1332+
case Index of
1333+
0 -> true;
1334+
_ -> false
1335+
end
1336+
end),
1337+
1338+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1339+
force_checkpoint_on_queue, [QName]),
1340+
1341+
% Wait for initial checkpoint and make sure it's not 0
1342+
rabbit_ct_helpers:await_condition(
1343+
fun() ->
1344+
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
1345+
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
1346+
case Index of
1347+
0 -> false;
1348+
_ -> true
1349+
end
1350+
end).
1351+
1352+
force_checkpoint(Config) ->
1353+
[Server0, _Server1, _Server2] =
1354+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1355+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
1356+
QQ = ?config(queue_name, Config),
1357+
CQ = <<"force_checkpoint_cq">>,
1358+
RaName = ra_name(QQ),
1359+
1360+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1361+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1362+
1363+
?assertEqual({'queue.declare_ok', CQ, 0, 0},
1364+
declare(Ch, CQ, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
1365+
1366+
rabbit_ct_client_helpers:publish(Ch, QQ, 3),
1367+
wait_for_messages_ready([Server0], RaName, 3),
1368+
1369+
meck:expect(rabbit_quorum_queue, force_checkpoint_on_queue, fun(Q) -> ok end),
1370+
1371+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1372+
force_checkpoint, [<<".*">>, <<".*">>]),
1373+
1374+
% Waiting here to make sure checkpoint has been forced
1375+
rabbit_ct_helpers:await_condition(
1376+
fun() ->
1377+
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
1378+
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
1379+
case Index of
1380+
0 -> false;
1381+
_ -> true
1382+
end
1383+
end),
1384+
1385+
% Make sure force_checkpoint_on_queue was only called for the quorun queue
1386+
?assertEqual(1, meck:num_calls(rabbit_quorum_queue, force_checkpoint_on_queue, '_')).
13111387

13121388
% Tests that, if the process of a QQ is dead in the moment of declaring a policy
13131389
% that affects such queue, when the process is made available again, the policy
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
8+
alias RabbitMQ.CLI.Core.{DocGuide}
9+
10+
@behaviour RabbitMQ.CLI.CommandBehaviour
11+
12+
defp default_opts,
13+
do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}
14+
15+
def switches(),
16+
do: [
17+
vhost_pattern: :string,
18+
queue_pattern: :string,
19+
errors_only: :boolean
20+
]
21+
22+
def merge_defaults(args, opts) do
23+
{args, Map.merge(default_opts(), opts)}
24+
end
25+
26+
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
27+
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
28+
29+
def run([], %{
30+
node: node_name,
31+
vhost_pattern: vhost_pat,
32+
queue_pattern: queue_pat,
33+
errors_only: errors_only
34+
}) do
35+
args = [vhost_pat, queue_pat]
36+
37+
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
38+
{:error, _} = error ->
39+
error
40+
41+
{:badrpc, _} = error ->
42+
error
43+
44+
results when errors_only ->
45+
for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results,
46+
do: [
47+
{:vhost, vhost},
48+
{:name, name},
49+
{:result, format_result(res)}
50+
]
51+
52+
results ->
53+
for {{:resource, vhost, _kind, name}, res} <- results,
54+
do: [
55+
{:vhost, vhost},
56+
{:name, name},
57+
{:result, format_result(res)}
58+
]
59+
end
60+
end
61+
62+
use RabbitMQ.CLI.DefaultOutput
63+
64+
def formatter(), do: RabbitMQ.CLI.Formatters.Table
65+
66+
def usage,
67+
do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>]"
68+
69+
def usage_additional do
70+
[
71+
["--queue-pattern <pattern>", "regular expression to match queue names"],
72+
["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
73+
["--errors-only", "only list queues which reported an error"]
74+
]
75+
end
76+
77+
def usage_doc_guides() do
78+
[
79+
DocGuide.quorum_queues()
80+
]
81+
end
82+
83+
def help_section, do: :replication
84+
85+
def description,
86+
do: "Forces checkpoints for all matching quorum queues"
87+
88+
def banner([], _) do
89+
"Forcing checkpoint for all matching quorum queues..."
90+
end
91+
92+
#
93+
# Implementation
94+
#
95+
96+
defp format_result({:ok}) do
97+
"ok"
98+
end
99+
100+
defp format_result({:error, :timeout}) do
101+
"error: the operation timed out and may not have been completed"
102+
end
103+
104+
defp format_result({:error, err}) do
105+
to_string(:io_lib.format("error: ~W", [err, 10]))
106+
end
107+
end
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
8+
use ExUnit.Case, async: false
9+
import TestHelper
10+
11+
@command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand
12+
13+
setup_all do
14+
RabbitMQ.CLI.Core.Distribution.start()
15+
16+
:ok
17+
end
18+
19+
setup context do
20+
{:ok,
21+
opts: %{
22+
node: get_rabbit_hostname(),
23+
timeout: context[:test_timeout] || 30000,
24+
vhost_pattern: ".*",
25+
queue_pattern: ".*",
26+
errors_only: false
27+
}}
28+
end
29+
30+
test "merge_defaults: defaults to reporting complete results" do
31+
assert @command.merge_defaults([], %{}) ==
32+
{[],
33+
%{
34+
vhost_pattern: ".*",
35+
queue_pattern: ".*",
36+
errors_only: false
37+
}}
38+
end
39+
40+
test "validate: accepts no positional arguments" do
41+
assert @command.validate([], %{}) == :ok
42+
end
43+
44+
test "validate: any positional arguments fail validation" do
45+
assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args}
46+
47+
assert @command.validate(["quorum-queue-a", "two"], %{}) ==
48+
{:validation_failure, :too_many_args}
49+
50+
assert @command.validate(["quorum-queue-a", "two", "three"], %{}) ==
51+
{:validation_failure, :too_many_args}
52+
end
53+
54+
@tag test_timeout: 3000
55+
test "run: targeting an unreachable node throws a badrpc", context do
56+
assert match?(
57+
{:badrpc, _},
58+
@command.run(
59+
[],
60+
Map.merge(context[:opts], %{node: :jake@thedog})
61+
)
62+
)
63+
end
64+
end

0 commit comments

Comments
 (0)