Skip to content

Commit 8d21428

Browse files
committed
Move get_nodes from amqqueue to rabbit_amqqueue
We are moving the functionality of getting the nodes/members of an amqqueue from the `amqqueue` module to `rabbit_amqqueue`. This goes in the line of previous PRs work towards reducing direct access to the `QueueTypeState`, such as #13905. Also, we will need to discretize different formats of the `nodes` entry in the `QueueTypeState`, to support both the previous one as a list of nodes and the new one as a map of nodes to Ra UIds. Doing so in a module such as `amqqueue`, which feels like an accessor module around the `amqqueue` record, doesn't feel right.
1 parent 1997e95 commit 8d21428

File tree

8 files changed

+15
-24
lines changed

8 files changed

+15
-24
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
% exclusive_owner
3131
get_exclusive_owner/1,
3232
get_leader_node/1,
33-
get_nodes/1,
3433
% name (#resource)
3534
get_name/1,
3635
set_name/2,
@@ -425,15 +424,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
425424
get_leader_node(#amqqueue{pid = none}) -> none;
426425
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).
427426

428-
-spec get_nodes(amqqueue_v2()) -> [node(),...].
429-
430-
get_nodes(Q) ->
431-
case amqqueue:get_type_state(Q) of
432-
#{nodes := Nodes} ->
433-
Nodes;
434-
_ ->
435-
[get_leader_node(Q)]
436-
end.
437427

438428
% operator_policy
439429

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
471471
{Leader :: node() | none, Replicas :: [node(),...]}.
472472
queue_topology(Q) ->
473473
Leader = amqqueue:get_leader_node(Q),
474-
Replicas = amqqueue:get_nodes(Q),
474+
Replicas = rabbit_amqqueue:get_nodes(Q),
475475
{Leader, Replicas}.
476476

477477
decode_exchange({map, KVList}) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
-export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
2424
emit_info_all/5, list_local/1, info_local/1,
2525
emit_info_local/4, emit_info_down/4]).
26+
-export([get_nodes/1]).
2627
-export([count/0]).
2728
-export([list_down/1, list_down/2, list_all/1,
2829
count/1, list_names/0, list_names/1, list_local_names/0,
@@ -1226,6 +1227,12 @@ list() ->
12261227
count() ->
12271228
rabbit_db_queue:count().
12281229

1230+
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
1231+
1232+
get_nodes(Q) ->
1233+
[{members, Nodes}] = info(Q, [members]),
1234+
Nodes.
1235+
12291236
-spec list_names() -> [name()].
12301237

12311238
list_names() ->
@@ -2035,12 +2042,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
20352042
).
20362043

20372044
get_quorum_nodes(Q) ->
2038-
case amqqueue:get_type_state(Q) of
2039-
#{nodes := Nodes} ->
2040-
Nodes;
2041-
_ ->
2042-
[]
2043-
end.
2045+
rabbit_amqqueue:get_nodes(Q).
20442046

20452047
-spec prepend_extra_bcc(Qs) ->
20462048
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].

deps/rabbit/src/rabbit_queue_location.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) ->
143143
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]),
144144
Queues = GetQueues(),
145145
Counters = lists:foldl(fun(Q, Acc) ->
146-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
146+
Nodes = rabbit_amqqueue:get_nodes(Q),
147147
lists:foldl(fun(N, A)
148148
when is_map_key(N, A) ->
149149
maps:update_with(N, fun(C) -> C+1 end, A);

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2189,7 +2189,7 @@ force_checkpoint_on_queue(QName) ->
21892189
{ok, Q} when ?amqqueue_is_quorum(Q) ->
21902190
{RaName, _} = amqqueue:get_pid(Q),
21912191
?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2192-
Nodes = amqqueue:get_nodes(Q),
2192+
Nodes = rabbit_amqqueue:get_nodes(Q),
21932193
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
21942194
|| Node <- Nodes],
21952195
ok;

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ stop() ->
153153

154154
new_stream(Q, LeaderNode)
155155
when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
156-
#{name := StreamId,
157-
nodes := Nodes} = amqqueue:get_type_state(Q),
156+
#{name := StreamId} = amqqueue:get_type_state(Q),
157+
Nodes = rabbit_amqqueue:get_nodes(Q),
158158
%% assertion leader is in nodes configuration
159159
true = lists:member(LeaderNode, Nodes),
160160
process_command({new_stream, StreamId,

deps/rabbitmq_ct_helpers/src/queue_utils.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ assert_number_of_replicas(Config, Server, VHost, QQ, Count) ->
208208
begin
209209
{ok, Q} = rabbit_ct_broker_helpers:rpc(
210210
Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]),
211-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
211+
Nodes = rabbit_amqqueue:get_nodes(Q),
212212
length(Nodes)
213213
end,
214214
30000).

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) ->
451451
true -> Acc;
452452
false ->
453453
Type = amqqueue:get_type(Q),
454-
TypeState = amqqueue:get_type_state(Q),
455-
Members = maps:get(nodes, TypeState, []),
454+
Members = rabbit_amqqueue:get_nodes(Q),
456455
case membership(amqqueue:get_pid(Q), Members) of
457456
not_a_member ->
458457
Acc;

0 commit comments

Comments
 (0)