Skip to content

Commit a33664c

Browse files
committed
pre.11 - add support for initial_machine_version config
1 parent 627244f commit a33664c

File tree

4 files changed

+26
-13
lines changed

4 files changed

+26
-13
lines changed

MODULE.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ erlang_package.hex_package(
253253
name = "ra",
254254
build_file = "@rabbitmq-server//bazel:BUILD.ra",
255255
pkg = "ra",
256-
sha256 = "a7eae50b0c1c0be4daf9b7ee97be796e6fda372ea6b2047c3aeac89cdc2011df",
257-
version = "2.16.0-pre.10",
256+
sha256 = "cfc0dbe5ebbd54f44081f95ea6a1daeb28a89df82aa9baa234f68abbb36bdc67",
257+
version = "2.16.0-pre.11",
258258
)
259259

260260
erlang_package.git_package(

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,14 @@ start_cluster(Q) ->
259259
NewQ1 = amqqueue:set_type_state(NewQ0,
260260
#{nodes => [LeaderNode | FollowerNodes]}),
261261

262+
Versions = erpc:multicall(FollowerNodes, rabbit_fifo, version, []),
263+
MinVersion = lists:min([rabbit_fifo:version() | Versions]),
264+
262265
rabbit_log:debug("Will start up to ~w replicas for quorum ~ts with leader on node '~ts'",
263266
[QuorumSize, rabbit_misc:rs(QName), LeaderNode]),
264267
case rabbit_amqqueue:internal_declare(NewQ1, false) of
265268
{created, NewQ} ->
266-
RaConfs = [make_ra_conf(NewQ, ServerId)
269+
RaConfs = [make_ra_conf(NewQ, ServerId, voter, MinVersion)
267270
|| ServerId <- members(NewQ)],
268271

269272
%% khepri projections on remote nodes are eventually consistent
@@ -1339,7 +1342,9 @@ add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) ->
13391342
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
13401343
ServerId = {RaName, Node},
13411344
Members = members(Q),
1342-
Conf = make_ra_conf(Q, ServerId, Membership),
1345+
1346+
MachineVersion = erpc:call(Node, rabbit_fifo, version, []),
1347+
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
13431348
case ra:start_server(?RA_SYSTEM, Conf) of
13441349
ok ->
13451350
ServerIdSpec =
@@ -1912,9 +1917,9 @@ format_ra_event(ServerId, Evt, QRef) ->
19121917
{'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
19131918

19141919
make_ra_conf(Q, ServerId) ->
1915-
make_ra_conf(Q, ServerId, voter).
1920+
make_ra_conf(Q, ServerId, voter, rabbit_fifo:version()).
19161921

1917-
make_ra_conf(Q, ServerId, Membership) ->
1922+
make_ra_conf(Q, ServerId, Membership, MacVersion) ->
19181923
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
19191924
?TICK_INTERVAL),
19201925
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
@@ -1923,10 +1928,12 @@ make_ra_conf(Q, ServerId, Membership) ->
19231928
quorum_min_checkpoint_interval,
19241929
?MIN_CHECKPOINT_INTERVAL),
19251930
make_ra_conf(Q, ServerId, TickTimeout,
1926-
SnapshotInterval, CheckpointInterval, Membership).
1931+
SnapshotInterval, CheckpointInterval,
1932+
Membership, MacVersion).
19271933

19281934
make_ra_conf(Q, ServerId, TickTimeout,
1929-
SnapshotInterval, CheckpointInterval, Membership) ->
1935+
SnapshotInterval, CheckpointInterval,
1936+
Membership, MacVersion) ->
19301937
QName = amqqueue:get_name(Q),
19311938
RaMachine = ra_machine(Q),
19321939
[{ClusterName, _} | _] = Members = members(Q),
@@ -1947,6 +1954,7 @@ make_ra_conf(Q, ServerId, TickTimeout,
19471954
log_init_args => LogCfg,
19481955
tick_timeout => TickTimeout,
19491956
machine => RaMachine,
1957+
initial_machine_version => MacVersion,
19501958
ra_event_formatter => Formatter}).
19511959

19521960
make_mutable_config(Q) ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
55
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6-
%%
76

87
-module(rabbit_stream_coordinator).
98

@@ -495,7 +494,11 @@ start_coordinator_cluster() ->
495494
Nodes = rabbit_nodes:list_reachable(),
496495
rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
497496
true = Nodes =/= [],
498-
case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
497+
Versions = erpc:multicall(Nodes, ?MODULE, version, []),
498+
MinVersion = lists:min([version() | Versions]),
499+
case ra:start_cluster(?RA_SYSTEM,
500+
[make_ra_conf(Node, Nodes, MinVersion)
501+
|| Node <- Nodes]) of
499502
{ok, Started, _} ->
500503
rabbit_log:debug("Started stream coordinator on ~w", [Started]),
501504
Started;
@@ -813,7 +816,8 @@ maybe_resize_coordinator_cluster() ->
813816
end).
814817

815818
add_member(Members, Node) ->
816-
Conf = make_ra_conf(Node, [N || {_, N} <- Members]),
819+
MinMacVersion = erpc:call(Node, ?MODULE, version, []),
820+
Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion),
817821
ServerId = {?MODULE, Node},
818822
case ra:start_server(?RA_SYSTEM, Conf) of
819823
ok ->
@@ -1255,7 +1259,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
12551259
format_ra_event(ServerId, Evt) ->
12561260
{stream_coordinator_event, ServerId, Evt}.
12571261

1258-
make_ra_conf(Node, Nodes) ->
1262+
make_ra_conf(Node, Nodes, MinMacVersion) ->
12591263
UId = ra:new_uid(ra_lib:to_binary(?MODULE)),
12601264
Formatter = {?MODULE, format_ra_event, []},
12611265
Members = [{?MODULE, N} || N <- Nodes],
@@ -1270,6 +1274,7 @@ make_ra_conf(Node, Nodes) ->
12701274
log_init_args => #{uid => UId},
12711275
tick_timeout => TickTimeout,
12721276
machine => {module, ?MODULE, #{}},
1277+
initial_machine_version => MinMacVersion,
12731278
ra_event_formatter => Formatter}.
12741279

12751280
filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamId,

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ dep_khepri = hex 0.16.0
5050
dep_khepri_mnesia_migration = hex 0.7.1
5151
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.5
5252
dep_prometheus = hex 4.11.0
53-
dep_ra = hex 2.16.0-pre.10
53+
dep_ra = hex 2.16.0-pre.11
5454
dep_ranch = hex 2.1.0
5555
dep_recon = hex 2.5.6
5656
dep_redbug = hex 2.0.7

0 commit comments

Comments
 (0)