Skip to content

Commit 3e03e18

Browse files
committed
Reset log after aborted snapshot installation.
1 parent 0183b8a commit 3e03e18

File tree

8 files changed

+104
-50
lines changed

8 files changed

+104
-50
lines changed

src/ra_kv_harness.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ start_single_peer_node(NodeId, Opts) ->
181181
erpc:call(NodeName, logger, set_primary_config, [level, warning]),
182182
erpc:call(NodeName, application, set_env, [sasl, sasl_error_logger, false]),
183183
erpc:call(NodeName, application, stop, [sasl]),
184+
erpc:call(NodeName, application, set_env,
185+
[kernel, prevent_overlapping_partitions, false]),
184186
Dir = filename:join(BaseDir, NodeName),
185187
{ok, _} = erpc:call(NodeName, ra, start_in, [Dir]),
186188
% Set logger level to reduce verbosity on peer node

src/ra_log.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,7 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
732732
mem_table = Mt0,
733733
last_written_index_term = {LWIdx0, _}} = State0) ->
734734
Cur = ra_snapshot:current(SnapState),
735+
%% TODO: can a log recover to the right reset point? I doubt it
735736
case fetch_term(Idx, State0) of
736737
{undefined, State} when element(1, Cur) =/= Idx ->
737738
%% not found and Idx isn't equal to latest snapshot index
@@ -745,7 +746,7 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
745746
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx),
746747
{ok, State#?MODULE{range = ra_range:limit(Idx + 1, Range),
747748
last_term = SnapTerm,
748-
mem_table = Mt,
749+
mem_table = Mt,
749750
last_written_index_term = Cur}};
750751
{Term, State1} ->
751752
LWIdx = min(Idx, LWIdx0),
@@ -947,8 +948,8 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
947948
current_snapshot = Snap,
948949
snapshot_state = SnapState},
949950
{Reader, CompEffs} = ra_log_segments:schedule_compaction(minor, SnapIdx,
950-
LiveIndexes,
951-
State#?MODULE.reader),
951+
LiveIndexes,
952+
State#?MODULE.reader),
952953
Effects = CompEffs ++ Effects0,
953954
{State#?MODULE{reader = Reader}, Effects};
954955
checkpoint ->
@@ -1165,7 +1166,6 @@ assert(#?MODULE{cfg = #cfg{log_id = LogId},
11651166
current_snapshot = CurrSnap,
11661167
live_indexes = LiveIndexes
11671168
} = State) ->
1168-
ra_log_segments:range/1
11691169
%% TODO: remove this at some point?
11701170
?DEBUG("~ts: ra_log: asserting Range ~p Snapshot ~p",
11711171
[LogId, Range, CurrSnap]),

src/ra_server.erl

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,11 +1571,8 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15711571
%% the snapshot sending must have been interrupted and restarted
15721572
%% during the init or pre-phase
15731573
%% abort the snapshot, and revert to follower
1574-
SnapState0 = ra_log:snapshot_state(Log00),
1575-
SnapState = ra_snapshot:abort_accept(SnapState0),
1576-
Log = ra_log:set_snapshot_state(SnapState, Log00),
1577-
{follower, maps:remove(snapshot_phase, State0#{log => Log}),
1578-
[{next_event, Rpc}]};
1574+
State = abort_receive(State0),
1575+
{follower, State, [{next_event, Rpc}]};
15791576
pre when is_list(ChunkOrEntries) ->
15801577
[{_FstIdx, _, _} | _] = ChunkOrEntries,
15811578
% ?DEBUG("~ts: receiving snapshot chunk pre first index ~b snap index ~b, term ~b",
@@ -1664,71 +1661,50 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16641661
end;
16651662
handle_receive_snapshot(#append_entries_rpc{term = Term} = Msg,
16661663
#{current_term := CurTerm,
1667-
cfg := #cfg{log_id = LogId},
1668-
log := Log0} = State)
1664+
cfg := #cfg{log_id = LogId}} = State0)
16691665
when Term > CurTerm ->
16701666
?INFO("~ts: follower receiving snapshot saw append_entries_rpc from ~w for term ~b "
16711667
"abdicates term: ~b!",
16721668
[LogId, Msg#append_entries_rpc.leader_id,
16731669
Term, CurTerm]),
1674-
SnapState0 = ra_log:snapshot_state(Log0),
1675-
SnapState = ra_snapshot:abort_accept(SnapState0),
1676-
Log = ra_log:set_snapshot_state(SnapState, Log0),
1677-
{follower, maps:remove(snapshot_phase,
1678-
update_term(Term,
1679-
clear_leader_id(State#{log => Log}))),
1680-
[{next_event, Msg}]};
1670+
State = abort_receive(State0),
1671+
{follower, update_term(Term, State), [{next_event, Msg}]};
16811672
handle_receive_snapshot({ra_log_event, Evt},
16821673
#{cfg := #cfg{log_id = _LogId},
16831674
log := Log0} = State) ->
1684-
% ?DEBUG("~ts: ~s ra_log_event received: ~w",
1685-
% [LogId, ?FUNCTION_NAME, Evt]),
16861675
% simply forward all other events to ra_log
16871676
% whilst the snapshot is being received
16881677
{Log, Effects} = ra_log:handle_event(Evt, Log0),
16891678
{receive_snapshot, State#{log => Log}, Effects};
16901679
handle_receive_snapshot(receive_snapshot_timeout,
1691-
#{cfg := #cfg{log_id = LogId},
1692-
log := Log0} = State) ->
1680+
#{cfg := #cfg{log_id = LogId}} = State0) ->
16931681
?INFO("~ts: ~s receive snapshot timed out.",
16941682
[LogId, ?FUNCTION_NAME]),
1695-
SnapState0 = ra_log:snapshot_state(Log0),
1696-
SnapState = ra_snapshot:abort_accept(SnapState0),
1697-
Log = ra_log:set_snapshot_state(SnapState, Log0),
1698-
{follower, maps:remove(snapshot_phase, State#{log => Log}), []};
1683+
State = abort_receive(State0),
1684+
{follower, State, []};
16991685
handle_receive_snapshot(#info_rpc{term = Term} = Msg,
17001686
#{current_term := CurTerm,
1701-
cfg := #cfg{log_id = LogId},
1702-
log := Log0} = State)
1687+
cfg := #cfg{log_id = LogId}} = State0)
17031688
when CurTerm < Term ->
17041689
?INFO("~ts: follower receiving snapshot saw info_rpc from ~w for term ~b "
1705-
"abdicates term: ~b!",
1690+
"current term: ~b!",
17061691
[LogId, Msg#info_rpc.from,
17071692
Term, CurTerm]),
1708-
SnapState0 = ra_log:snapshot_state(Log0),
1709-
SnapState = ra_snapshot:abort_accept(SnapState0),
1710-
Log = ra_log:set_snapshot_state(SnapState, Log0),
1711-
{follower, maps:remove(snapshot_phase,
1712-
update_term(Term, clear_leader_id(State#{log => Log}))),
1713-
[{next_event, Msg}]};
1693+
State = abort_receive(State0),
1694+
{follower, update_term(Term, State), [{next_event, Msg}]};
17141695
handle_receive_snapshot(#info_rpc{} = InfoRpc, State) ->
17151696
InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
17161697
{receive_snapshot, State, [InfoReplyEffect]};
17171698
handle_receive_snapshot(#info_reply{term = Term} = Msg,
17181699
#{current_term := CurTerm,
1719-
cfg := #cfg{log_id = LogId},
1720-
log := Log0} = State)
1700+
cfg := #cfg{log_id = LogId}} = State0)
17211701
when CurTerm < Term ->
17221702
?INFO("~ts: follower receiving snapshot saw info_reply from ~w for term ~b "
17231703
"abdicates term: ~b!",
17241704
[LogId, Msg#info_reply.from,
17251705
Term, CurTerm]),
1726-
SnapState0 = ra_log:snapshot_state(Log0),
1727-
SnapState = ra_snapshot:abort_accept(SnapState0),
1728-
Log = ra_log:set_snapshot_state(SnapState, Log0),
1729-
{follower, maps:remove(snapshot_phase,
1730-
update_term(Term, clear_leader_id(State#{log => Log}))),
1731-
[{next_event, Msg}]};
1706+
State = abort_receive(State0),
1707+
{follower, update_term(Term, State), [{next_event, Msg}]};
17321708
handle_receive_snapshot(#info_reply{}, State) ->
17331709
{receive_snapshot, State, []};
17341710
handle_receive_snapshot(Msg, State) ->
@@ -1737,6 +1713,21 @@ handle_receive_snapshot(Msg, State) ->
17371713
%% TODO: work out what else to handle
17381714
{receive_snapshot, State, [{reply, {error, {unsupported_call, Msg}}}]}.
17391715

1716+
%% Abandons an in-progress snapshot receive and returns the updated server
%% state map.
%%
%% The state map must contain the `snapshot_phase`, `last_applied` and `log`
%% keys; a state without them does not match this clause.
%%
%% Steps performed:
%%  1. the snapshot state held by the log is passed through
%%     ra_snapshot:abort_accept/1 and stored back into the log;
%%  2. only when the phase is `pre`, the log's last index is reset to
%%     `last_applied` via ra_log:set_last_index/2 (presumably to discard
%%     entries written during the pre phase - TODO confirm). The `{ok, _}`
%%     match is assertive: any other return value crashes the caller;
%%  3. the `snapshot_phase` key is removed, the new log is stored and
%%     clear_leader_id/1 is applied to the result.
abort_receive(#{snapshot_phase := Phase,
1717+
last_applied := LastApplied,
1718+
log := Log0} = State) ->
1719+
SnapState0 = ra_log:snapshot_state(Log0),
1720+
SnapState = ra_snapshot:abort_accept(SnapState0),
1721+
Log1 = ra_log:set_snapshot_state(SnapState, Log0),
1722+
Log = case Phase of
1723+
pre ->
1724+
{ok, Log2} = ra_log:set_last_index(LastApplied, Log1),
1725+
Log2;
1726+
_ ->
1727+
Log1
1728+
end,
1729+
clear_leader_id(maps:remove(snapshot_phase, State#{log => Log})).
1730+
17401731
-spec handle_await_condition(ra_msg(), ra_server_state()) ->
17411732
{ra_state(), ra_server_state(), effects()}.
17421733
handle_await_condition(#request_vote_rpc{} = Msg, State) ->

src/ra_server_proc.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,14 @@ receive_snapshot(EventType, {aux_command, Cmd}, State0) ->
938938
?HANDLE_EFFECTS(Effects, EventType,
939939
State0#state{server_state = ServerState}),
940940
{keep_state, State, Actions};
941+
receive_snapshot(info, {'DOWN', MRef, process, _Pid, _Info},
942+
#state{leader_monitor = MRef} = State) ->
943+
%% leader is down
944+
?INFO("~ts: receive_snapshot - Leader monitor down. Aborting snapshot receive. "
945+
"Entering follower state.",
946+
[log_id(State)]),
947+
receive_snapshot(info, receive_snapshot_timeout,
948+
State#state{leader_monitor = undefined});
941949
receive_snapshot(EventType, Msg, State0) ->
942950
case handle_receive_snapshot(Msg, State0) of
943951
{receive_snapshot, State1, Effects} ->

src/ra_snapshot.erl

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,18 @@ find_snapshots(#?MODULE{uid = UId,
204204
State;
205205
Current0 ->
206206
Current = filename:join(SnapshotsDir, Current0),
207-
{ok, #{index := Idx, term := Term}} = Module:read_meta(Current),
208-
%% TODO: recover live indexes and record that
209-
ok = ra_log_snapshot_state:insert(?ETSTBL, UId, Idx, Idx+1, []),
207+
{ok, #{index := Idx,
208+
term := Term}} = Module:read_meta(Current),
209+
%% recover the live indexes from the current snapshot and record them
210+
{ok, Indexes} = indexes(Current),
211+
SmallestLiveIdx = case ra_seq:first(Indexes) of
212+
undefined ->
213+
Idx+1;
214+
First ->
215+
First
216+
end,
217+
ok = ra_log_snapshot_state:insert(?ETSTBL, UId, Idx, SmallestLiveIdx,
218+
Indexes),
210219

211220
ok = delete_snapshots(SnapshotsDir, lists:delete(Current0, Snaps)),
212221
%% delete old snapshots if any
@@ -494,7 +503,7 @@ complete_snapshot({Idx, _} = IdxTerm, snapshot, LiveIndexes,
494503
I ->
495504
I
496505
end,
497-
%% TODO live indexes
506+
%% record the snapshot index, smallest live index and the live indexes
498507
ok = ra_log_snapshot_state:insert(?ETSTBL, UId, Idx, SmallestIdx,
499508
LiveIndexes),
500509
State#?MODULE{pending = undefined,

test/ra_log_2_SUITE.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1381,7 +1381,6 @@ snapshot_installation_with_live_indexes(Config) ->
13811381

13821382

13831383
run_effs(Effs4),
1384-
ct:pal("o ~p", [ra_log:overview(Log3)]),
13851384
{15, _} = ra_log:last_index_term(Log3),
13861385
{15, _} = ra_log:last_written(Log3),
13871386
%% write the next index, bearing in mind the last index the WAL saw
@@ -1392,15 +1391,22 @@ snapshot_installation_with_live_indexes(Config) ->
13921391
LW = ra_log:last_written(L),
13931392
{16, 2} == LW
13941393
end),
1395-
ct:pal("o ~p", [ra_log:overview(Log5)]),
13961394
ra_log_wal:force_roll_over(ra_log_wal),
13971395
Log = assert_log_events(Log5,
13981396
fun (L) ->
13991397
#{mem_table_range := R} = ra_log:overview(L),
14001398
R == undefined
14011399
end),
14021400
ct:pal("o ~p", [ra_log:overview(Log)]),
1401+
UId = ?config(uid, Config),
1402+
?assertEqual(LiveIndexes, ra_log_snapshot_state:live_indexes(
1403+
ra_log_snapshot_state, UId)),
1404+
ra_log:close(Log),
14031405
flush(),
1406+
_LogAfter = ra_log_init(Config),
1407+
%% validate recovery recovers the live indexes correctly
1408+
?assertEqual(LiveIndexes, ra_log_snapshot_state:live_indexes(
1409+
ra_log_snapshot_state, UId)),
14041410
ok.
14051411

14061412
snapshot_installation(Config) ->

test/ra_log_memory.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ write_meta_f(_Key, _Value, State) ->
284284
can_write(_Log) ->
285285
true.
286286

287-
overview(Log) ->
287+
overview(#state{} = Log) ->
288288
#{type => ?MODULE,
289289
last_index => Log#state.last_index,
290290
last_written => Log#state.last_written,

test/ra_server_SUITE.erl

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ all() ->
6565
leader_receives_install_snapshot_rpc,
6666
follower_installs_snapshot,
6767
follower_installs_snapshot_with_pre,
68+
follower_aborts_snapshot_with_pre,
6869
follower_ignores_installs_snapshot_with_higher_machine_version,
6970
follower_receives_stale_snapshot,
7071
follower_receives_snapshot_lower_than_last_applied,
@@ -182,6 +183,7 @@ setup_log() ->
182183
ra_log_memory:fold(A, B, C, D, E)
183184
end),
184185
meck:expect(ra_log, release_resources, fun ra_log_memory:release_resources/3),
186+
meck:expect(ra_log, overview, fun ra_log_memory:overview/1),
185187
meck:expect(ra_log, append_sync,
186188
fun({Idx, Term, _} = E, L0) ->
187189
L1 = ra_log_memory:append(E, L0),
@@ -2339,6 +2341,42 @@ follower_installs_snapshot_with_pre(_Config) ->
23392341
ra_server:handle_receive_snapshot(ISRpc1, State3),
23402342
ok.
23412343

2344+
%% Test case: a follower that aborts a snapshot install after it has already
%% received a `pre` chunk must reset its log.
%%
%% Flow exercised:
%%  - an `init` install_snapshot_rpc moves the follower to receive_snapshot;
%%  - a `pre` chunk carrying one entry at index 2 is handled, after which the
%%    log overview reports last_index 2;
%%  - receive_snapshot_timeout is delivered; the server must return to
%%    follower with no effects, the `snapshot_phase` key must be gone and the
%%    log overview must report last_index 0.
follower_aborts_snapshot_with_pre(_Config) ->
2345+
N1 = ?N1, N2 = ?N2, N3 = ?N3,
2346+
#{N3 := {_, State = #{cluster := Config}, _}} =
2347+
init_servers([N1, N2, N3], {module, ra_queue, #{}}),
2348+
LastTerm = 1, % snapshot term
2349+
Term = 2, % leader term
2350+
Idx = 3,
2351+
ISRpcInit = #install_snapshot_rpc{term = Term, leader_id = N1,
2352+
meta = snap_meta(Idx, LastTerm, Config),
2353+
chunk_state = {0, init},
2354+
data = []},
2355+
%% the init message starts the process
2356+
{receive_snapshot, State1,
2357+
[{next_event, ISRpc}, {record_leader_msg, _}]} =
2358+
ra_server:handle_follower(ISRpcInit, State#{current_term => Term}),
2359+
2360+
%% actually process init message in the correct state
2361+
{receive_snapshot, State2, [{reply, _}]} =
2362+
ra_server:handle_receive_snapshot(ISRpc, State1),
2363+
2364+
%% now send a pre message
2365+
ISRpcPre = ISRpcInit#install_snapshot_rpc{chunk_state = {0, pre},
2366+
data = [{2, 1, <<"e1">>}]},
2367+
{receive_snapshot, State3, [{reply, _}]} =
2368+
ra_server:handle_receive_snapshot(ISRpcPre, State2),
2369+
?assertMatch(#{log := #{last_index := 2}},
2370+
ra_server:overview(State3)),
2371+
2372+
{follower, State3b, []} =
2373+
ra_server:handle_receive_snapshot(receive_snapshot_timeout, State3),
2374+
?assertNot(maps:is_key(snapshot_phase, State3b)),
2375+
%% assert the aborted install reset the log
2376+
?assertMatch(#{log := #{last_index := 0}},
2377+
ra_server:overview(State3b)),
2378+
ok.
2379+
23422380
follower_ignores_installs_snapshot_with_higher_machine_version(_Config) ->
23432381
%% currently followers cannot correctly handle snapshots with a higher
23442382
%% machine version so have to ignore them

0 commit comments

Comments
 (0)