Skip to content

Commit 7b727f0

Browse files
committed
Reset log after aborted snapshot installation.
1 parent a941ad4 commit 7b727f0

File tree

8 files changed

+104
-50
lines changed

8 files changed

+104
-50
lines changed

src/ra_kv_harness.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ start_single_peer_node(NodeId, Opts) ->
181181
erpc:call(NodeName, logger, set_primary_config, [level, warning]),
182182
erpc:call(NodeName, application, set_env, [sasl, sasl_error_logger, false]),
183183
erpc:call(NodeName, application, stop, [sasl]),
184+
erpc:call(NodeName, application, set_env,
185+
[kernel, prevent_overlapping_partitions, false]),
184186
Dir = filename:join(BaseDir, NodeName),
185187
{ok, _} = erpc:call(NodeName, ra, start_in, [Dir]),
186188
% Set logger level to reduce verbosity on peer node

src/ra_log.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,7 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
732732
mem_table = Mt0,
733733
last_written_index_term = {LWIdx0, _}} = State0) ->
734734
Cur = ra_snapshot:current(SnapState),
735+
%% TODO: can a log recover to the right reset point? I doubt it
735736
case fetch_term(Idx, State0) of
736737
{undefined, State} when element(1, Cur) =/= Idx ->
737738
%% not found and Idx isn't equal to latest snapshot index
@@ -745,7 +746,7 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
745746
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx),
746747
{ok, State#?MODULE{range = ra_range:limit(Idx + 1, Range),
747748
last_term = SnapTerm,
748-
mem_table = Mt,
749+
mem_table = Mt,
749750
last_written_index_term = Cur}};
750751
{Term, State1} ->
751752
LWIdx = min(Idx, LWIdx0),
@@ -947,8 +948,8 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
947948
current_snapshot = Snap,
948949
snapshot_state = SnapState},
949950
{Reader, CompEffs} = ra_log_segments:schedule_compaction(minor, SnapIdx,
950-
LiveIndexes,
951-
State#?MODULE.reader),
951+
LiveIndexes,
952+
State#?MODULE.reader),
952953
Effects = CompEffs ++ Effects0,
953954
{State#?MODULE{reader = Reader}, Effects};
954955
checkpoint ->
@@ -1165,7 +1166,6 @@ assert(#?MODULE{cfg = #cfg{log_id = LogId},
11651166
current_snapshot = CurrSnap,
11661167
live_indexes = LiveIndexes
11671168
} = State) ->
1168-
ra_log_segments:range/1
11691169
%% TODO: remove this at some point?
11701170
?DEBUG("~ts: ra_log: asserting Range ~p Snapshot ~p",
11711171
[LogId, Range, CurrSnap]),

src/ra_server.erl

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1569,11 +1569,8 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15691569
%% the snapshot sending must have been interrupted and restarted
15701570
%% during the init or pre-phase
15711571
%% abort the snapshot, and revert to follower
1572-
SnapState0 = ra_log:snapshot_state(Log00),
1573-
SnapState = ra_snapshot:abort_accept(SnapState0),
1574-
Log = ra_log:set_snapshot_state(SnapState, Log00),
1575-
{follower, maps:remove(snapshot_phase, State0#{log => Log}),
1576-
[{next_event, Rpc}]};
1572+
State = abort_receive(State0),
1573+
{follower, State, [{next_event, Rpc}]};
15771574
pre when is_list(ChunkOrEntries) ->
15781575
[{_FstIdx, _, _} | _] = ChunkOrEntries,
15791576
% ?DEBUG("~ts: receiving snapshot chunk pre first index ~b snap index ~b, term ~b",
@@ -1662,71 +1659,50 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16621659
end;
16631660
handle_receive_snapshot(#append_entries_rpc{term = Term} = Msg,
16641661
#{current_term := CurTerm,
1665-
cfg := #cfg{log_id = LogId},
1666-
log := Log0} = State)
1662+
cfg := #cfg{log_id = LogId}} = State0)
16671663
when Term > CurTerm ->
16681664
?INFO("~ts: follower receiving snapshot saw append_entries_rpc from ~w for term ~b "
16691665
"abdicates term: ~b!",
16701666
[LogId, Msg#append_entries_rpc.leader_id,
16711667
Term, CurTerm]),
1672-
SnapState0 = ra_log:snapshot_state(Log0),
1673-
SnapState = ra_snapshot:abort_accept(SnapState0),
1674-
Log = ra_log:set_snapshot_state(SnapState, Log0),
1675-
{follower, maps:remove(snapshot_phase,
1676-
update_term(Term,
1677-
clear_leader_id(State#{log => Log}))),
1678-
[{next_event, Msg}]};
1668+
State = abort_receive(State0),
1669+
{follower, update_term(Term, State), [{next_event, Msg}]};
16791670
handle_receive_snapshot({ra_log_event, Evt},
16801671
#{cfg := #cfg{log_id = _LogId},
16811672
log := Log0} = State) ->
1682-
% ?DEBUG("~ts: ~s ra_log_event received: ~w",
1683-
% [LogId, ?FUNCTION_NAME, Evt]),
16841673
% simply forward all other events to ra_log
16851674
% whilst the snapshot is being received
16861675
{Log, Effects} = ra_log:handle_event(Evt, Log0),
16871676
{receive_snapshot, State#{log => Log}, Effects};
16881677
handle_receive_snapshot(receive_snapshot_timeout,
1689-
#{cfg := #cfg{log_id = LogId},
1690-
log := Log0} = State) ->
1678+
#{cfg := #cfg{log_id = LogId}} = State0) ->
16911679
?INFO("~ts: ~s receive snapshot timed out.",
16921680
[LogId, ?FUNCTION_NAME]),
1693-
SnapState0 = ra_log:snapshot_state(Log0),
1694-
SnapState = ra_snapshot:abort_accept(SnapState0),
1695-
Log = ra_log:set_snapshot_state(SnapState, Log0),
1696-
{follower, maps:remove(snapshot_phase, State#{log => Log}), []};
1681+
State = abort_receive(State0),
1682+
{follower, State, []};
16971683
handle_receive_snapshot(#info_rpc{term = Term} = Msg,
16981684
#{current_term := CurTerm,
1699-
cfg := #cfg{log_id = LogId},
1700-
log := Log0} = State)
1685+
cfg := #cfg{log_id = LogId}} = State0)
17011686
when CurTerm < Term ->
17021687
?INFO("~ts: follower receiving snapshot saw info_rpc from ~w for term ~b "
1703-
"abdicates term: ~b!",
1688+
"current term: ~b!",
17041689
[LogId, Msg#info_rpc.from,
17051690
Term, CurTerm]),
1706-
SnapState0 = ra_log:snapshot_state(Log0),
1707-
SnapState = ra_snapshot:abort_accept(SnapState0),
1708-
Log = ra_log:set_snapshot_state(SnapState, Log0),
1709-
{follower, maps:remove(snapshot_phase,
1710-
update_term(Term, clear_leader_id(State#{log => Log}))),
1711-
[{next_event, Msg}]};
1691+
State = abort_receive(State0),
1692+
{follower, update_term(Term, State), [{next_event, Msg}]};
17121693
handle_receive_snapshot(#info_rpc{} = InfoRpc, State) ->
17131694
InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
17141695
{receive_snapshot, State, [InfoReplyEffect]};
17151696
handle_receive_snapshot(#info_reply{term = Term} = Msg,
17161697
#{current_term := CurTerm,
1717-
cfg := #cfg{log_id = LogId},
1718-
log := Log0} = State)
1698+
cfg := #cfg{log_id = LogId}} = State0)
17191699
when CurTerm < Term ->
17201700
?INFO("~ts: follower receiving snapshot saw info_reply from ~w for term ~b "
17211701
"abdicates term: ~b!",
17221702
[LogId, Msg#info_reply.from,
17231703
Term, CurTerm]),
1724-
SnapState0 = ra_log:snapshot_state(Log0),
1725-
SnapState = ra_snapshot:abort_accept(SnapState0),
1726-
Log = ra_log:set_snapshot_state(SnapState, Log0),
1727-
{follower, maps:remove(snapshot_phase,
1728-
update_term(Term, clear_leader_id(State#{log => Log}))),
1729-
[{next_event, Msg}]};
1704+
State = abort_receive(State0),
1705+
{follower, update_term(Term, State), [{next_event, Msg}]};
17301706
handle_receive_snapshot(#info_reply{}, State) ->
17311707
{receive_snapshot, State, []};
17321708
handle_receive_snapshot(Msg, State) ->
@@ -1735,6 +1711,21 @@ handle_receive_snapshot(Msg, State) ->
17351711
%% TODO: work out what else to handle
17361712
{receive_snapshot, State, [{reply, {error, {unsupported_call, Msg}}}]}.
17371713

1714+
abort_receive(#{snapshot_phase := Phase,
1715+
last_applied := LastApplied,
1716+
log := Log0} = State) ->
1717+
SnapState0 = ra_log:snapshot_state(Log0),
1718+
SnapState = ra_snapshot:abort_accept(SnapState0),
1719+
Log1 = ra_log:set_snapshot_state(SnapState, Log0),
1720+
Log = case Phase of
1721+
pre ->
1722+
{ok, Log2} = ra_log:set_last_index(LastApplied, Log1),
1723+
Log2;
1724+
_ ->
1725+
Log1
1726+
end,
1727+
clear_leader_id(maps:remove(snapshot_phase, State#{log => Log})).
1728+
17381729
-spec handle_await_condition(ra_msg(), ra_server_state()) ->
17391730
{ra_state(), ra_server_state(), effects()}.
17401731
handle_await_condition(#request_vote_rpc{} = Msg, State) ->

src/ra_server_proc.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,14 @@ receive_snapshot(EventType, {aux_command, Cmd}, State0) ->
939939
?HANDLE_EFFECTS(Effects, EventType,
940940
State0#state{server_state = ServerState}),
941941
{keep_state, State, Actions};
942+
receive_snapshot(info, {'DOWN', MRef, process, _Pid, _Info},
943+
#state{leader_monitor = MRef} = State) ->
944+
%% leader is down
945+
?INFO("~ts: receive_snapshot - Leader monitor down. Aborting snapshot receive. "
946+
"Entering follower state.",
947+
[log_id(State)]),
948+
receive_snapshot(info, receive_snapshot_timeout,
949+
State#state{leader_monitor = undefined});
942950
receive_snapshot(EventType, Msg, State0) ->
943951
case handle_receive_snapshot(Msg, State0) of
944952
{receive_snapshot, State1, Effects} ->

src/ra_snapshot.erl

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,18 @@ find_snapshots(#?MODULE{uid = UId,
204204
State;
205205
Current0 ->
206206
Current = filename:join(SnapshotsDir, Current0),
207-
{ok, #{index := Idx, term := Term}} = Module:read_meta(Current),
208-
%% TODO: recover live indexes and record that
209-
ok = ra_log_snapshot_state:insert(?ETSTBL, UId, Idx, Idx+1, []),
207+
{ok, #{index := Idx,
208+
term := Term}} = Module:read_meta(Current),
209+
%% recover live indexes and record that
210+
{ok, Indexes} = indexes(Current),
211+
SmallestLiveIdx = case ra_seq:first(Indexes) of
212+
undefined ->
213+
Idx+1;
214+
First ->
215+
First
216+
end,
217+
ok = ra_log_snapshot_state:insert(?ETSTBL, UId, Idx, SmallestLiveIdx,
218+
Indexes),
210219

211220
ok = delete_snapshots(SnapshotsDir, lists:delete(Current0, Snaps)),
212221
%% delete old snapshots if any
@@ -494,7 +503,7 @@ complete_snapshot({Idx, _} = IdxTerm, snapshot, LiveIndexes,
494503
I ->
495504
I
496505
end,
497-
%% TODO live indexes
506+
%% live indexes
498507
ok = ra_log_snapshot_state:insert(?ETSTBL, UId, Idx, SmallestIdx,
499508
LiveIndexes),
500509
State#?MODULE{pending = undefined,

test/ra_log_2_SUITE.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1381,7 +1381,6 @@ snapshot_installation_with_live_indexes(Config) ->
13811381

13821382

13831383
run_effs(Effs4),
1384-
ct:pal("o ~p", [ra_log:overview(Log3)]),
13851384
{15, _} = ra_log:last_index_term(Log3),
13861385
{15, _} = ra_log:last_written(Log3),
13871386
%% write the next index, bearning in mind the last index the WAL saw
@@ -1392,15 +1391,22 @@ snapshot_installation_with_live_indexes(Config) ->
13921391
LW = ra_log:last_written(L),
13931392
{16, 2} == LW
13941393
end),
1395-
ct:pal("o ~p", [ra_log:overview(Log5)]),
13961394
ra_log_wal:force_roll_over(ra_log_wal),
13971395
Log = assert_log_events(Log5,
13981396
fun (L) ->
13991397
#{mem_table_range := R} = ra_log:overview(L),
14001398
R == undefined
14011399
end),
14021400
ct:pal("o ~p", [ra_log:overview(Log)]),
1401+
UId = ?config(uid, Config),
1402+
?assertEqual(LiveIndexes, ra_log_snapshot_state:live_indexes(
1403+
ra_log_snapshot_state, UId)),
1404+
ra_log:close(Log),
14031405
flush(),
1406+
_LogAfter = ra_log_init(Config),
1407+
%% validate recovery recovers the live indexes correctly
1408+
?assertEqual(LiveIndexes, ra_log_snapshot_state:live_indexes(
1409+
ra_log_snapshot_state, UId)),
14041410
ok.
14051411

14061412
snapshot_installation(Config) ->

test/ra_log_memory.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ write_meta_f(_Key, _Value, State) ->
284284
can_write(_Log) ->
285285
true.
286286

287-
overview(Log) ->
287+
overview(#state{} = Log) ->
288288
#{type => ?MODULE,
289289
last_index => Log#state.last_index,
290290
last_written => Log#state.last_written,

test/ra_server_SUITE.erl

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ all() ->
6565
leader_receives_install_snapshot_rpc,
6666
follower_installs_snapshot,
6767
follower_installs_snapshot_with_pre,
68+
follower_aborts_snapshot_with_pre,
6869
follower_ignores_installs_snapshot_with_higher_machine_version,
6970
follower_receives_stale_snapshot,
7071
follower_receives_snapshot_lower_than_last_applied,
@@ -182,6 +183,7 @@ setup_log() ->
182183
ra_log_memory:fold(A, B, C, D, E)
183184
end),
184185
meck:expect(ra_log, release_resources, fun ra_log_memory:release_resources/3),
186+
meck:expect(ra_log, overview, fun ra_log_memory:overview/1),
185187
meck:expect(ra_log, append_sync,
186188
fun({Idx, Term, _} = E, L0) ->
187189
L1 = ra_log_memory:append(E, L0),
@@ -2339,6 +2341,42 @@ follower_installs_snapshot_with_pre(_Config) ->
23392341
ra_server:handle_receive_snapshot(ISRpc1, State3),
23402342
ok.
23412343

2344+
follower_aborts_snapshot_with_pre(_Config) ->
2345+
N1 = ?N1, N2 = ?N2, N3 = ?N3,
2346+
#{N3 := {_, State = #{cluster := Config}, _}} =
2347+
init_servers([N1, N2, N3], {module, ra_queue, #{}}),
2348+
LastTerm = 1, % snapshot term
2349+
Term = 2, % leader term
2350+
Idx = 3,
2351+
ISRpcInit = #install_snapshot_rpc{term = Term, leader_id = N1,
2352+
meta = snap_meta(Idx, LastTerm, Config),
2353+
chunk_state = {0, init},
2354+
data = []},
2355+
%% the init message starts the process
2356+
{receive_snapshot, State1,
2357+
[{next_event, ISRpc}, {record_leader_msg, _}]} =
2358+
ra_server:handle_follower(ISRpcInit, State#{current_term => Term}),
2359+
2360+
%% actually process init message in the correct state
2361+
{receive_snapshot, State2, [{reply, _}]} =
2362+
ra_server:handle_receive_snapshot(ISRpc, State1),
2363+
2364+
%% now send a pre message
2365+
ISRpcPre = ISRpcInit#install_snapshot_rpc{chunk_state = {0, pre},
2366+
data = [{2, 1, <<"e1">>}]},
2367+
{receive_snapshot, State3, [{reply, _}]} =
2368+
ra_server:handle_receive_snapshot(ISRpcPre, State2),
2369+
?assertMatch(#{log := #{last_index := 2}},
2370+
ra_server:overview(State3)),
2371+
2372+
{follower, State3b, []} =
2373+
ra_server:handle_receive_snapshot(receive_snapshot_timeout, State3),
2374+
?assertNot(maps:is_key(snapshot_phase, State3b)),
2375+
%% assert the aborted install reset the log
2376+
?assertMatch(#{log := #{last_index := 0}},
2377+
ra_server:overview(State3b)),
2378+
ok.
2379+
23422380
follower_ignores_installs_snapshot_with_higher_machine_version(_Config) ->
23432381
%% currently followers cannot correctly handle snapshots with a higher
23442382
%% machine version so have to ignore them

0 commit comments

Comments
 (0)