Skip to content

Commit 0024e65

Browse files
committed
implement rabbitmq-queues pick_member_with_highest_index <queue> cli command for quorum queues
1 parent cdd9ba1 commit 0024e65

File tree

2 files changed

+90
-1
lines changed

2 files changed

+90
-1
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
notify_decorators/3,
6969
spawn_notify_decorators/3]).
7070

71+
-export([get_member_with_highest_index/3]).
72+
7173
-export([is_enabled/0,
7274
is_compatible/3,
7375
declare/2,
@@ -1245,7 +1247,7 @@ key_metrics_rpc(ServerId) ->
12451247
Metrics = ra:key_metrics(ServerId),
12461248
Metrics#{machine_version => rabbit_fifo:version()}.
12471249

1248-
-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
1250+
-spec status(rabbit_types:vhost(), rabbit_misc:resource_name()) ->
12491251
[[{binary(), term()}]] | {error, term()}.
12501252
status(Vhost, QueueName) ->
12511253
%% Handle not found queues
@@ -1335,6 +1337,38 @@ get_sys_status(Proc) ->
13351337

13361338
end.
13371339

1340+
-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom()) ->
1341+
[[{binary(), term()}]] | {error, term()}.
1342+
get_member_with_highest_index(Vhost, QueueName, IndexName) ->
1343+
case ?MODULE:status(Vhost, QueueName) of
1344+
Status when is_list(Status) ->
1345+
IndexNameInternal = rabbit_data_coercion:to_atom(IndexName),
1346+
case index_name_to_status_key(IndexNameInternal) of
1347+
Key when is_binary(Key) ->
1348+
{_HighestIndexValue, HighestEntry} =
1349+
lists:foldl(
1350+
fun(Entry, {PreviousIndexValue, _PreviousEntry} = Acc) ->
1351+
case rabbit_misc:pget(Key, Entry) of
1352+
CurrentIndexValue when is_integer(CurrentIndexValue),
1353+
CurrentIndexValue > PreviousIndexValue ->
1354+
{CurrentIndexValue, Entry};
1355+
_ ->
1356+
Acc
1357+
end
1358+
end, {-100, []}, Status),
1359+
[HighestEntry];
1360+
undefined ->
1361+
[]
1362+
end;
1363+
{error, _} = Error ->
1364+
Error
1365+
end.
1366+
1367+
index_name_to_status_key(I) when I =:= commit; I =:= commit_index -> <<"Commit Index">>;
1368+
index_name_to_status_key(I) when I =:= log; I =:= log_index -> <<"Last Log Index">>;
1369+
index_name_to_status_key(I) when I =:= snapshot; I =:= snapshot_index -> <<"Snapshot Index">>;
1370+
index_name_to_status_key(_I) -> undefined.
1371+
13381372
add_member(VHost, Name, Node, Membership, Timeout)
13391373
when is_binary(VHost) andalso
13401374
is_binary(Name) andalso
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommandTest do
8+
use ExUnit.Case, async: false
9+
import TestHelper
10+
11+
@command RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommand
12+
13+
setup_all do
14+
RabbitMQ.CLI.Core.Distribution.start()
15+
16+
:ok
17+
end
18+
19+
setup context do
20+
{:ok,
21+
opts: %{
22+
node: get_rabbit_hostname(),
23+
timeout: context[:test_timeout] || 30000
24+
}}
25+
end
26+
27+
test "validate: when no arguments are provided, returns a failure" do
28+
assert @command.validate([], %{}) == {:validation_failure, :not_enough_args}
29+
end
30+
31+
test "validate: when two or more arguments are provided, returns a failure" do
32+
assert @command.validate(["quorum-queue-a", "one-extra-arg"], %{}) ==
33+
{:validation_failure, :too_many_args}
34+
35+
assert @command.validate(
36+
["quorum-queue-a", "extra-arg", "another-extra-arg"],
37+
%{}
38+
) == {:validation_failure, :too_many_args}
39+
end
40+
41+
test "validate: treats one positional arguments and default switches as a success" do
42+
assert @command.validate(["quorum-queue-a"], %{}) == :ok
43+
end
44+
45+
@tag test_timeout: 3000
46+
test "run: targeting an unreachable node throws a badrpc" do
47+
assert match?(
48+
{:badrpc, _},
49+
@command.run(
50+
["quorum-queue-a"],
51+
%{node: :jake@thedog, vhost: "/", index: "log", timeout: 200}
52+
)
53+
)
54+
end
55+
end

0 commit comments

Comments
 (0)