Skip to content

Commit 60b5a13

Browse files
committed
Apply PR suggestions
1 parent 2c91964 commit 60b5a13

File tree

8 files changed

+57
-65
lines changed

8 files changed

+57
-65
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
471471
{Leader :: node() | none, Replicas :: [node(),...]}.
472472
queue_topology(Q) ->
473473
Leader = amqqueue:get_leader_node(Q),
474-
Replicas = rabbit_amqqueue:get_nodes(Q),
474+
Replicas = rabbit_queue_type:get_nodes(Q),
475475
{Leader, Replicas}.
476476

477477
decode_exchange({map, KVList}) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
-export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
2424
emit_info_all/5, list_local/1, info_local/1,
2525
emit_info_local/4, emit_info_down/4]).
26-
-export([get_nodes/1]).
2726
-export([count/0]).
2827
-export([list_down/1, list_down/2, list_all/1,
2928
count/1, list_names/0, list_names/1, list_local_names/0,
@@ -1232,12 +1231,6 @@ list() ->
12321231
count() ->
12331232
rabbit_db_queue:count().
12341233

1235-
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
1236-
1237-
get_nodes(Q) ->
1238-
[{members, Nodes}] = info(Q, [members]),
1239-
Nodes.
1240-
12411234
-spec list_names() -> [name()].
12421235

12431236
list_names() ->
@@ -2051,7 +2044,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
20512044
).
20522045

20532046
get_quorum_nodes(Q) ->
2054-
rabbit_amqqueue:get_nodes(Q).
2047+
rabbit_queue_type:get_nodes(Q).
20552048

20562049
-spec prepend_extra_bcc(Qs) ->
20572050
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].

deps/rabbit/src/rabbit_queue_location.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) ->
143143
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]),
144144
Queues = GetQueues(),
145145
Counters = lists:foldl(fun(Q, Acc) ->
146-
Nodes = rabbit_amqqueue:get_nodes(Q),
146+
Nodes = rabbit_queue_type:get_nodes(Q),
147147
lists:foldl(fun(N, A)
148148
when is_map_key(N, A) ->
149149
maps:update_with(N, fun(C) -> C+1 end, A);

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
format/2,
3939
remove/2,
4040
info/2,
41+
get_nodes/1,
4142
state_info/1,
4243
format_status/1,
4344
info_down/2,
@@ -413,6 +414,11 @@ info(Q, Items) ->
413414
Mod = amqqueue:get_type(Q),
414415
Mod:info(Q, Items).
415416

417+
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
418+
get_nodes(Q) ->
419+
[{members, Nodes}] = info(Q, [members]),
420+
Nodes.
421+
416422
fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) ->
417423
maps:fold(Fun, Acc, Ctxs).
418424

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -272,13 +272,13 @@ start_cluster(Q) ->
272272
|| Node <- [LeaderNode | FollowerNodes]]),
273273
NewQ0 = amqqueue:set_pid(Q, LeaderId),
274274
NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
275-
false ->
276-
amqqueue:set_type_state(NewQ0,
277-
#{nodes => [LeaderNode | FollowerNodes]});
278-
true ->
279-
amqqueue:set_type_state(NewQ0,
280-
#{nodes => UIDs})
281-
end,
275+
false ->
276+
amqqueue:set_type_state(NewQ0,
277+
#{nodes => [LeaderNode | FollowerNodes]});
278+
true ->
279+
amqqueue:set_type_state(NewQ0,
280+
#{nodes => UIDs})
281+
end,
282282

283283
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
284284
rabbit_fifo, version, [],
@@ -733,16 +733,14 @@ repair_amqqueue_nodes(Q0) ->
733733
%% update amqqueue record
734734
Fun = fun (Q) ->
735735
TS0 = amqqueue:get_type_state(Q),
736-
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids)
737-
andalso has_uuid_tracking(TS0)
738-
of
736+
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
739737
false ->
740738
TS0#{nodes => RaNodes};
741739
true ->
742740
RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of,
743-
[?RA_SYSTEM, Name],
744-
?RPC_TIMEOUT)}
745-
|| N <- RaNodes]),
741+
[?RA_SYSTEM, Name],
742+
?RPC_TIMEOUT)}
743+
|| N <- RaNodes]),
746744
TS0#{nodes => RaUids}
747745
end,
748746
amqqueue:set_type_state(Q, TS)
@@ -803,28 +801,28 @@ maybe_apply_policies(Q, #{config := CurrentConfig}) ->
803801
{[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
804802
recover(_Vhost, Queues) ->
805803
lists:foldl(
806-
fun (Q0, {R0, F0}) ->
807-
{Name, _} = amqqueue:get_pid(Q0),
804+
fun (Q, {R0, F0}) ->
805+
{Name, _} = amqqueue:get_pid(Q),
808806
ServerId = {Name, node()},
809-
QName = amqqueue:get_name(Q0),
810-
MutConf = make_mutable_config(Q0),
807+
QName = amqqueue:get_name(Q),
808+
MutConf = make_mutable_config(Q),
811809
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
812-
#{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0),
813-
QTypeState = case Nodes of
810+
#{nodes := Nodes} = amqqueue:get_type_state(Q),
811+
case Nodes of
814812
List when is_list(List) ->
815813
%% Queue is not aware of node to uid mapping, do nothing
816-
QTypeState0;
814+
ok;
817815
#{node() := RaUId} ->
818-
%% Queue is aware and uid for current node is correct, do nothing
819-
QTypeState0;
820-
_ ->
821-
%% Queue is aware but either current node has no UId or it
822-
%% does not match the one returned by ra_directory, regen uid
823-
maybe_delete_data_dir(RaUId),
824-
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
825-
QTypeState0#{nodes := Nodes#{node() => NewRaUId}}
816+
%% Queue is aware and uid for current node is correct, do
817+
%% nothing
818+
ok;
819+
#{node() := _NewRaUId} ->
820+
%% Queue is aware but it does not match the one returned by
821+
%% ra_directory
822+
rabbit_log:info("Quorum queue ~ts: detected node uuid change, "
823+
"deleting old data directory", [rabbit_misc:rs(QName)]),
824+
maybe_delete_data_dir(RaUId)
826825
end,
827-
Q = amqqueue:set_type_state(Q0, QTypeState),
828826
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
829827
ok ->
830828
% queue was restarted, good
@@ -1426,16 +1424,16 @@ do_add_member(Q0, Node, Membership, Timeout)
14261424
QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0),
14271425
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
14281426
QTypeState = case Nodes of
1429-
L when is_list(L) ->
1430-
%% Queue is not aware of node to uid mapping, just add the new node
1431-
QTypeState0#{nodes => lists:usort([Node | Nodes])};
1432-
#{Node := _} ->
1433-
%% Queue is aware and uid for targeted node exists, do nothing
1434-
QTypeState0;
1435-
_ ->
1436-
%% Queue is aware but current node has no UId, regen uid
1437-
QTypeState0#{nodes => Nodes#{Node => NewRaUId}}
1438-
end,
1427+
L when is_list(L) ->
1428+
%% Queue is not aware of node to uid mapping, just add the new node
1429+
QTypeState0#{nodes => lists:usort([Node | Nodes])};
1430+
#{Node := _} ->
1431+
%% Queue is aware and uid for targeted node exists, do nothing
1432+
QTypeState0;
1433+
_ ->
1434+
%% Queue is aware but current node has no UId, regen uid
1435+
QTypeState0#{nodes => Nodes#{Node => NewRaUId}}
1436+
end,
14391437
Q = amqqueue:set_type_state(Q0, QTypeState),
14401438
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
14411439
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
@@ -2049,13 +2047,13 @@ make_ra_conf(Q, ServerId, TickTimeout,
20492047
[{ClusterName, _} | _] = Members = members(Q),
20502048
{_, Node} = ServerId,
20512049
UId = case amqqueue:get_type_state(Q) of
2052-
#{nodes := #{Node := Id}} ->
2053-
Id;
2054-
_ ->
2055-
%% Queue was declared on an older version of RabbitMQ
2056-
%% and does not have the node to uid mappings
2057-
ra:new_uid(ra_lib:to_binary(ClusterName))
2058-
end,
2050+
#{nodes := #{Node := Id}} ->
2051+
Id;
2052+
_ ->
2053+
%% Queue was declared on an older version of RabbitMQ
2054+
%% or does not have the node to uid mappings
2055+
ra:new_uid(ra_lib:to_binary(ClusterName))
2056+
end,
20592057
FName = rabbit_misc:rs(QName),
20602058
Formatter = {?MODULE, format_ra_event, [QName]},
20612059
LogCfg = #{uid => UId,
@@ -2199,7 +2197,7 @@ force_checkpoint_on_queue(QName) ->
21992197
{ok, Q} when ?amqqueue_is_quorum(Q) ->
22002198
{RaName, _} = amqqueue:get_pid(Q),
22012199
?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2202-
Nodes = rabbit_amqqueue:get_nodes(Q),
2200+
Nodes = rabbit_queue_type:get_nodes(Q),
22032201
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
22042202
|| Node <- Nodes],
22052203
ok;
@@ -2472,8 +2470,3 @@ queue_vm_ets() ->
24722470

24732471
tick_interval() ->
24742472
application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).
2475-
2476-
has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) ->
2477-
true;
2478-
has_uuid_tracking(_QTypeState) ->
2479-
false.

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ stop() ->
154154
new_stream(Q, LeaderNode)
155155
when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
156156
#{name := StreamId} = amqqueue:get_type_state(Q),
157-
Nodes = rabbit_amqqueue:get_nodes(Q),
157+
Nodes = rabbit_queue_type:get_nodes(Q),
158158
%% assertion leader is in nodes configuration
159159
true = lists:member(LeaderNode, Nodes),
160160
process_command({new_stream, StreamId,

deps/rabbitmq_ct_helpers/src/queue_utils.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ assert_number_of_replicas(Config, Server, VHost, QQ, Count) ->
208208
begin
209209
{ok, Q} = rabbit_ct_broker_helpers:rpc(
210210
Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]),
211-
Nodes = rabbit_amqqueue:get_nodes(Q),
211+
Nodes = rabbit_queue_type:get_nodes(Q),
212212
length(Nodes)
213213
end,
214214
30000).

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) ->
451451
true -> Acc;
452452
false ->
453453
Type = amqqueue:get_type(Q),
454-
Members = rabbit_amqqueue:get_nodes(Q),
454+
Members = rabbit_queue_type:get_nodes(Q),
455455
case membership(amqqueue:get_pid(Q), Members) of
456456
not_a_member ->
457457
Acc;

0 commit comments

Comments
 (0)