Skip to content

Commit 42405c5

Browse files
committed
Apply PR suggestions
1 parent 3293a24 commit 42405c5

File tree

8 files changed

+57
-65
lines changed

8 files changed

+57
-65
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
471471
{Leader :: node() | none, Replicas :: [node(),...]}.
472472
queue_topology(Q) ->
473473
Leader = amqqueue:get_leader_node(Q),
474-
Replicas = rabbit_amqqueue:get_nodes(Q),
474+
Replicas = rabbit_queue_type:get_nodes(Q),
475475
{Leader, Replicas}.
476476

477477
decode_exchange({map, KVList}) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
-export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
2424
emit_info_all/5, list_local/1, info_local/1,
2525
emit_info_local/4, emit_info_down/4]).
26-
-export([get_nodes/1]).
2726
-export([count/0]).
2827
-export([list_down/1, list_down/2, list_all/1,
2928
count/1, list_names/0, list_names/1, list_local_names/0,
@@ -1227,12 +1226,6 @@ list() ->
12271226
count() ->
12281227
rabbit_db_queue:count().
12291228

1230-
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
1231-
1232-
get_nodes(Q) ->
1233-
[{members, Nodes}] = info(Q, [members]),
1234-
Nodes.
1235-
12361229
-spec list_names() -> [name()].
12371230

12381231
list_names() ->
@@ -2042,7 +2035,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
20422035
).
20432036

20442037
get_quorum_nodes(Q) ->
2045-
rabbit_amqqueue:get_nodes(Q).
2038+
rabbit_queue_type:get_nodes(Q).
20462039

20472040
-spec prepend_extra_bcc(Qs) ->
20482041
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].

deps/rabbit/src/rabbit_queue_location.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) ->
143143
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]),
144144
Queues = GetQueues(),
145145
Counters = lists:foldl(fun(Q, Acc) ->
146-
Nodes = rabbit_amqqueue:get_nodes(Q),
146+
Nodes = rabbit_queue_type:get_nodes(Q),
147147
lists:foldl(fun(N, A)
148148
when is_map_key(N, A) ->
149149
maps:update_with(N, fun(C) -> C+1 end, A);

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
format/2,
3939
remove/2,
4040
info/2,
41+
get_nodes/1,
4142
state_info/1,
4243
format_status/1,
4344
info_down/2,
@@ -422,6 +423,11 @@ info(Q, Items) ->
422423
Mod = amqqueue:get_type(Q),
423424
Mod:info(Q, Items).
424425

426+
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
427+
get_nodes(Q) ->
428+
[{members, Nodes}] = info(Q, [members]),
429+
Nodes.
430+
425431
fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) ->
426432
maps:fold(Fun, Acc, Ctxs).
427433

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -274,13 +274,13 @@ start_cluster(Q) ->
274274
|| Node <- [LeaderNode | FollowerNodes]]),
275275
NewQ0 = amqqueue:set_pid(Q, LeaderId),
276276
NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
277-
false ->
278-
amqqueue:set_type_state(NewQ0,
279-
#{nodes => [LeaderNode | FollowerNodes]});
280-
true ->
281-
amqqueue:set_type_state(NewQ0,
282-
#{nodes => UIDs})
283-
end,
277+
false ->
278+
amqqueue:set_type_state(NewQ0,
279+
#{nodes => [LeaderNode | FollowerNodes]});
280+
true ->
281+
amqqueue:set_type_state(NewQ0,
282+
#{nodes => UIDs})
283+
end,
284284

285285
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
286286
rabbit_fifo, version, [],
@@ -734,16 +734,14 @@ repair_amqqueue_nodes(Q0) ->
734734
%% update amqqueue record
735735
Fun = fun (Q) ->
736736
TS0 = amqqueue:get_type_state(Q),
737-
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids)
738-
andalso has_uuid_tracking(TS0)
739-
of
737+
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
740738
false ->
741739
TS0#{nodes => RaNodes};
742740
true ->
743741
RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of,
744-
[?RA_SYSTEM, Name],
745-
?RPC_TIMEOUT)}
746-
|| N <- RaNodes]),
742+
[?RA_SYSTEM, Name],
743+
?RPC_TIMEOUT)}
744+
|| N <- RaNodes]),
747745
TS0#{nodes => RaUids}
748746
end,
749747
amqqueue:set_type_state(Q, TS)
@@ -804,28 +802,28 @@ maybe_apply_policies(Q, #{config := CurrentConfig}) ->
804802
{[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
805803
recover(_Vhost, Queues) ->
806804
lists:foldl(
807-
fun (Q0, {R0, F0}) ->
808-
{Name, _} = amqqueue:get_pid(Q0),
805+
fun (Q, {R0, F0}) ->
806+
{Name, _} = amqqueue:get_pid(Q),
809807
ServerId = {Name, node()},
810-
QName = amqqueue:get_name(Q0),
811-
MutConf = make_mutable_config(Q0),
808+
QName = amqqueue:get_name(Q),
809+
MutConf = make_mutable_config(Q),
812810
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
813-
#{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0),
814-
QTypeState = case Nodes of
811+
#{nodes := Nodes} = amqqueue:get_type_state(Q),
812+
case Nodes of
815813
List when is_list(List) ->
816814
%% Queue is not aware of node to uid mapping, do nothing
817-
QTypeState0;
815+
ok;
818816
#{node() := RaUId} ->
819-
%% Queue is aware and uid for current node is correct, do nothing
820-
QTypeState0;
821-
_ ->
822-
%% Queue is aware but either current node has no UId or it
823-
%% does not match the one returned by ra_directory, regen uid
824-
maybe_delete_data_dir(RaUId),
825-
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
826-
QTypeState0#{nodes := Nodes#{node() => NewRaUId}}
817+
%% Queue is aware and uid for current node is correct, do
818+
%% nothing
819+
ok;
820+
#{node() := _NewRaUId} ->
821+
%% Queue is aware but it does not match the one returned by
822+
%% ra_directory
823+
rabbit_log:info("Quorum queue ~ts: detected node uuid change, "
824+
"deleting old data directory", [rabbit_misc:rs(QName)]),
825+
maybe_delete_data_dir(RaUId)
827826
end,
828-
Q = amqqueue:set_type_state(Q0, QTypeState),
829827
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
830828
ok ->
831829
% queue was restarted, good
@@ -1430,16 +1428,16 @@ do_add_member(Q0, Node, Membership, Timeout)
14301428
QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0),
14311429
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
14321430
QTypeState = case Nodes of
1433-
L when is_list(L) ->
1434-
%% Queue is not aware of node to uid mapping, just add the new node
1435-
QTypeState0#{nodes => lists:usort([Node | Nodes])};
1436-
#{Node := _} ->
1437-
%% Queue is aware and uid for targeted node exists, do nothing
1438-
QTypeState0;
1439-
_ ->
1440-
%% Queue is aware but current node has no UId, regen uid
1441-
QTypeState0#{nodes => Nodes#{Node => NewRaUId}}
1442-
end,
1431+
L when is_list(L) ->
1432+
%% Queue is not aware of node to uid mapping, just add the new node
1433+
QTypeState0#{nodes => lists:usort([Node | Nodes])};
1434+
#{Node := _} ->
1435+
%% Queue is aware and uid for targeted node exists, do nothing
1436+
QTypeState0;
1437+
_ ->
1438+
%% Queue is aware but current node has no UId, regen uid
1439+
QTypeState0#{nodes => Nodes#{Node => NewRaUId}}
1440+
end,
14431441
Q = amqqueue:set_type_state(Q0, QTypeState),
14441442
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
14451443
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
@@ -2060,13 +2058,13 @@ make_ra_conf(Q, ServerId, TickTimeout,
20602058
[{ClusterName, _} | _] = Members = members(Q),
20612059
{_, Node} = ServerId,
20622060
UId = case amqqueue:get_type_state(Q) of
2063-
#{nodes := #{Node := Id}} ->
2064-
Id;
2065-
_ ->
2066-
%% Queue was declared on an older version of RabbitMQ
2067-
%% and does not have the node to uid mappings
2068-
ra:new_uid(ra_lib:to_binary(ClusterName))
2069-
end,
2061+
#{nodes := #{Node := Id}} ->
2062+
Id;
2063+
_ ->
2064+
%% Queue was declared on an older version of RabbitMQ
2065+
%% or does not have the node to uid mappings
2066+
ra:new_uid(ra_lib:to_binary(ClusterName))
2067+
end,
20702068
FName = rabbit_misc:rs(QName),
20712069
Formatter = {?MODULE, format_ra_event, [QName]},
20722070
LogCfg = #{uid => UId,
@@ -2210,7 +2208,7 @@ force_checkpoint_on_queue(QName) ->
22102208
{ok, Q} when ?amqqueue_is_quorum(Q) ->
22112209
{RaName, _} = amqqueue:get_pid(Q),
22122210
?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2213-
Nodes = rabbit_amqqueue:get_nodes(Q),
2211+
Nodes = rabbit_queue_type:get_nodes(Q),
22142212
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
22152213
|| Node <- Nodes],
22162214
ok;
@@ -2480,8 +2478,3 @@ queue_vm_stats_sups() ->
24802478
queue_vm_ets() ->
24812479
{[quorum_ets],
24822480
[[ra_log_ets]]}.
2483-
2484-
has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) ->
2485-
true;
2486-
has_uuid_tracking(_QTypeState) ->
2487-
false.

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ stop() ->
154154
new_stream(Q, LeaderNode)
155155
when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
156156
#{name := StreamId} = amqqueue:get_type_state(Q),
157-
Nodes = rabbit_amqqueue:get_nodes(Q),
157+
Nodes = rabbit_queue_type:get_nodes(Q),
158158
%% assertion leader is in nodes configuration
159159
true = lists:member(LeaderNode, Nodes),
160160
process_command({new_stream, StreamId,

deps/rabbitmq_ct_helpers/src/queue_utils.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ assert_number_of_replicas(Config, Server, VHost, QQ, Count) ->
208208
begin
209209
{ok, Q} = rabbit_ct_broker_helpers:rpc(
210210
Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]),
211-
Nodes = rabbit_amqqueue:get_nodes(Q),
211+
Nodes = rabbit_queue_type:get_nodes(Q),
212212
length(Nodes)
213213
end,
214214
30000).

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) ->
451451
true -> Acc;
452452
false ->
453453
Type = amqqueue:get_type(Q),
454-
Members = rabbit_amqqueue:get_nodes(Q),
454+
Members = rabbit_queue_type:get_nodes(Q),
455455
case membership(amqqueue:get_pid(Q), Members) of
456456
not_a_member ->
457457
Acc;

0 commit comments

Comments
 (0)