Skip to content

Commit 8077b75

Browse files
committed
Introduce initial_machine_version server config.
This new key can be used to specify the initial machine version that a new Ra server should be initialised against. This allows machines to skip old versions when creating a new Ra cluster. This is particularly useful when machine_upgrade_strategy=all, as the initial machine upgrade after cluster creation is delayed until all members have replied to the info requests. When machine_upgrade_strategy=all, starting a server with an initial machine version that is higher than the locally available machine version will result in an error: {error, invalid_initial_machine_version}. When machine_upgrade_strategy=quorum, the initial machine version will instead be clamped to the locally available machine version.
1 parent 5997bdf commit 8077b75

File tree

6 files changed

+217
-51
lines changed

6 files changed

+217
-51
lines changed

src/ra_machine.erl

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666

6767
-include("ra.hrl").
6868

69-
-export([init/2,
69+
-export([init/3,
7070
apply/4,
7171
tick/3,
7272
snapshot_installed/5,
@@ -90,7 +90,9 @@
9090
-type user_command() :: term().
9191
%% the command type for a given machine implementation
9292

93-
-type machine_init_args() :: #{name := atom(), atom() => term()}.
93+
-type machine_init_args() :: #{name := atom(),
94+
machine_version => version(),
95+
atom() => term()}.
9496
%% the configuration passed to the init callback
9597

9698
-type machine() :: {machine, module(), AddInitArgs :: #{term() => term()}}.
@@ -294,15 +296,11 @@
294296
%% @doc initialise a new machine
295297
%% This is called on startup only if there isn't yet a snapshot to recover
296298
%% from. Once a snapshot has been taken this is never called again.
297-
-spec init(machine(), atom()) -> state().
298-
init({machine, _, Args} = Machine, Name) ->
299-
%% init always dispatches to the first version
300-
%% as this means every state machine in a mixed version cluster will
301-
%% have a common starting point.
302-
%% TODO: it should be possible to pass a lowest supported state machine
303-
%% version flag in the init args so that old machine version can be purged
304-
Mod = which_module(Machine, 0),
305-
Mod:init(Args#{name => Name}).
299+
-spec init(machine(), atom(), version()) -> state().
300+
init({machine, _, Args} = Machine, Name, Version) ->
301+
Mod = which_module(Machine, Version),
302+
Mod:init(Args#{name => Name,
303+
machine_version => Version}).
306304

307305
-spec apply(module(), command_meta_data(), command(), State) ->
308306
{State, reply(), effects()} | {State, reply()}.

src/ra_server.erl

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@
215215
log_init_args := ra_log:ra_log_init_args(),
216216
initial_members := [ra_server_id()],
217217
machine := machine_conf(),
218+
initial_machine_version => ra_machine:version(),
218219
friendly_name => unicode:chardata(),
219220
metrics_key => term(),
220221
% TODO: review - only really used for
@@ -352,24 +353,26 @@ init(#{id := Id,
352353
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),
353354

354355
LatestMacVer = ra_machine:version(Machine),
356+
InitialMachineVersion = min(LatestMacVer,
357+
maps:get(initial_machine_version, Config, 0)),
355358

356-
{_FirstIndex, Cluster0, MacVer, MacState,
359+
{Cluster0, EffectiveMacVer, MacState,
357360
{SnapshotIdx, _} = SnapshotIndexTerm} =
358361
case ra_log:recover_snapshot(Log0) of
359362
undefined ->
360-
InitialMachineState = ra_machine:init(Machine, Name),
361-
{0, make_cluster(Id, InitialNodes),
362-
0, InitialMachineState, {0, 0}};
363+
InitialMachineState = ra_machine:init(Machine, Name,
364+
InitialMachineVersion),
365+
{make_cluster(Id, InitialNodes),
366+
InitialMachineVersion, InitialMachineState, {0, 0}};
363367
{#{index := Idx,
364368
term := Term,
365369
cluster := ClusterNodes,
366370
machine_version := MacVersion}, MacSt} ->
367371
Clu = make_cluster(Id, ClusterNodes),
368372
%% the snapshot is the last index before the first index
369-
%% TODO: should this be Idx + 1?
370-
{Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}}
373+
{Clu, MacVersion, MacSt, {Idx, Term}}
371374
end,
372-
MacMod = ra_machine:which_module(Machine, MacVer),
375+
MacMod = ra_machine:which_module(Machine, EffectiveMacVer),
373376

374377
CommitIndex = max(LastApplied, SnapshotIdx),
375378
Cfg = #cfg{id = Id,
@@ -378,8 +381,8 @@ init(#{id := Id,
378381
metrics_key = MetricKey,
379382
machine = Machine,
380383
machine_version = LatestMacVer,
381-
machine_versions = [{SnapshotIdx, MacVer}],
382-
effective_machine_version = MacVer,
384+
machine_versions = [{SnapshotIdx, EffectiveMacVer}],
385+
effective_machine_version = EffectiveMacVer,
383386
effective_machine_module = MacMod,
384387
effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod),
385388
max_pipeline_count = MaxPipelineCount,
@@ -389,7 +392,7 @@ init(#{id := Id,
389392
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CommitIndex),
390393
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx),
391394
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm),
392-
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, MacVer),
395+
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, EffectiveMacVer),
393396

394397
NonVoter = get_membership(Cluster0, Id, UId,
395398
maps:get(membership, Config, voter)),

src/ra_server_sup_sup.erl

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@
3939
-include("ra.hrl").
4040

4141
-spec start_server(System :: atom(), ra_server:ra_server_config()) ->
42-
supervisor:startchild_ret() | {error, not_new | system_not_started} | {badrpc, term()}.
42+
supervisor:startchild_ret() |
43+
{error, not_new | system_not_started | invalid_initial_machine_version} |
44+
{badrpc, term()}.
4345
start_server(System, #{id := NodeId,
4446
uid := UId} = Config)
4547
when is_atom(System) ->
@@ -61,9 +63,14 @@ start_server_rpc(System, UId, Config0) ->
6163
%% check that the server isn't already registered
6264
case ra_directory:name_of(System, UId) of
6365
undefined ->
64-
case ra_system:lookup_name(System, server_sup) of
65-
{ok, Name} ->
66-
start_child(Name, Config);
66+
case validate_config(Config) of
67+
ok ->
68+
case ra_system:lookup_name(System, server_sup) of
69+
{ok, Name} ->
70+
start_child(Name, Config);
71+
Err ->
72+
Err
73+
end;
6774
Err ->
6875
Err
6976
end;
@@ -77,6 +84,22 @@ start_server_rpc(System, UId, Config0) ->
7784
end
7885
end.
7986

87+
validate_config(#{system_config := SysConf} = Config) ->
88+
Strat = maps:get(machine_upgrade_strategy, SysConf, all),
89+
case Config of
90+
#{initial_machine_version := InitMacVer,
91+
machine := {module, Mod, Args}} when Strat == all ->
92+
MacVer = ra_machine:version({machine, Mod, Args}),
93+
if MacVer < InitMacVer ->
94+
{error, invalid_initial_machine_version};
95+
true ->
96+
ok
97+
end;
98+
_ ->
99+
ok
100+
end.
101+
102+
80103
restart_server_rpc(System, {RaName, _Node}, AddConfig)
81104
when is_atom(System) ->
82105
case ra_system:fetch(System) of

test/ra_dbg_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ execute_state_machine() ->
7777
%% creating a new WAL file with ra_fifo
7878
[Srv] = Nodes = [{ra_dbg, node()}],
7979
ClusterId = ra_dbg,
80-
Config = #{name => ClusterId},
80+
Config = #{name => ClusterId,
81+
machine_version => 0},
8182
Machine = {module, ra_fifo, Config},
8283
ra:start(),
8384
{ok, _, _} = ra:start_cluster(default, ClusterId, Machine, Nodes),

test/ra_fifo.erl

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@
170170
-opaque state() :: #state{}.
171171

172172
-type config() :: #{name := atom(),
173+
machine_version := ra_machine:version(),
173174
dead_letter_handler => applied_mfa(),
174175
become_leader_handler => applied_mfa(),
175176
cancel_customer_handler => applied_mfa(),
@@ -902,7 +903,8 @@ size_test(NumMsg, NumCust) ->
902903
EnqGen = fun(N) -> {N, {enqueue, N}} end,
903904
CustGen = fun(N) -> {N, {checkout, {auto, 100},
904905
spawn(fun() -> ok end)}} end,
905-
S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test})),
906+
S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test,
907+
machine_version => 0})),
906908
S = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
907909
S2 = S#state{ra_indexes = ra_fifo_index:map(fun(_, _) -> undefined end,
908910
S#state.ra_indexes)},
@@ -918,29 +920,13 @@ perf_test(NumMsg, NumCust) ->
918920
{N, {settle, N - NumMsg - NumCust - 1, Pid}}
919921
end,
920922
S0 = run_log(1, NumMsg, EnqGen,
921-
init(#{name => size_test})),
923+
init(#{name => size_test,
924+
machine_version => 0})),
922925
S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
923926
_ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1),
924927
ok
925928
end).
926929

927-
% profile(File) ->
928-
% GzFile = atom_to_list(File) ++ ".gz",
929-
% lg:trace([ra_fifo, maps, queue, ra_fifo_index], lg_file_tracer,
930-
% GzFile, #{running => false, mode => profile}),
931-
% NumMsg = 10000,
932-
% NumCust = 500,
933-
% EnqGen = fun(N) -> {N, {enqueue, self(), N, N}} end,
934-
% Pid = spawn(fun() -> ok end),
935-
% CustGen = fun(N) -> {N, {checkout, {auto, NumMsg},
936-
% {term_to_binary(N), Pid}}} end,
937-
% SetlGen = fun(N) -> {N, {settle, N - NumMsg - NumCust - 1, Pid}} end,
938-
% S0 = run_log(1, NumMsg, EnqGen, element(1, init(#{name => size_test}))),
939-
% S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
940-
% _ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1),
941-
% lg:stop().
942-
943-
944930
run_log(Num, Num, _Gen, State) ->
945931
State;
946932
run_log(Num, Max, Gen, State0) ->
@@ -995,6 +981,7 @@ dehydrate_state(#state{messages = Messages0,
995981

996982
test_init(Name) ->
997983
init(#{name => Name,
984+
machine_version => 0,
998985
shadow_copy_interval => 0,
999986
metrics_handler => {?MODULE, metrics_handler, []}}).
1000987

@@ -1243,6 +1230,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
12431230
discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
12441231
Cid = {<<"completed_customer_yields_demonitor_effect_test">>, self()},
12451232
State00 = init(#{name => test,
1233+
machine_version => 0,
12461234
dead_letter_handler =>
12471235
{somemod, somefun, [somearg]}}),
12481236
{State0, _, [_, _]} = enq(1, 1, first, State00),
@@ -1430,6 +1418,7 @@ duplicate_delivery_test() ->
14301418
state_enter_test() ->
14311419

14321420
S0 = init(#{name => the_name,
1421+
machine_version => 0,
14331422
become_leader_handler => {m, f, [a]}}),
14341423
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
14351424
ok.
@@ -1505,7 +1494,7 @@ run_log(InitState, Entries) ->
15051494
aux_test() ->
15061495
_ = ra_machine_ets:start_link(),
15071496
Aux0 = init_aux(aux_test),
1508-
MacState = init(#{name => aux_test}),
1497+
MacState = init(#{name => aux_test, machine_version => 0}),
15091498
Log = undefined,
15101499
{no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
15111500
Log, MacState),

0 commit comments

Comments
 (0)