Skip to content

Commit 93101d4

Browse files
committed
Move get_nodes from amqqueue to rabbit_amqqueue
We are moving the functionality of getting the nodes/members of an amqqueue from the `amqqueue` module to `rabbit_amqqueue`. This goes in the line of previous PRs work towards reducing direct access to the `QueueTypeState`, such as #13905. Also, we will need to discretize different formats of the `nodes` entry in the `QueueTypeState`, to support both the previous one as a list of nodes and the new one as a map of nodes to Ra UIds. Doing so in a module such as `amqqueue`, which feels like an accessor module around the `amqqueue` record, doesn't feel right.
1 parent 72a48e9 commit 93101d4

File tree

9 files changed

+26
-48
lines changed

9 files changed

+26
-48
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
% exclusive_owner
3030
get_exclusive_owner/1,
3131
get_leader_node/1,
32-
get_nodes/1,
3332
% name (#resource)
3433
get_name/1,
3534
set_name/2,
@@ -394,15 +393,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
394393
get_leader_node(#amqqueue{pid = none}) -> none;
395394
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).
396395

397-
-spec get_nodes(amqqueue_v2()) -> [node(),...].
398-
399-
get_nodes(Q) ->
400-
case amqqueue:get_type_state(Q) of
401-
#{nodes := Nodes} ->
402-
Nodes;
403-
_ ->
404-
[get_leader_node(Q)]
405-
end.
406396

407397
% operator_policy
408398

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
464464
{Leader :: node() | none, Replicas :: [node(),...]}.
465465
queue_topology(Q) ->
466466
Leader = amqqueue:get_leader_node(Q),
467-
Replicas = amqqueue:get_nodes(Q),
467+
Replicas = rabbit_amqqueue:get_nodes(Q),
468468
{Leader, Replicas}.
469469

470470
decode_exchange({map, KVList}) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
-export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
2424
emit_info_all/5, list_local/1, info_local/1,
2525
emit_info_local/4, emit_info_down/4]).
26+
-export([get_nodes/1]).
2627
-export([count/0]).
2728
-export([list_down/1, list_down/2, list_all/1,
2829
count/1, list_names/0, list_names/1, list_local_names/0,
@@ -1233,6 +1234,12 @@ list() ->
12331234
count() ->
12341235
rabbit_db_queue:count().
12351236

1237+
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
1238+
1239+
get_nodes(Q) ->
1240+
[{members, Nodes}] = info(Q, [members]),
1241+
Nodes.
1242+
12361243
-spec list_names() -> [name()].
12371244

12381245
list_names() ->
@@ -2042,12 +2049,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
20422049
).
20432050

20442051
get_quorum_nodes(Q) ->
2045-
case amqqueue:get_type_state(Q) of
2046-
#{nodes := Nodes} ->
2047-
Nodes;
2048-
_ ->
2049-
[]
2050-
end.
2052+
rabbit_amqqueue:get_nodes(Q).
20512053

20522054
-spec prepend_extra_bcc(Qs) ->
20532055
Qs when Qs :: [amqqueue:amqqueue() |

deps/rabbit/src/rabbit_jms_selector_parser.erl

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
-file("rabbit_jms_selector_parser.yrl", 0).
21
-module(rabbit_jms_selector_parser).
3-
-file("rabbit_jms_selector_parser.erl", 3).
42
-export([parse/1, parse_and_scan/1, format_error/1]).
53
-file("rabbit_jms_selector_parser.yrl", 122).
64

@@ -26,9 +24,7 @@ process_escape_char({string, Line, Value}) ->
2624
%%
2725
%% %CopyrightBegin%
2826
%%
29-
%% SPDX-License-Identifier: Apache-2.0
30-
%%
31-
%% Copyright Ericsson AB 1996-2025. All Rights Reserved.
27+
%% Copyright Ericsson AB 1996-2021. All Rights Reserved.
3228
%%
3329
%% Licensed under the Apache License, Version 2.0 (the "License");
3430
%% you may not use this file except in compliance with the License.
@@ -50,16 +46,10 @@ process_escape_char({string, Line, Value}) ->
5046

5147
-type yecc_ret() :: {'error', _} | {'ok', _}.
5248

53-
-ifdef (YECC_PARSE_DOC).
54-
-doc ?YECC_PARSE_DOC.
55-
-endif.
5649
-spec parse(Tokens :: list()) -> yecc_ret().
5750
parse(Tokens) ->
5851
yeccpars0(Tokens, {no_func, no_location}, 0, [], []).
5952

60-
-ifdef (YECC_PARSE_AND_SCAN_DOC).
61-
-doc ?YECC_PARSE_AND_SCAN_DOC.
62-
-endif.
6353
-spec parse_and_scan({function() | {atom(), atom()}, [_]}
6454
| {atom(), atom(), [_]}) -> yecc_ret().
6555
parse_and_scan({F, A}) ->
@@ -68,9 +58,6 @@ parse_and_scan({M, F, A}) ->
6858
Arity = length(A),
6959
yeccpars0([], {{fun M:F/Arity, A}, no_location}, 0, [], []).
7060

71-
-ifdef (YECC_FORMAT_ERROR_DOC).
72-
-doc ?YECC_FORMAT_ERROR_DOC.
73-
-endif.
7461
-spec format_error(any()) -> [char() | list()].
7562
format_error(Message) ->
7663
case io_lib:deep_char_list(Message) of
@@ -212,7 +199,7 @@ yecctoken2string1(Other) ->
212199

213200

214201

215-
-file("rabbit_jms_selector_parser.erl", 215).
202+
-file("rabbit_jms_selector_parser.erl", 202).
216203

217204
-dialyzer({nowarn_function, yeccpars2/7}).
218205
-compile({nowarn_unused_function, yeccpars2/7}).

deps/rabbit/src/rabbit_queue_location.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) ->
143143
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]),
144144
Queues = GetQueues(),
145145
Counters = lists:foldl(fun(Q, Acc) ->
146-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
146+
Nodes = rabbit_amqqueue:get_nodes(Q),
147147
lists:foldl(fun(N, A)
148148
when is_map_key(N, A) ->
149149
maps:update_with(N, fun(C) -> C+1 end, A);

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2176,7 +2176,7 @@ force_checkpoint_on_queue(QName) ->
21762176
{ok, Q} when ?amqqueue_is_quorum(Q) ->
21772177
{RaName, _} = amqqueue:get_pid(Q),
21782178
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2179-
Nodes = amqqueue:get_nodes(Q),
2179+
Nodes = rabbit_amqqueue:get_nodes(Q),
21802180
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
21812181
|| Node <- Nodes],
21822182
ok;

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ stop() ->
152152

153153
new_stream(Q, LeaderNode)
154154
when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
155-
#{name := StreamId,
156-
nodes := Nodes} = amqqueue:get_type_state(Q),
155+
#{name := StreamId} = amqqueue:get_type_state(Q),
156+
Nodes = rabbit_amqqueue:get_nodes(Q),
157157
%% assertion leader is in nodes configuration
158158
true = lists:member(LeaderNode, Nodes),
159159
process_command({new_stream, StreamId,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,7 +1310,7 @@ force_shrink_member_to_current_member(Config) ->
13101310
wait_for_messages_ready([Server0], RaName, 3),
13111311

13121312
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
1313-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1313+
Nodes0 = rabbit_amqqueue:get_nodes(Q0),
13141314
?assertEqual(3, length(Nodes0)),
13151315

13161316
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
@@ -1319,15 +1319,15 @@ force_shrink_member_to_current_member(Config) ->
13191319
wait_for_messages_ready([Server0], RaName, 3),
13201320

13211321
{ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
1322-
#{nodes := Nodes1} = amqqueue:get_type_state(Q1),
1322+
Nodes1 = rabbit_amqqueue:get_nodes(Q1),
13231323
?assertEqual(1, length(Nodes1)),
13241324

13251325
%% grow queues back to all nodes
13261326
[rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]],
13271327

13281328
wait_for_messages_ready([Server0], RaName, 3),
13291329
{ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
1330-
#{nodes := Nodes2} = amqqueue:get_type_state(Q2),
1330+
Nodes2 = rabbit_amqqueue:get_nodes(Q2),
13311331
?assertEqual(3, length(Nodes2))
13321332
end.
13331333

@@ -1354,7 +1354,7 @@ force_all_queues_shrink_member_to_current_member(Config) ->
13541354
rabbit_ct_client_helpers:publish(Ch, Q, 3),
13551355
wait_for_messages_ready([Server0], RaName, 3),
13561356
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1357-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1357+
Nodes0 = rabbit_amqqueue:get_nodes(Q0),
13581358
?assertEqual(3, length(Nodes0))
13591359
end || Q <- QQs],
13601360

@@ -1365,7 +1365,7 @@ force_all_queues_shrink_member_to_current_member(Config) ->
13651365
RaName = ra_name(Q),
13661366
wait_for_messages_ready([Server0], RaName, 3),
13671367
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1368-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1368+
Nodes0 = rabbit_amqqueue:get_nodes(Q0),
13691369
?assertEqual(1, length(Nodes0))
13701370
end || Q <- QQs],
13711371

@@ -1376,7 +1376,7 @@ force_all_queues_shrink_member_to_current_member(Config) ->
13761376
RaName = ra_name(Q),
13771377
wait_for_messages_ready([Server0], RaName, 3),
13781378
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1379-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1379+
Nodes0 = rabbit_amqqueue:get_nodes(Q0),
13801380
?assertEqual(3, length(Nodes0))
13811381
end || Q <- QQs]
13821382
end.
@@ -1420,7 +1420,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
14201420
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
14211421
wait_for_messages_ready([Server0], RaName, 3),
14221422
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
1423-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1423+
Nodes0 = rabbit_amqqueue:get_nodes(Q0),
14241424
?assertEqual(3, length(Nodes0))
14251425
end || Q <- QQs, VHost <- VHosts],
14261426

@@ -1432,7 +1432,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
14321432
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
14331433
wait_for_messages_ready([Server0], RaName, 3),
14341434
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
1435-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1435+
Nodes0 = rabbit_amqqueue:get_nodes(Q0),
14361436
case VHost of
14371437
VHost1 -> ?assertEqual(3, length(Nodes0));
14381438
VHost2 -> ?assertEqual(1, length(Nodes0))
@@ -1447,7 +1447,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
14471447
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
14481448
wait_for_messages_ready([Server0], RaName, 3),
14491449
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
1450-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1450+
Nodes0 = rabbit_amqqueue:get_nodes(Q0),
14511451
?assertEqual(3, length(Nodes0))
14521452
end || Q <- QQs, VHost <- VHosts]
14531453
end.
@@ -2955,7 +2955,7 @@ delete_member_member_already_deleted(Config) ->
29552955
rpc:call(Server, rabbit_quorum_queue, delete_member,
29562956
[<<"/">>, QQ, Server2])),
29572957
{ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
2958-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
2958+
Nodes = rabbit_amqqueue:get_nodes(Q),
29592959
?assertEqual(1, length(Nodes)),
29602960
ok.
29612961

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) ->
457457
true -> Acc;
458458
false ->
459459
Type = amqqueue:get_type(Q),
460-
TypeState = amqqueue:get_type_state(Q),
461-
Members = maps:get(nodes, TypeState, []),
460+
Members = rabbit_amqqueue:get_nodes(Q),
462461
case membership(amqqueue:get_pid(Q), Members) of
463462
not_a_member ->
464463
Acc;

0 commit comments

Comments
 (0)