Skip to content

Commit f3b7638

Browse files
committed
Move get_nodes from amqqueue to rabbit_amqqueue
We are moving the functionality of getting the nodes/members of an amqqueue from the `amqqueue` module to `rabbit_amqqueue`. This goes in the line of previous PRs work towards reducing direct access to the `QueueTypeState`, such as #13905. Also, we will need to discretize different formats of the `nodes` entry in the `QueueTypeState`, to support both the previous one as a list of nodes and the new one as a map of nodes to Ra UIds. Doing so in a module such as `amqqueue`, which feels like an accessor module around the `amqqueue` record, doesn't feel right.
1 parent 782fccf commit f3b7638

File tree

8 files changed

+15
-24
lines changed

8 files changed

+15
-24
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
% exclusive_owner
3030
get_exclusive_owner/1,
3131
get_leader_node/1,
32-
get_nodes/1,
3332
% name (#resource)
3433
get_name/1,
3534
set_name/2,
@@ -394,15 +393,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
394393
get_leader_node(#amqqueue{pid = none}) -> none;
395394
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).
396395

397-
-spec get_nodes(amqqueue_v2()) -> [node(),...].
398-
399-
get_nodes(Q) ->
400-
case amqqueue:get_type_state(Q) of
401-
#{nodes := Nodes} ->
402-
Nodes;
403-
_ ->
404-
[get_leader_node(Q)]
405-
end.
406396

407397
% operator_policy
408398

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
465465
{Leader :: node() | none, Replicas :: [node(),...]}.
466466
queue_topology(Q) ->
467467
Leader = amqqueue:get_leader_node(Q),
468-
Replicas = amqqueue:get_nodes(Q),
468+
Replicas = rabbit_amqqueue:get_nodes(Q),
469469
{Leader, Replicas}.
470470

471471
decode_exchange({map, KVList}) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
-export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
2424
emit_info_all/5, list_local/1, info_local/1,
2525
emit_info_local/4, emit_info_down/4]).
26+
-export([get_nodes/1]).
2627
-export([count/0]).
2728
-export([list_down/1, list_down/2, list_all/1,
2829
count/1, list_names/0, list_names/1, list_local_names/0,
@@ -1234,6 +1235,12 @@ list() ->
12341235
count() ->
12351236
rabbit_db_queue:count().
12361237

1238+
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
1239+
1240+
get_nodes(Q) ->
1241+
[{members, Nodes}] = info(Q, [members]),
1242+
Nodes.
1243+
12371244
-spec list_names() -> [name()].
12381245

12391246
list_names() ->
@@ -2043,12 +2050,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
20432050
).
20442051

20452052
get_quorum_nodes(Q) ->
2046-
case amqqueue:get_type_state(Q) of
2047-
#{nodes := Nodes} ->
2048-
Nodes;
2049-
_ ->
2050-
[]
2051-
end.
2053+
rabbit_amqqueue:get_nodes(Q).
20522054

20532055
-spec prepend_extra_bcc(Qs) ->
20542056
Qs when Qs :: [amqqueue:amqqueue() |

deps/rabbit/src/rabbit_queue_location.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) ->
143143
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]),
144144
Queues = GetQueues(),
145145
Counters = lists:foldl(fun(Q, Acc) ->
146-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
146+
Nodes = rabbit_amqqueue:get_nodes(Q),
147147
lists:foldl(fun(N, A)
148148
when is_map_key(N, A) ->
149149
maps:update_with(N, fun(C) -> C+1 end, A);

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2178,7 +2178,7 @@ force_checkpoint_on_queue(QName) ->
21782178
{ok, Q} when ?amqqueue_is_quorum(Q) ->
21792179
{RaName, _} = amqqueue:get_pid(Q),
21802180
?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2181-
Nodes = amqqueue:get_nodes(Q),
2181+
Nodes = rabbit_amqqueue:get_nodes(Q),
21822182
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
21832183
|| Node <- Nodes],
21842184
ok;

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ stop() ->
153153

154154
new_stream(Q, LeaderNode)
155155
when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
156-
#{name := StreamId,
157-
nodes := Nodes} = amqqueue:get_type_state(Q),
156+
#{name := StreamId} = amqqueue:get_type_state(Q),
157+
Nodes = rabbit_amqqueue:get_nodes(Q),
158158
%% assertion leader is in nodes configuration
159159
true = lists:member(LeaderNode, Nodes),
160160
process_command({new_stream, StreamId,

deps/rabbitmq_ct_helpers/src/queue_utils.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ assert_number_of_replicas(Config, Server, VHost, QQ, Count) ->
208208
begin
209209
{ok, Q} = rabbit_ct_broker_helpers:rpc(
210210
Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]),
211-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
211+
Nodes = rabbit_amqqueue:get_nodes(Q),
212212
length(Nodes)
213213
end,
214214
30000).

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) ->
451451
true -> Acc;
452452
false ->
453453
Type = amqqueue:get_type(Q),
454-
TypeState = amqqueue:get_type_state(Q),
455-
Members = maps:get(nodes, TypeState, []),
454+
Members = rabbit_amqqueue:get_nodes(Q),
456455
case membership(amqqueue:get_pid(Q), Members) of
457456
not_a_member ->
458457
Acc;

0 commit comments

Comments
 (0)