Skip to content

Commit 562756c

Browse files
committed
Use initial_machine_version config to avoid initalising
from rabbit_fifo version 0. QQ: avoid dead lock in queue federation. When processing the queue federation startup even the process may call back into the ra process causing a deadlock. in this case we spawn a temporary process to avoid this.
1 parent 4c30372 commit 562756c

File tree

5 files changed

+41
-17
lines changed

5 files changed

+41
-17
lines changed

MODULE.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ erlang_package.hex_package(
253253
name = "ra",
254254
build_file = "@rabbitmq-server//bazel:BUILD.ra",
255255
pkg = "ra",
256-
sha256 = "a7eae50b0c1c0be4daf9b7ee97be796e6fda372ea6b2047c3aeac89cdc2011df",
257-
version = "2.16.0-pre.10",
256+
sha256 = "cfc0dbe5ebbd54f44081f95ea6a1daeb28a89df82aa9baa234f68abbb36bdc67",
257+
version = "2.16.0-pre.11",
258258
)
259259

260260
erlang_package.git_package(

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,14 @@ start_cluster(Q) ->
259259
NewQ1 = amqqueue:set_type_state(NewQ0,
260260
#{nodes => [LeaderNode | FollowerNodes]}),
261261

262+
Versions = erpc:multicall(FollowerNodes, rabbit_fifo, version, []),
263+
MinVersion = lists:min([rabbit_fifo:version() | Versions]),
264+
262265
rabbit_log:debug("Will start up to ~w replicas for quorum ~ts with leader on node '~ts'",
263266
[QuorumSize, rabbit_misc:rs(QName), LeaderNode]),
264267
case rabbit_amqqueue:internal_declare(NewQ1, false) of
265268
{created, NewQ} ->
266-
RaConfs = [make_ra_conf(NewQ, ServerId)
269+
RaConfs = [make_ra_conf(NewQ, ServerId, voter, MinVersion)
267270
|| ServerId <- members(NewQ)],
268271

269272
%% khepri projections on remote nodes are eventually consistent
@@ -544,6 +547,10 @@ spawn_deleter(QName) ->
544547
delete(Q, false, false, <<"expired">>)
545548
end).
546549

550+
spawn_notify_decorators(QName, startup = Fun, Args) ->
551+
spawn(fun() ->
552+
notify_decorators(QName, Fun, Args)
553+
end);
547554
spawn_notify_decorators(QName, Fun, Args) ->
548555
%% run in ra process for now
549556
catch notify_decorators(QName, Fun, Args).
@@ -1339,7 +1346,9 @@ add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) ->
13391346
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
13401347
ServerId = {RaName, Node},
13411348
Members = members(Q),
1342-
Conf = make_ra_conf(Q, ServerId, Membership),
1349+
1350+
MachineVersion = erpc:call(Node, rabbit_fifo, version, []),
1351+
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
13431352
case ra:start_server(?RA_SYSTEM, Conf) of
13441353
ok ->
13451354
ServerIdSpec =
@@ -1912,9 +1921,9 @@ format_ra_event(ServerId, Evt, QRef) ->
19121921
{'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
19131922

19141923
make_ra_conf(Q, ServerId) ->
1915-
make_ra_conf(Q, ServerId, voter).
1924+
make_ra_conf(Q, ServerId, voter, rabbit_fifo:version()).
19161925

1917-
make_ra_conf(Q, ServerId, Membership) ->
1926+
make_ra_conf(Q, ServerId, Membership, MacVersion) ->
19181927
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
19191928
?TICK_INTERVAL),
19201929
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
@@ -1923,10 +1932,12 @@ make_ra_conf(Q, ServerId, Membership) ->
19231932
quorum_min_checkpoint_interval,
19241933
?MIN_CHECKPOINT_INTERVAL),
19251934
make_ra_conf(Q, ServerId, TickTimeout,
1926-
SnapshotInterval, CheckpointInterval, Membership).
1935+
SnapshotInterval, CheckpointInterval,
1936+
Membership, MacVersion).
19271937

19281938
make_ra_conf(Q, ServerId, TickTimeout,
1929-
SnapshotInterval, CheckpointInterval, Membership) ->
1939+
SnapshotInterval, CheckpointInterval,
1940+
Membership, MacVersion) ->
19301941
QName = amqqueue:get_name(Q),
19311942
RaMachine = ra_machine(Q),
19321943
[{ClusterName, _} | _] = Members = members(Q),
@@ -1947,6 +1958,7 @@ make_ra_conf(Q, ServerId, TickTimeout,
19471958
log_init_args => LogCfg,
19481959
tick_timeout => TickTimeout,
19491960
machine => RaMachine,
1961+
initial_machine_version => MacVersion,
19501962
ra_event_formatter => Formatter}).
19511963

19521964
make_mutable_config(Q) ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
55
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6-
%%
76

87
-module(rabbit_stream_coordinator).
98

@@ -495,7 +494,11 @@ start_coordinator_cluster() ->
495494
Nodes = rabbit_nodes:list_reachable(),
496495
rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
497496
true = Nodes =/= [],
498-
case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
497+
Versions = erpc:multicall(Nodes, ?MODULE, version, []),
498+
MinVersion = lists:min([version() | Versions]),
499+
case ra:start_cluster(?RA_SYSTEM,
500+
[make_ra_conf(Node, Nodes, MinVersion)
501+
|| Node <- Nodes]) of
499502
{ok, Started, _} ->
500503
rabbit_log:debug("Started stream coordinator on ~w", [Started]),
501504
Started;
@@ -813,7 +816,8 @@ maybe_resize_coordinator_cluster() ->
813816
end).
814817

815818
add_member(Members, Node) ->
816-
Conf = make_ra_conf(Node, [N || {_, N} <- Members]),
819+
MinMacVersion = erpc:call(Node, ?MODULE, version, []),
820+
Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion),
817821
ServerId = {?MODULE, Node},
818822
case ra:start_server(?RA_SYSTEM, Conf) of
819823
ok ->
@@ -1255,7 +1259,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
12551259
format_ra_event(ServerId, Evt) ->
12561260
{stream_coordinator_event, ServerId, Evt}.
12571261

1258-
make_ra_conf(Node, Nodes) ->
1262+
make_ra_conf(Node, Nodes, MinMacVersion) ->
12591263
UId = ra:new_uid(ra_lib:to_binary(?MODULE)),
12601264
Formatter = {?MODULE, format_ra_event, []},
12611265
Members = [{?MODULE, N} || N <- Nodes],
@@ -1270,6 +1274,7 @@ make_ra_conf(Node, Nodes) ->
12701274
log_init_args => #{uid => UId},
12711275
tick_timeout => TickTimeout,
12721276
machine => {module, ?MODULE, #{}},
1277+
initial_machine_version => MinMacVersion,
12731278
ra_event_formatter => Formatter}.
12741279

12751280
filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamId,

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ init_per_testcase(TestCase, Config) ->
7272
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
7373
meck:new(rabbit_feature_flags, []),
7474
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
75+
meck:expect(rabbit_feature_flags, is_enabled, fun (_, _) -> true end),
7576
ra_server_sup_sup:remove_all(?RA_SYSTEM),
7677
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
7778
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
@@ -880,10 +881,16 @@ discard_next_delivery(ClusterName, State0, Wait) ->
880881
end.
881882

882883
start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
883-
{ok, Started, _} = ra:start_cluster(?RA_SYSTEM,
884-
ClusterName#resource.name,
885-
{module, rabbit_fifo, RaFifoConfig},
886-
ServerIds),
884+
UId = ra:new_uid(ra_lib:to_binary(ClusterName#resource.name)),
885+
Confs = [#{id => Id,
886+
uid => UId,
887+
cluster_name => ClusterName#resource.name,
888+
log_init_args => #{uid => UId},
889+
initial_members => ServerIds,
890+
initial_machine_version => rabbit_fifo:version(),
891+
machine => {module, rabbit_fifo, RaFifoConfig}}
892+
|| Id <- ServerIds],
893+
{ok, Started, _} = ra:start_cluster(?RA_SYSTEM, Confs),
887894
?assertEqual(length(Started), length(ServerIds)),
888895
ok.
889896

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ dep_khepri = hex 0.16.0
5050
dep_khepri_mnesia_migration = hex 0.7.1
5151
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.5
5252
dep_prometheus = hex 4.11.0
53-
dep_ra = hex 2.16.0-pre.10
53+
dep_ra = hex 2.16.0-pre.11
5454
dep_ranch = hex 2.1.0
5555
dep_recon = hex 2.5.6
5656
dep_redbug = hex 2.0.7

0 commit comments

Comments
 (0)