Skip to content

Commit d0c3974

Browse files
committed
Make snapshots with a pre phase restartable
1 parent 7207b2c commit d0c3974

File tree

7 files changed

+123
-26
lines changed

7 files changed

+123
-26
lines changed

src/ra.hrl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
%% represent a unique entry in the ra log
8484
-type log_entry() :: {ra_index(), ra_term(), term()}.
8585

86-
-type chunk_flag() :: pre | next | last.
86+
-type chunk_flag() :: init | pre | next | last.
8787

8888
-type consistent_query_ref() :: {From :: term(), Query :: ra:query_fun(), ConmmitIndex :: ra_index()}.
8989

@@ -169,7 +169,7 @@
169169
{term :: ra_term(), % the leader's term
170170
leader_id :: ra_server_id(),
171171
meta :: snapshot_meta(),
172-
chunk_state = {0, pre} :: {non_neg_integer(), chunk_flag()},
172+
chunk_state :: {non_neg_integer(), chunk_flag()},
173173
data :: term()
174174
}).
175175

src/ra_log.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,7 @@ handle_event(major_compaction, #?MODULE{reader = Reader0,
875875
end;
876876
handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
877877
#?MODULE{cfg = #cfg{uid = UId,
878+
log_id = LogId,
878879
names = Names} = Cfg,
879880
range = {FstIdx, _} = Range,
880881
mem_table = Mt0,
@@ -886,6 +887,8 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
886887
% ?assert(ra_snapshot:pending(SnapState0) =/= undefined),
887888
SnapState1 = ra_snapshot:complete_snapshot(Snap, SnapKind, LiveIndexes,
888889
SnapState0),
890+
?DEBUG("~ts: ra_log: ~s written at index ~b with ~b live indexes",
891+
[LogId, SnapKind, SnapIdx, ra_seq:length(LiveIndexes)]),
889892
case SnapKind of
890893
snapshot ->
891894
put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),

src/ra_log_wal.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ handle_msg({append, {UId, Pid} = Id, MtTid, ExpectedPrevIdx, Idx, Term, Entry},
524524
{in_seq, PrevIdx} ->
525525
% writer was in seq but has sent an out of seq entry
526526
% notify writer
527-
?DEBUG("WAL in ~ts: requesting resend from `~w`, "
527+
?DEBUG("WAL in ~ts: requesting resend for `~s`, "
528528
"last idx ~b idx received (~b,~b)",
529529
[Conf#conf.system, UId, PrevIdx, ExpectedPrevIdx, Idx]),
530530
Pid ! {ra_log_event, {resend_write, PrevIdx + 1}},

src/ra_server.erl

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@
100100
queries_waiting_heartbeats := queue:queue({non_neg_integer(),
101101
consistent_query_ref()}),
102102
pending_consistent_queries := [consistent_query_ref()],
103-
commit_latency => option(non_neg_integer())
103+
commit_latency => option(non_neg_integer()),
104+
snapshot_phase => chunk_flag()
104105
}.
105106

106107
-type state() :: ra_server_state().
@@ -1445,25 +1446,27 @@ handle_follower(#install_snapshot_rpc{term = Term,
14451446
SnapIdx > LastApplied andalso
14461447
%% only install snapshot if the machine version is understood
14471448
MacVer >= SnapMacVer andalso
1448-
Num =< 1 ->
1449+
Num =< 1 andalso
1450+
ChunkFlag /= pre ->
14491451
%% only begin snapshot procedure if Idx is higher than the last_applied
14501452
%% index.
1451-
?DEBUG("~ts: begin_accept snapshot at index ~b in term ~b",
1452-
[LogId, SnapIdx, Term]),
1453+
?DEBUG("~ts: begin_accept snapshot at index ~b in term ~b, phase ~s",
1454+
[LogId, SnapIdx, Term, ChunkFlag]),
14531455
SnapState0 = ra_log:snapshot_state(Log0),
14541456
{ok, SS} = ra_snapshot:begin_accept(Meta, SnapState0),
14551457
Log1 = ra_log:set_snapshot_state(SS, Log0),
14561458

14571459
%% if the snaphost includes pre entries (live entries) then we need
14581460
%% to reset the log to the last applied index to avoid issues
14591461
Log = case ChunkFlag of
1460-
pre ->
1462+
init ->
14611463
{ok, L} = ra_log:set_last_index(LastApplied, Log1),
14621464
L;
14631465
_ ->
14641466
Log1
14651467
end,
14661468
{receive_snapshot, update_term(Term, State0#{log => Log,
1469+
snapshot_phase => ChunkFlag,
14671470
leader_id => LeaderId}),
14681471
[{next_event, Rpc}, {record_leader_msg, LeaderId}]};
14691472
handle_follower(#install_snapshot_rpc{term = Term,
@@ -1538,7 +1541,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15381541
cluster := ClusterIds,
15391542
term := SnapTerm} = SnapMeta,
15401543
chunk_state = {Num, ChunkFlag},
1541-
data = ChunkOrEntries},
1544+
data = ChunkOrEntries} = Rpc,
15421545
#{cfg := #cfg{id = Id,
15431546
log_id = LogId,
15441547
effective_machine_version = CurEffMacVer,
@@ -1548,15 +1551,33 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15481551
cluster := Cluster,
15491552
current_term := CurTerm,
15501553
last_applied := LastApplied,
1551-
machine_state := OldMacState} = State0)
1554+
machine_state := OldMacState,
1555+
snapshot_phase := SnapPhase} = State0)
15521556
when Term >= CurTerm ->
1553-
?DEBUG("~ts: receiving snapshot chunk: ~b / ~w, index ~b, term ~b",
1554-
[LogId, Num, ChunkFlag, SnapIndex, SnapTerm]),
15551557
Reply = #install_snapshot_result{term = CurTerm,
15561558
last_term = SnapTerm,
15571559
last_index = SnapIndex},
15581560
case ChunkFlag of
1561+
init when SnapPhase == init ->
1562+
%% this is ok, just reply
1563+
{receive_snapshot, State0, [{reply, Reply}]};
1564+
init ->
1565+
?DEBUG("~ts: receiving snapshot saw unexpected init phase at snapshot"
1566+
" index term {~b, ~b}, current phase ~s restarting
1567+
snapshot receive process",
1568+
[LogId, SnapIndex, SnapTerm, SnapPhase]),
1569+
%% the snapshot sending must have been interrupted and restarted
1570+
%% during the init or pre-phase
1571+
%% abort the snapshot, and revert to follower
1572+
SnapState0 = ra_log:snapshot_state(Log00),
1573+
SnapState = ra_snapshot:abort_accept(SnapState0),
1574+
Log = ra_log:set_snapshot_state(SnapState, Log00),
1575+
{follower, maps:remove(snapshot_phase, State0#{log => Log}),
1576+
[{next_event, Rpc}]};
15591577
pre when is_list(ChunkOrEntries) ->
1578+
[{_FstIdx, _, _} | _] = ChunkOrEntries,
1579+
% ?DEBUG("~ts: receiving snapshot chunk pre first index ~b snap index ~b, term ~b",
1580+
% [LogId, FstIdx, SnapIndex, SnapTerm]),
15601581
%% reset last index to last applied
15611582
%% as we don't know for sure indexes after last applied
15621583
%% are of the right term
@@ -1566,15 +1587,21 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15661587
{ok, L} = ra_log:write_sparse(E, LstIdx, L0),
15671588
{L, I}
15681589
end, {Log00, LastIdx}, ChunkOrEntries),
1569-
State = update_term(Term, State0#{log => Log}),
1590+
State = update_term(Term, State0#{log => Log,
1591+
snapshot_phase => pre}),
15701592
{receive_snapshot, State, [{reply, Reply}]};
15711593
next ->
1594+
?DEBUG("~ts: receiving snapshot chunk: ~b / ~w, index ~b, term ~b",
1595+
[LogId, Num, ChunkFlag, SnapIndex, SnapTerm]),
15721596
SnapState0 = ra_log:snapshot_state(Log00),
15731597
SnapState = ra_snapshot:accept_chunk(ChunkOrEntries, Num, SnapState0),
15741598
Log0 = ra_log:set_snapshot_state(SnapState, Log00),
1575-
State = update_term(Term, State0#{log => Log0}),
1599+
State = update_term(Term, State0#{log => Log0,
1600+
snapshot_phase => next}),
15761601
{receive_snapshot, State, [{reply, Reply}]};
15771602
last ->
1603+
?DEBUG("~ts: receiving snapshot chunk: ~b / ~w, index ~b, term ~b",
1604+
[LogId, Num, ChunkFlag, SnapIndex, SnapTerm]),
15781605
SnapState0 = ra_log:snapshot_state(Log00),
15791606
{SnapState, MacState, LiveIndexes, Effs0} =
15801607
ra_snapshot:complete_accept(ChunkOrEntries, Num, Machine,
@@ -1612,7 +1639,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16121639
MacState,
16131640
OldMeta,
16141641
OldMacState),
1615-
State = update_term(Term,
1642+
State1 = update_term(Term,
16161643
State0#{cfg => Cfg,
16171644
log => Log,
16181645
commit_index => SnapIndex,
@@ -1624,6 +1651,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16241651
membership =>
16251652
get_membership(ClusterIds, State0),
16261653
machine_state => MacState}),
1654+
State = maps:remove(snapshot_phase, State1),
16271655
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapIndex),
16281656
%% it was the last snapshot chunk so we can revert back to
16291657
%% follower status
@@ -1643,13 +1671,15 @@ handle_receive_snapshot(#append_entries_rpc{term = Term} = Msg,
16431671
SnapState0 = ra_log:snapshot_state(Log0),
16441672
SnapState = ra_snapshot:abort_accept(SnapState0),
16451673
Log = ra_log:set_snapshot_state(SnapState, Log0),
1646-
{follower, update_term(Term, clear_leader_id(State#{log => Log})),
1674+
{follower, maps:remove(snapshot_phase,
1675+
update_term(Term,
1676+
clear_leader_id(State#{log => Log}))),
16471677
[{next_event, Msg}]};
16481678
handle_receive_snapshot({ra_log_event, Evt},
1649-
#{cfg := #cfg{log_id = LogId},
1679+
#{cfg := #cfg{log_id = _LogId},
16501680
log := Log0} = State) ->
1651-
?DEBUG("~ts: ~s ra_log_event received: ~w",
1652-
[LogId, ?FUNCTION_NAME, Evt]),
1681+
% ?DEBUG("~ts: ~s ra_log_event received: ~w",
1682+
% [LogId, ?FUNCTION_NAME, Evt]),
16531683
% simply forward all other events to ra_log
16541684
% whilst the snapshot is being received
16551685
{Log, Effects} = ra_log:handle_event(Evt, Log0),
@@ -1662,7 +1692,7 @@ handle_receive_snapshot(receive_snapshot_timeout,
16621692
SnapState0 = ra_log:snapshot_state(Log0),
16631693
SnapState = ra_snapshot:abort_accept(SnapState0),
16641694
Log = ra_log:set_snapshot_state(SnapState, Log0),
1665-
{follower, State#{log => Log}, []};
1695+
{follower, maps:remove(snapshot_phase, State#{log => Log}), []};
16661696
handle_receive_snapshot(#info_rpc{term = Term} = Msg,
16671697
#{current_term := CurTerm,
16681698
cfg := #cfg{log_id = LogId},
@@ -1675,7 +1705,8 @@ handle_receive_snapshot(#info_rpc{term = Term} = Msg,
16751705
SnapState0 = ra_log:snapshot_state(Log0),
16761706
SnapState = ra_snapshot:abort_accept(SnapState0),
16771707
Log = ra_log:set_snapshot_state(SnapState, Log0),
1678-
{follower, update_term(Term, clear_leader_id(State#{log => Log})),
1708+
{follower, maps:remove(snapshot_phase,
1709+
update_term(Term, clear_leader_id(State#{log => Log}))),
16791710
[{next_event, Msg}]};
16801711
handle_receive_snapshot(#info_rpc{} = InfoRpc, State) ->
16811712
InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
@@ -1692,7 +1723,8 @@ handle_receive_snapshot(#info_reply{term = Term} = Msg,
16921723
SnapState0 = ra_log:snapshot_state(Log0),
16931724
SnapState = ra_snapshot:abort_accept(SnapState0),
16941725
Log = ra_log:set_snapshot_state(SnapState, Log0),
1695-
{follower, update_term(Term, clear_leader_id(State#{log => Log})),
1726+
{follower, maps:remove(snapshot_phase,
1727+
update_term(Term, clear_leader_id(State#{log => Log}))),
16961728
[{next_event, Msg}]};
16971729
handle_receive_snapshot(#info_reply{}, State) ->
16981730
{receive_snapshot, State, []};

src/ra_server_proc.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1944,6 +1944,7 @@ send_snapshots(Id, Term, {_, ToNode} = To, ChunkSize,
19441944
[To, [last_applied]]),
19451945
RPC = #install_snapshot_rpc{term = Term,
19461946
leader_id = Id,
1947+
chunk_state = {0, init},
19471948
meta = Meta},
19481949
case ra_snapshot:indexes(
19491950
ra_snapshot:current_snapshot_dir(SnapState)) of
@@ -1952,6 +1953,9 @@ send_snapshots(Id, Term, {_, ToNode} = To, ChunkSize,
19521953
Indexes = ra_seq:floor(LastApplied + 1, Indexes0),
19531954
?DEBUG("~ts: sending live indexes ~w to ~w ",
19541955
[LogId, ra_seq:range(Indexes), To]),
1956+
%% first send the init phase
1957+
_Res0 = gen_statem:call(To, RPC,
1958+
{dirty_timeout, InstallTimeout}),
19551959
%% there are live indexes to send before the snapshot
19561960
%% TODO: write ra_seq:list_chunk function to avoid expansion
19571961
Idxs = lists:reverse(ra_seq:expand(Indexes)),
@@ -1963,7 +1967,7 @@ send_snapshots(Id, Term, {_, ToNode} = To, ChunkSize,
19631967
RPC1 = RPC#install_snapshot_rpc{chunk_state = {0, pre},
19641968
data = Ents},
19651969
_Res1 = gen_statem:call(To, RPC1,
1966-
{dirty_timeout, InstallTimeout}),
1970+
{dirty_timeout, InstallTimeout}),
19671971
%% TODO: assert Res1 is successful
19681972
F
19691973
end, undefined, ra_lib:lists_chunk(16, Idxs)),

test/ra_log_memory.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
close/1,
1111
append/2,
1212
write/2,
13+
write_sparse/3,
1314
take/3,
1415
fold/5,
1516
last_index_term/1,
@@ -94,7 +95,7 @@ write([{FirstIdx, _, _} | _] = Entries,
9495
{Acc#{Idx => {Term, Data}}, Idx}
9596
end, {Log1, FirstIdx}, Entries),
9697
{ok, State#state{last_index = LastInIdx,
97-
entries = Log}};
98+
entries = Log}};
9899
write([{FirstIdx, _, _} | _] = Entries,
99100
#state{snapshot = {#{index := SnapIdx}, _}, entries = Log0} = State)
100101
when SnapIdx + 1 =:= FirstIdx ->
@@ -106,6 +107,12 @@ write([{FirstIdx, _, _} | _] = Entries,
106107
write(_Entries, _State) ->
107108
{error, {integrity_error, undefined}}.
108109

110+
%% In-memory test implementation of a sparse write of one entry.
%% First clause: the supplied PrevIdx must equal the log's current
%% last_index (enforced by binding PrevIdx twice in the head). The entry is
%% then stored under Idx in the entries map and last_index becomes Idx.
%% Idx is not compared with PrevIdx here, so gaps between the two are allowed.
%% Second clause: any other arguments (e.g. PrevIdx not matching last_index)
%% yield {error, gap_detected}.
write_sparse({Idx, Term, Data}, PrevIdx, #state{last_index = PrevIdx,
111+
entries = Log0} = State) ->
112+
{ok, State#state{last_index = Idx,
113+
entries = Log0#{Idx => {Term, Data}}}};
114+
write_sparse(_, _, _) ->
115+
{error, gap_detected}.
109116

110117
take(Start, Num, #state{last_index = LastIdx, entries = Log} = State) ->
111118
Entries = sparse_take(Start, Log, Num, LastIdx, []),

test/ra_server_SUITE.erl

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ all() ->
6464
leader_appends_cluster_change_then_steps_before_applying_it,
6565
leader_receives_install_snapshot_rpc,
6666
follower_installs_snapshot,
67+
follower_installs_snapshot_with_pre,
6768
follower_ignores_installs_snapshot_with_higher_machine_version,
6869
follower_receives_stale_snapshot,
6970
follower_receives_snapshot_lower_than_last_applied,
@@ -166,8 +167,8 @@ setup_log() ->
166167
{ok, {Meta, Data}, []}
167168
end),
168169
meck:expect(ra_snapshot, complete_accept,
169-
fun(_Data, _Num, _Machine, {_Meta, MacSt} = State) ->
170-
{State, MacSt, [], []}
170+
fun(Data, _Num, _Machine, {_Meta, MacSt} = State) ->
171+
{State, Data ++ MacSt, [], []}
171172
end),
172173
meck:expect(ra_snapshot, abort_accept, fun(SS) -> SS end),
173174
meck:expect(ra_snapshot, accepting, fun(_SS) -> undefined end),
@@ -191,6 +192,7 @@ setup_log() ->
191192
meck:expect(ra_log, next_index, fun ra_log_memory:next_index/1),
192193
meck:expect(ra_log, append, fun ra_log_memory:append/2),
193194
meck:expect(ra_log, write, fun ra_log_memory:write/2),
195+
meck:expect(ra_log, write_sparse, fun ra_log_memory:write_sparse/3),
194196
meck:expect(ra_log, handle_event, fun ra_log_memory:handle_event/2),
195197
meck:expect(ra_log, last_written, fun ra_log_memory:last_written/1),
196198
meck:expect(ra_log, last_index_term, fun ra_log_memory:last_index_term/1),
@@ -2288,6 +2290,55 @@ follower_installs_snapshot(_Config) ->
22882290

22892291
ok.
22902292

2293+
%% Drives a follower through a snapshot install that uses the init/pre
%% phases: init (handled in follower state, then replayed in
%% receive_snapshot), one pre chunk, a repeated init that must abort back to
%% follower, and finally the last chunk that completes the install.
follower_installs_snapshot_with_pre(_Config) ->
2294+
N1 = ?N1, N2 = ?N2, N3 = ?N3,
2295+
#{N3 := {_, State = #{cluster := Config}, _}} =
2296+
init_servers([N1, N2, N3], {module, ra_queue, #{}}),
2297+
LastTerm = 1, % snapshot term
2298+
Term = 2, % leader term
2299+
Idx = 3,
2300+
ISRpcInit = #install_snapshot_rpc{term = Term, leader_id = N1,
2301+
meta = snap_meta(Idx, LastTerm, Config),
2302+
chunk_state = {0, init},
2303+
data = []},
2304+
%% the init message starts the process
%% (the follower moves to receive_snapshot and re-queues the rpc as a
%% next_event, bound here as ISRpc)
2305+
{receive_snapshot, State1,
2306+
[{next_event, ISRpc}, {record_leader_msg, _}]} =
2307+
ra_server:handle_follower(ISRpcInit, State#{current_term => Term}),
2308+
2309+
%% actually process init message in the correct state
2310+
{receive_snapshot, State2, [{reply, _}]} =
2311+
ra_server:handle_receive_snapshot(ISRpc, State1),
2312+
2313+
%% now send a pre message
%% (a single entry at index 2, term 1)
2314+
ISRpcPre = ISRpcInit#install_snapshot_rpc{chunk_state = {0, pre},
2315+
data = [{2, 1, <<"e1">>}]},
2316+
{receive_snapshot, State3, [{reply, _}]} =
2317+
ra_server:handle_receive_snapshot(ISRpcPre, State2),
2318+
2319+
%% test that init returns to follower and retries
%% (the same init rpc is handed back as a next_event and the
%% snapshot_phase key must be gone from the returned state)
2320+
{follower, State3b, [{next_event, ISRpcInit}]} =
2321+
ra_server:handle_receive_snapshot(ISRpcInit, State3),
2322+
?assertNot(maps:is_key(snapshot_phase, State3b)),
2323+
2324+
%% re-mock complete_accept so its first argument (the chunk data) is
%% returned as the machine state; the [2] sent below is then what the
%% final machine_state assertion matches on
meck:expect(ra_snapshot, complete_accept,
2325+
fun (Mac, _, _, S) ->
2326+
{S, Mac, [], []}
2327+
end),
2328+
2329+
%% finally process the actual snapshot
%% NOTE: this continues from State3 (the state after the pre chunk), not
%% from State3b, i.e. it exercises the uninterrupted path
2330+
ISRpc1 = ISRpc#install_snapshot_rpc{chunk_state = {1, last},
2331+
data = [2]},
2332+
{follower, #{current_term := Term,
2333+
commit_index := Idx,
2334+
last_applied := Idx,
2335+
cluster := Config,
2336+
machine_state := [2],
2337+
leader_id := N1} = _State,
2338+
[{reply, #install_snapshot_result{}}]} =
2339+
ra_server:handle_receive_snapshot(ISRpc1, State3),
2340+
ok.
2341+
22912342
follower_ignores_installs_snapshot_with_higher_machine_version(_Config) ->
22922343
%% currently followers cannot correctly handle snapshots with a higher
22932344
%% machine version so have to ignore them

0 commit comments

Comments
 (0)