Skip to content

Commit 0ffb2d0

Browse files
committed
Use initial_machine_version config to avoid initialising
from rabbit_fifo version 0. The same was also implemented for the stream coordinator. QQ: avoid deadlock in queue federation. When processing the queue federation startup event, the process may call back into the ra process, causing a deadlock. In this case we spawn a temporary process to avoid this.
1 parent bc47fbd commit 0ffb2d0

File tree

3 files changed

+52
-20
lines changed

3 files changed

+52
-20
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,16 @@ start_cluster(Q) ->
259259
NewQ1 = amqqueue:set_type_state(NewQ0,
260260
#{nodes => [LeaderNode | FollowerNodes]}),
261261

262-
rabbit_log:debug("Will start up to ~w replicas for quorum ~ts with leader on node '~ts'",
263-
[QuorumSize, rabbit_misc:rs(QName), LeaderNode]),
262+
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
263+
rabbit_fifo, version, [])],
264+
MinVersion = lists:min([rabbit_fifo:version() | Versions]),
265+
266+
rabbit_log:debug("Will start up to ~w replicas for quorum queue ~ts with "
267+
"leader on node '~ts', initial machine version ~b",
268+
[QuorumSize, rabbit_misc:rs(QName), LeaderNode, MinVersion]),
264269
case rabbit_amqqueue:internal_declare(NewQ1, false) of
265270
{created, NewQ} ->
266-
RaConfs = [make_ra_conf(NewQ, ServerId)
271+
RaConfs = [make_ra_conf(NewQ, ServerId, voter, MinVersion)
267272
|| ServerId <- members(NewQ)],
268273

269274
%% khepri projections on remote nodes are eventually consistent
@@ -544,6 +549,10 @@ spawn_deleter(QName) ->
544549
delete(Q, false, false, <<"expired">>)
545550
end).
546551

552+
spawn_notify_decorators(QName, startup = Fun, Args) ->
553+
spawn(fun() ->
554+
notify_decorators(QName, Fun, Args)
555+
end);
547556
spawn_notify_decorators(QName, Fun, Args) ->
548557
%% run in ra process for now
549558
catch notify_decorators(QName, Fun, Args).
@@ -860,7 +869,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
860869
notify_decorators(QName, shutdown),
861870
case delete_queue_data(Q, ActingUser) of
862871
ok ->
863-
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
872+
_ = erpc_call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
864873
?RPC_TIMEOUT),
865874
{ok, ReadyMsgs};
866875
{error, timeout} = Err ->
@@ -1339,7 +1348,9 @@ add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) ->
13391348
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
13401349
ServerId = {RaName, Node},
13411350
Members = members(Q),
1342-
Conf = make_ra_conf(Q, ServerId, Membership),
1351+
1352+
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
1353+
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
13431354
case ra:start_server(?RA_SYSTEM, Conf) of
13441355
ok ->
13451356
ServerIdSpec =
@@ -1383,8 +1394,9 @@ add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) ->
13831394
E
13841395
end;
13851396
E ->
1386-
rabbit_log:warning("Could not add a replica of quorum ~ts on node ~ts: ~p", [rabbit_misc:rs(QName), Node, E]),
1387-
E
1397+
rabbit_log:warning("Could not add a replica of quorum ~ts on node ~ts: ~p",
1398+
[rabbit_misc:rs(QName), Node, E]),
1399+
E
13881400
end.
13891401

13901402
delete_member(VHost, Name, Node) ->
@@ -1912,9 +1924,10 @@ format_ra_event(ServerId, Evt, QRef) ->
19121924
{'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
19131925

19141926
make_ra_conf(Q, ServerId) ->
1915-
make_ra_conf(Q, ServerId, voter).
1927+
make_ra_conf(Q, ServerId, voter, rabbit_fifo:version()).
19161928

1917-
make_ra_conf(Q, ServerId, Membership) ->
1929+
make_ra_conf(Q, ServerId, Membership, MacVersion)
1930+
when is_integer(MacVersion) ->
19181931
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
19191932
?TICK_INTERVAL),
19201933
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
@@ -1923,10 +1936,12 @@ make_ra_conf(Q, ServerId, Membership) ->
19231936
quorum_min_checkpoint_interval,
19241937
?MIN_CHECKPOINT_INTERVAL),
19251938
make_ra_conf(Q, ServerId, TickTimeout,
1926-
SnapshotInterval, CheckpointInterval, Membership).
1939+
SnapshotInterval, CheckpointInterval,
1940+
Membership, MacVersion).
19271941

19281942
make_ra_conf(Q, ServerId, TickTimeout,
1929-
SnapshotInterval, CheckpointInterval, Membership) ->
1943+
SnapshotInterval, CheckpointInterval,
1944+
Membership, MacVersion) ->
19301945
QName = amqqueue:get_name(Q),
19311946
RaMachine = ra_machine(Q),
19321947
[{ClusterName, _} | _] = Members = members(Q),
@@ -1947,6 +1962,7 @@ make_ra_conf(Q, ServerId, TickTimeout,
19471962
log_init_args => LogCfg,
19481963
tick_timeout => TickTimeout,
19491964
machine => RaMachine,
1965+
initial_machine_version => MacVersion,
19501966
ra_event_formatter => Formatter}).
19511967

19521968
make_mutable_config(Q) ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
55
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6-
%%
76

87
-module(rabbit_stream_coordinator).
98

@@ -493,9 +492,17 @@ locally_known_members() ->
493492

494493
start_coordinator_cluster() ->
495494
Nodes = rabbit_nodes:list_reachable(),
496-
rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
497495
true = Nodes =/= [],
498-
case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
496+
497+
Versions = [V || {ok, V} <- erpc:multicall(Nodes,
498+
?MODULE, version, [])],
499+
MinVersion = lists:min([version() | Versions]),
500+
rabbit_log:debug("Starting stream coordinator on nodes: ~w, "
501+
"initial machine version ~b",
502+
[Nodes, MinVersion]),
503+
case ra:start_cluster(?RA_SYSTEM,
504+
[make_ra_conf(Node, Nodes, MinVersion)
505+
|| Node <- Nodes]) of
499506
{ok, Started, _} ->
500507
rabbit_log:debug("Started stream coordinator on ~w", [Started]),
501508
Started;
@@ -813,7 +820,8 @@ maybe_resize_coordinator_cluster() ->
813820
end).
814821

815822
add_member(Members, Node) ->
816-
Conf = make_ra_conf(Node, [N || {_, N} <- Members]),
823+
MinMacVersion = erpc:call(Node, ?MODULE, version, []),
824+
Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion),
817825
ServerId = {?MODULE, Node},
818826
case ra:start_server(?RA_SYSTEM, Conf) of
819827
ok ->
@@ -1255,7 +1263,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
12551263
format_ra_event(ServerId, Evt) ->
12561264
{stream_coordinator_event, ServerId, Evt}.
12571265

1258-
make_ra_conf(Node, Nodes) ->
1266+
make_ra_conf(Node, Nodes, MinMacVersion) ->
12591267
UId = ra:new_uid(ra_lib:to_binary(?MODULE)),
12601268
Formatter = {?MODULE, format_ra_event, []},
12611269
Members = [{?MODULE, N} || N <- Nodes],
@@ -1270,6 +1278,7 @@ make_ra_conf(Node, Nodes) ->
12701278
log_init_args => #{uid => UId},
12711279
tick_timeout => TickTimeout,
12721280
machine => {module, ?MODULE, #{}},
1281+
initial_machine_version => MinMacVersion,
12731282
ra_event_formatter => Formatter}.
12741283

12751284
filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamId,

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ init_per_testcase(TestCase, Config) ->
7676
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
7777
meck:new(rabbit_feature_flags, []),
7878
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
79+
meck:expect(rabbit_feature_flags, is_enabled, fun (_, _) -> true end),
7980
ra_server_sup_sup:remove_all(?RA_SYSTEM),
8081
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
8182
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
@@ -941,10 +942,16 @@ discard_next_delivery(ClusterName, State0, Wait) ->
941942
end.
942943

943944
start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
944-
{ok, Started, _} = ra:start_cluster(?RA_SYSTEM,
945-
ClusterName#resource.name,
946-
{module, rabbit_fifo, RaFifoConfig},
947-
ServerIds),
945+
UId = ra:new_uid(ra_lib:to_binary(ClusterName#resource.name)),
946+
Confs = [#{id => Id,
947+
uid => UId,
948+
cluster_name => ClusterName#resource.name,
949+
log_init_args => #{uid => UId},
950+
initial_members => ServerIds,
951+
initial_machine_version => rabbit_fifo:version(),
952+
machine => {module, rabbit_fifo, RaFifoConfig}}
953+
|| Id <- ServerIds],
954+
{ok, Started, _} = ra:start_cluster(?RA_SYSTEM, Confs),
948955
?assertEqual(length(Started), length(ServerIds)),
949956
ok.
950957

0 commit comments

Comments
 (0)