Skip to content

Commit 2d0f9a6

Browse files
committed
add tests for acquiring qq member with highest index
1 parent 0024e65 commit 2d0f9a6

File tree

2 files changed

+166
-1
lines changed

2 files changed

+166
-1
lines changed

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ groups() ->
115115
node_removal_is_not_quorum_critical,
116116
select_nodes_with_least_replicas,
117117
select_nodes_with_least_replicas_node_down,
118-
subscribe_from_each
118+
subscribe_from_each,
119+
get_member_with_highest_index
119120

120121

121122
]},
@@ -365,6 +366,8 @@ init_per_testcase(Testcase, Config) ->
365366
{skip, "peek_with_wrong_queue_type isn't mixed versions compatible"};
366367
cancel_consumer_gh_3729 when IsMixed andalso RabbitMQ3 ->
367368
{skip, "this test is not compatible with RabbitMQ 3.13.x"};
369+
get_member_with_highest_index when IsMixed ->
370+
{skip, "get_member_with_highest_index isn't mixed versions compatible"};
368371
_ ->
369372
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
370373
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
@@ -4576,6 +4579,95 @@ leader_health_check(Config) ->
45764579
amqp_connection:close(Conn1),
45774580
amqp_connection:close(Conn2).
45784581

4582+
get_member_with_highest_index(Config) ->
4583+
[Node1, Node2, Node3, Node4, Node5] =
4584+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
4585+
4586+
Q = ?config(queue_name, Config),
4587+
VHost = <<"/">>,
4588+
4589+
Statuses =
4590+
%% [{Node, Member, LogIdx, CommitIdx, SnapshotIdx}, ...]
4591+
[{Node1, leader, 1015, 1010, 1010}, %% highest SnapshotIdx
4592+
{Node2, follower, 1015, 1010, 1010}, %% highest SnapshotIdx (duplicate)
4593+
{Node3, follower, 1013, 1013, 1009}, %% highest CommitIdx
4594+
{Node4, follower, 1016, 1009, 1008}, %% highest LogIdx
4595+
{Node5, follower, 1013, 1012, undefined}],
4596+
4597+
Term = 1,
4598+
MachineVersion = 7,
4599+
4600+
meck:new(rabbit_quorum_queue, [passthrough, no_link]),
4601+
meck:expect(
4602+
rabbit_quorum_queue, status,
4603+
fun(_, _) ->
4604+
[[{<<"Node Name">>, Node},
4605+
{<<"Raft State">>, Member},
4606+
{<<"Last Log Index">>, LogIndex},
4607+
{<<"Last Written">>, LogIndex},
4608+
{<<"Last Applied">>, LogIndex},
4609+
{<<"Commit Index">>, CommitIndex},
4610+
{<<"Snapshot Index">>, SnapshotIdx},
4611+
{<<"Term">>, Term},
4612+
{<<"Machine Version">>, MachineVersion}]
4613+
|| {Node, Member, LogIndex, CommitIndex, SnapshotIdx} <- Statuses]
4614+
end),
4615+
4616+
ct:pal("quorum status: ~tp", [rabbit_quorum_queue:status(VHost, Q)]),
4617+
4618+
ExpectedHighestLogIdx =
4619+
[[{<<"Node Name">>, Node4},
4620+
{<<"Raft State">>, follower},
4621+
{<<"Last Log Index">>, 1016},
4622+
{<<"Last Written">>,1016},
4623+
{<<"Last Applied">>,1016},
4624+
{<<"Commit Index">>, 1009},
4625+
{<<"Snapshot Index">>, 1008},
4626+
{<<"Term">>, Term},
4627+
{<<"Machine Version">>, MachineVersion}]],
4628+
4629+
[?assertEqual(ExpectedHighestLogIdx,
4630+
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [log, log_index]],
4631+
4632+
ExpectedHighestCommitIdx =
4633+
[[{<<"Node Name">>, Node3},
4634+
{<<"Raft State">>, follower},
4635+
{<<"Last Log Index">>, 1013},
4636+
{<<"Last Written">>,1013},
4637+
{<<"Last Applied">>,1013},
4638+
{<<"Commit Index">>, 1013},
4639+
{<<"Snapshot Index">>, 1009},
4640+
{<<"Term">>, Term},
4641+
{<<"Machine Version">>, MachineVersion}]],
4642+
4643+
[?assertEqual(ExpectedHighestCommitIdx,
4644+
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [commit, commit_index]],
4645+
4646+
ExpectedHighestSnapshotIdx =
4647+
[[{<<"Node Name">>, Node1},
4648+
{<<"Raft State">>, leader},
4649+
{<<"Last Log Index">>, 1015},
4650+
{<<"Last Written">>,1015},
4651+
{<<"Last Applied">>,1015},
4652+
{<<"Commit Index">>, 1010},
4653+
{<<"Snapshot Index">>, 1010},
4654+
{<<"Term">>, Term},
4655+
{<<"Machine Version">>, MachineVersion}]],
4656+
% Duplicate:
4657+
% [{<<"Node Name">>, Node2},
4658+
% {<<"Raft State">>, follower},
4659+
% {<<"Last Log Index">>, 1015},
4660+
% {<<"Last Written">>,1015},
4661+
% {<<"Last Applied">>,1015},
4662+
% {<<"Commit Index">>, 1010},
4663+
% {<<"Snapshot Index">>, 1010},
4664+
% {<<"Term">>, Term},
4665+
% {<<"Machine Version">>, MachineVersion}],
4666+
4667+
[?assertEqual(ExpectedHighestSnapshotIdx,
4668+
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [snapshot, snapshot_index]],
4669+
4670+
ok.
45794671

45804672
leader_locator_client_local(Config) ->
45814673
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommand do
8+
alias RabbitMQ.CLI.Core.DocGuide
9+
import RabbitMQ.CLI.Core.DataCoercion
10+
11+
@behaviour RabbitMQ.CLI.CommandBehaviour
12+
13+
use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument
14+
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
15+
16+
def switches(), do: [index: :string, timeout: :integer]
17+
def aliases(), do: [i: :index, t: :timeout]
18+
19+
def merge_defaults(args, opts) do
20+
{args, Map.merge(%{vhost: "/", index: "log"}, opts)}
21+
end
22+
23+
def run([name] = _args, %{vhost: vhost, index: index, node: node_name}) do
24+
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :get_member_with_highest_index, [
25+
vhost,
26+
name,
27+
to_atom(String.downcase(index))
28+
]) do
29+
{:error, :classic_queue_not_supported} ->
30+
index = format_index(String.downcase(index))
31+
{:error, "Cannot get #{index} index from a classic queue"}
32+
33+
{:error, :not_found} ->
34+
{:error, {:not_found, :queue, vhost, name}}
35+
36+
other ->
37+
other
38+
end
39+
end
40+
41+
use RabbitMQ.CLI.DefaultOutput
42+
43+
def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable
44+
45+
def usage, do: "pick_member_with_highest_index <queue> [--vhost <vhost>] [--index <commit|commit_index|log|log_index|snapshot|snapshot_index>]"
46+
47+
def usage_additional do
48+
[
49+
["<queue>", "quorum queue name"],
50+
["--index <commit|commit_index|log|log_index|snapshot|snapshot_index>", "name of the index to use to lookup highest member"]
51+
]
52+
end
53+
54+
def usage_doc_guides() do
55+
[
56+
DocGuide.quorum_queues()
57+
]
58+
end
59+
60+
def help_section, do: :replication
61+
62+
def description, do: "Look up the member of a quorum queue with the highest commit, log or snapshot index."
63+
64+
def banner([name], %{node: node, index: index, vhost: vhost}) do
65+
index = format_index(String.downcase(index))
66+
"Member with highest #{index} index for queue #{name} in vhost #{vhost} on node #{node}..."
67+
end
68+
69+
defp format_index("log_index"), do: "log"
70+
defp format_index("commit_index"), do: "commit"
71+
defp format_index("snapshot_index"), do: "snapshot"
72+
defp format_index(index_name), do: index_name
73+
end

0 commit comments

Comments
 (0)