Skip to content

Commit 2c91964

Browse files
Reconcile QQ node dead during delete and redeclare
Co-authored-by: Péter Gömöri <[email protected]>
1 parent 8b45047 commit 2c91964

File tree

10 files changed

+282
-57
lines changed

10 files changed

+282
-57
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
% exclusive_owner
3131
get_exclusive_owner/1,
3232
get_leader_node/1,
33-
get_nodes/1,
3433
% name (#resource)
3534
get_name/1,
3635
set_name/2,
@@ -425,15 +424,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
425424
get_leader_node(#amqqueue{pid = none}) -> none;
426425
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).
427426

428-
-spec get_nodes(amqqueue_v2()) -> [node(),...].
429-
430-
get_nodes(Q) ->
431-
case amqqueue:get_type_state(Q) of
432-
#{nodes := Nodes} ->
433-
Nodes;
434-
_ ->
435-
[get_leader_node(Q)]
436-
end.
437427

438428
% operator_policy
439429

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
471471
{Leader :: node() | none, Replicas :: [node(),...]}.
472472
queue_topology(Q) ->
473473
Leader = amqqueue:get_leader_node(Q),
474-
Replicas = amqqueue:get_nodes(Q),
474+
Replicas = rabbit_amqqueue:get_nodes(Q),
475475
{Leader, Replicas}.
476476

477477
decode_exchange({map, KVList}) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
-export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
2424
emit_info_all/5, list_local/1, info_local/1,
2525
emit_info_local/4, emit_info_down/4]).
26+
-export([get_nodes/1]).
2627
-export([count/0]).
2728
-export([list_down/1, list_down/2, list_all/1,
2829
count/1, list_names/0, list_names/1, list_local_names/0,
@@ -1231,6 +1232,12 @@ list() ->
12311232
count() ->
12321233
rabbit_db_queue:count().
12331234

1235+
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
1236+
1237+
get_nodes(Q) ->
1238+
[{members, Nodes}] = info(Q, [members]),
1239+
Nodes.
1240+
12341241
-spec list_names() -> [name()].
12351242

12361243
list_names() ->
@@ -2044,12 +2051,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
20442051
).
20452052

20462053
get_quorum_nodes(Q) ->
2047-
case amqqueue:get_type_state(Q) of
2048-
#{nodes := Nodes} ->
2049-
Nodes;
2050-
_ ->
2051-
[]
2052-
end.
2054+
rabbit_amqqueue:get_nodes(Q).
20532055

20542056
-spec prepend_extra_bcc(Qs) ->
20552057
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,10 @@
218218
depends_on => ['rabbitmq_4.1.0'],
219219
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
220220
}}).
221+
222+
-rabbit_feature_flag(
223+
{'track_qq_members_uids',
224+
#{desc => "Track queue members UIDs in the metadata store",
225+
stability => stable,
226+
depends_on => []
227+
}}).

deps/rabbit/src/rabbit_queue_location.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) ->
143143
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]),
144144
Queues = GetQueues(),
145145
Counters = lists:foldl(fun(Q, Acc) ->
146-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
146+
Nodes = rabbit_amqqueue:get_nodes(Q),
147147
lists:foldl(fun(N, A)
148148
when is_map_key(N, A) ->
149149
maps:update_with(N, fun(C) -> C+1 end, A);

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 104 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,17 @@ start_cluster(Q) ->
268268
{LeaderNode, FollowerNodes} =
269269
rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
270270
LeaderId = {RaName, LeaderNode},
271+
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
272+
|| Node <- [LeaderNode | FollowerNodes]]),
271273
NewQ0 = amqqueue:set_pid(Q, LeaderId),
272-
NewQ1 = amqqueue:set_type_state(NewQ0,
273-
#{nodes => [LeaderNode | FollowerNodes]}),
274+
NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
275+
false ->
276+
amqqueue:set_type_state(NewQ0,
277+
#{nodes => [LeaderNode | FollowerNodes]});
278+
true ->
279+
amqqueue:set_type_state(NewQ0,
280+
#{nodes => UIDs})
281+
end,
274282

275283
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
276284
rabbit_fifo, version, [],
@@ -716,7 +724,7 @@ repair_amqqueue_nodes(Q0) ->
716724
{Name, _} = amqqueue:get_pid(Q0),
717725
Members = ra_leaderboard:lookup_members(Name),
718726
RaNodes = [N || {_, N} <- Members],
719-
#{nodes := Nodes} = amqqueue:get_type_state(Q0),
727+
Nodes = get_nodes(Q0),
720728
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
721729
true ->
722730
%% up to date
@@ -725,7 +733,18 @@ repair_amqqueue_nodes(Q0) ->
725733
%% update amqqueue record
726734
Fun = fun (Q) ->
727735
TS0 = amqqueue:get_type_state(Q),
728-
TS = TS0#{nodes => RaNodes},
736+
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids)
737+
andalso has_uuid_tracking(TS0)
738+
of
739+
false ->
740+
TS0#{nodes => RaNodes};
741+
true ->
742+
RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of,
743+
[?RA_SYSTEM, Name],
744+
?RPC_TIMEOUT)}
745+
|| N <- RaNodes]),
746+
TS0#{nodes => RaUids}
747+
end,
729748
amqqueue:set_type_state(Q, TS)
730749
end,
731750
_ = rabbit_amqqueue:update(QName, Fun),
@@ -789,6 +808,23 @@ recover(_Vhost, Queues) ->
789808
ServerId = {Name, node()},
790809
QName = amqqueue:get_name(Q0),
791810
MutConf = make_mutable_config(Q0),
811+
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
812+
#{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0),
813+
QTypeState = case Nodes of
814+
List when is_list(List) ->
815+
%% Queue is not aware of node to uid mapping, do nothing
816+
QTypeState0;
817+
#{node() := RaUId} ->
818+
%% Queue is aware and uid for current node is correct, do nothing
819+
QTypeState0;
820+
_ ->
821+
%% Queue is aware but either current node has no UId or it
822+
%% does not match the one returned by ra_directory, regen uid
823+
maybe_delete_data_dir(RaUId),
824+
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
825+
QTypeState0#{nodes := Nodes#{node() => NewRaUId}}
826+
end,
827+
Q = amqqueue:set_type_state(Q0, QTypeState),
792828
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
793829
ok ->
794830
% queue was restarted, good
@@ -801,7 +837,7 @@ recover(_Vhost, Queues) ->
801837
[rabbit_misc:rs(QName), Err1]),
802838
% queue was never started on this node
803839
% so needs to be started from scratch.
804-
case start_server(make_ra_conf(Q0, ServerId)) of
840+
case start_server(make_ra_conf(Q, ServerId)) of
805841
ok -> ok;
806842
Err2 ->
807843
?LOG_WARNING("recover: quorum queue ~w could not"
@@ -823,8 +859,7 @@ recover(_Vhost, Queues) ->
823859
%% present in the rabbit_queue table and not just in
824860
%% rabbit_durable_queue
825861
%% So many code paths are dependent on this.
826-
ok = rabbit_db_queue:set_dirty(Q0),
827-
Q = Q0,
862+
ok = rabbit_db_queue:set_dirty(Q),
828863
case Res of
829864
ok ->
830865
{[Q | R0], F0};
@@ -1204,12 +1239,17 @@ cleanup_data_dir() ->
12041239
maybe_delete_data_dir(UId) ->
12051240
_ = ra_directory:unregister_name(?RA_SYSTEM, UId),
12061241
Dir = ra_env:server_data_dir(?RA_SYSTEM, UId),
1207-
{ok, Config} = ra_log:read_config(Dir),
1208-
case maps:get(machine, Config) of
1209-
{module, rabbit_fifo, _} ->
1210-
ra_lib:recursive_delete(Dir);
1211-
_ ->
1212-
ok
1242+
case filelib:is_dir(Dir) of
1243+
false ->
1244+
ok;
1245+
true ->
1246+
{ok, Config} = ra_log:read_config(Dir),
1247+
case maps:get(machine, Config) of
1248+
{module, rabbit_fifo, _} ->
1249+
ra_lib:recursive_delete(Dir);
1250+
_ ->
1251+
ok
1252+
end
12131253
end.
12141254

12151255
policy_changed(Q) ->
@@ -1374,16 +1414,29 @@ add_member(Q, Node, Membership) ->
13741414
do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT).
13751415

13761416

1377-
do_add_member(Q, Node, Membership, Timeout)
1378-
when ?is_amqqueue(Q) andalso
1379-
?amqqueue_is_quorum(Q) andalso
1417+
do_add_member(Q0, Node, Membership, Timeout)
1418+
when ?is_amqqueue(Q0) andalso
1419+
?amqqueue_is_quorum(Q0) andalso
13801420
is_atom(Node) ->
1381-
{RaName, _} = amqqueue:get_pid(Q),
1382-
QName = amqqueue:get_name(Q),
1421+
{RaName, _} = amqqueue:get_pid(Q0),
1422+
QName = amqqueue:get_name(Q0),
13831423
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
13841424
ServerId = {RaName, Node},
1385-
Members = members(Q),
1386-
1425+
Members = members(Q0),
1426+
QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0),
1427+
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1428+
QTypeState = case Nodes of
1429+
L when is_list(L) ->
1430+
%% Queue is not aware of node to uid mapping, just add the new node
1431+
QTypeState0#{nodes => lists:usort([Node | Nodes])};
1432+
#{Node := _} ->
1433+
%% Queue is aware and uid for targeted node exists, do nothing
1434+
QTypeState0;
1435+
_ ->
1436+
%% Queue is aware but current node has no UId, regen uid
1437+
QTypeState0#{nodes => Nodes#{Node => NewRaUId}}
1438+
end,
1439+
Q = amqqueue:set_type_state(Q0, QTypeState),
13871440
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
13881441
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
13891442
case ra:start_server(?RA_SYSTEM, Conf) of
@@ -1393,8 +1446,12 @@ do_add_member(Q, Node, Membership, Timeout)
13931446
{ok, {RaIndex, RaTerm}, Leader} ->
13941447
Fun = fun(Q1) ->
13951448
Q2 = update_type_state(
1396-
Q1, fun(#{nodes := Nodes} = Ts) ->
1397-
Ts#{nodes => lists:usort([Node | Nodes])}
1449+
Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) ->
1450+
Ts#{nodes => lists:usort([Node | NodesList])};
1451+
(#{nodes := #{Node := _}} = Ts) ->
1452+
Ts;
1453+
(#{nodes := NodesMap} = Ts) when is_map(NodesMap) ->
1454+
Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)}
13981455
end),
13991456
amqqueue:set_pid(Q2, Leader)
14001457
end,
@@ -1467,8 +1524,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
14671524
Fun = fun(Q1) ->
14681525
update_type_state(
14691526
Q1,
1470-
fun(#{nodes := Nodes} = Ts) ->
1471-
Ts#{nodes => lists:delete(Node, Nodes)}
1527+
fun(#{nodes := Nodes} = Ts) when is_list(Nodes) ->
1528+
Ts#{nodes => lists:delete(Node, Nodes)};
1529+
(#{nodes := Nodes} = Ts) when is_map(Nodes) ->
1530+
Ts#{nodes => maps:remove(Node, Nodes)}
14721531
end)
14731532
end,
14741533
_ = rabbit_amqqueue:update(QName, Fun),
@@ -1988,7 +2047,15 @@ make_ra_conf(Q, ServerId, TickTimeout,
19882047
#resource{name = QNameBin} = QName,
19892048
RaMachine = ra_machine(Q),
19902049
[{ClusterName, _} | _] = Members = members(Q),
1991-
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
2050+
{_, Node} = ServerId,
2051+
UId = case amqqueue:get_type_state(Q) of
2052+
#{nodes := #{Node := Id}} ->
2053+
Id;
2054+
_ ->
2055+
%% Queue was declared on an older version of RabbitMQ
2056+
%% and does not have the node to uid mappings
2057+
ra:new_uid(ra_lib:to_binary(ClusterName))
2058+
end,
19922059
FName = rabbit_misc:rs(QName),
19932060
Formatter = {?MODULE, format_ra_event, [QName]},
19942061
LogCfg = #{uid => UId,
@@ -2020,7 +2087,12 @@ make_mutable_config(Q) ->
20202087

20212088
get_nodes(Q) when ?is_amqqueue(Q) ->
20222089
#{nodes := Nodes} = amqqueue:get_type_state(Q),
2023-
Nodes.
2090+
case Nodes of
2091+
List when is_list(List) ->
2092+
List;
2093+
Map when is_map(Map) ->
2094+
maps:keys(Map)
2095+
end.
20242096

20252097
get_connected_nodes(Q) when ?is_amqqueue(Q) ->
20262098
ErlangNodes = [node() | nodes()],
@@ -2127,7 +2199,7 @@ force_checkpoint_on_queue(QName) ->
21272199
{ok, Q} when ?amqqueue_is_quorum(Q) ->
21282200
{RaName, _} = amqqueue:get_pid(Q),
21292201
?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2130-
Nodes = amqqueue:get_nodes(Q),
2202+
Nodes = rabbit_amqqueue:get_nodes(Q),
21312203
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
21322204
|| Node <- Nodes],
21332205
ok;
@@ -2400,3 +2472,8 @@ queue_vm_ets() ->
24002472

24012473
tick_interval() ->
24022474
application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).
2475+
2476+
has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) ->
2477+
true;
2478+
has_uuid_tracking(_QTypeState) ->
2479+
false.

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ stop() ->
153153

154154
new_stream(Q, LeaderNode)
155155
when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
156-
#{name := StreamId,
157-
nodes := Nodes} = amqqueue:get_type_state(Q),
156+
#{name := StreamId} = amqqueue:get_type_state(Q),
157+
Nodes = rabbit_amqqueue:get_nodes(Q),
158158
%% assertion leader is in nodes configuration
159159
true = lists:member(LeaderNode, Nodes),
160160
process_command({new_stream, StreamId,

0 commit comments

Comments
 (0)