Skip to content

Commit 0d83b42

Browse files
committed
snapshot api changes
1 parent 3fa486b commit 0d83b42

File tree

10 files changed

+221
-142
lines changed

10 files changed

+221
-142
lines changed

src/ra.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@
169169
{term :: ra_term(), % the leader's term
170170
leader_id :: ra_server_id(),
171171
meta :: snapshot_meta(),
172-
chunk_state = {0, pre} :: {pos_integer(), chunk_flag()},
172+
chunk_state = {0, pre} :: {non_neg_integer(), chunk_flag()},
173173
data :: term()
174174
}).
175175

src/ra_log.erl

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
next_index/1,
3434
snapshot_state/1,
3535
set_snapshot_state/2,
36-
install_snapshot/3,
36+
install_snapshot/4,
3737
recover_snapshot/1,
3838
snapshot_index_term/1,
3939
update_release_cursor/5,
@@ -60,7 +60,6 @@
6060
-define(MIN_CHECKPOINT_INTERVAL, 16384).
6161
-define(LOG_APPEND_TIMEOUT, 5000).
6262
-define(WAL_RESEND_TIMEOUT, 5000).
63-
-define(ETSTBL, ra_log_snapshot_state).
6463

6564
-type ra_meta_key() :: atom().
6665
-type segment_ref() :: {ra_range:range(), File :: file:filename_all()}.
@@ -926,9 +925,9 @@ snapshot_state(State) ->
926925
set_snapshot_state(SnapState, State) ->
927926
State#?MODULE{snapshot_state = SnapState}.
928927

929-
-spec install_snapshot(ra_idxterm(), module(), state()) ->
930-
{ra_snapshot:meta(), MacState :: term(), state(), effects()}.
931-
install_snapshot({SnapIdx, SnapTerm} = IdxTerm, MacMod,
928+
-spec install_snapshot(ra_idxterm(), module(), ra_seq:state(), state()) ->
929+
{ok, state(), effects()}.
930+
install_snapshot({SnapIdx, SnapTerm} = IdxTerm, MacMod, LiveIndexes,
932931
#?MODULE{cfg = #cfg{uid = UId,
933932
names = Names} = Cfg,
934933
snapshot_state = SnapState0,
@@ -951,30 +950,18 @@ install_snapshot({SnapIdx, SnapTerm} = IdxTerm, MacMod,
951950
last_index = SnapIdx,
952951
last_term = SnapTerm,
953952
last_written_index_term = IdxTerm},
954-
{Meta, MacState} = recover_snapshot(State),
955-
LiveIndexes = ra_machine:live_indexes(MacMod, MacState),
956-
%% TODO: it is not safe to write the indexes _after_ if we then treat
957-
%% the persisted indexes as authoritative as if we crash in between
958-
%% it may compact segments that still contain live indexes
959-
SmallestLiveIndex = ra_seq:first(LiveIndexes),
960-
SnapDir = ra_snapshot:current_snapshot_dir(SnapState),
961-
ok = ra_snapshot:write_indexes(SnapDir, LiveIndexes),
962-
%% TODO: more mt entries could potentially be cleared up here
953+
%% TODO: more mt entries could potentially be cleared up in the
954+
%% mem table here
955+
SmallestLiveIndex = case ra_seq:first(LiveIndexes) of
956+
undefined ->
957+
SnapIdx + 1;
958+
I ->
959+
I
960+
end,
963961
{Spec, Mt} = ra_mt:set_first(SmallestLiveIndex, Mt0),
964962
ok = exec_mem_table_delete(Names, UId, Spec),
965-
%% TODO: move this to install_snapshot so we can work out the
966-
%% live indexes
967-
SmallestIdx = case LiveIndexes of
968-
[] ->
969-
SnapIdx + 1;
970-
_ ->
971-
ra_seq:first(LiveIndexes)
972-
end,
973-
ok = ra_log_snapshot_state:insert(?ETSTBL, UId, SnapIdx, SmallestIdx,
974-
LiveIndexes),
975-
{Meta, MacState, State#?MODULE{live_indexes = LiveIndexes,
976-
mem_table = Mt
977-
},
963+
{ok, State#?MODULE{live_indexes = LiveIndexes,
964+
mem_table = Mt},
978965
CompEffs ++ CPEffects}.
979966

980967

src/ra_server.erl

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1516,9 +1516,10 @@ handle_follower(Msg, State) ->
15161516
handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15171517
meta = #{index := SnapIndex,
15181518
machine_version := SnapMacVer,
1519+
cluster := ClusterIds,
15191520
term := SnapTerm} = SnapMeta,
15201521
chunk_state = {Num, ChunkFlag},
1521-
data = Data},
1522+
data = ChunkOrEntries},
15221523
#{cfg := #cfg{id = Id,
15231524
log_id = LogId,
15241525
effective_machine_version = CurEffMacVer,
@@ -1536,7 +1537,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15361537
last_term = SnapTerm,
15371538
last_index = SnapIndex},
15381539
case ChunkFlag of
1539-
pre ->
1540+
pre when is_list(ChunkOrEntries) ->
15401541
%% TODO: we may need to reset the log here to
15411542
%% the last applied index as we
15421543
%% dont know for sure indexes after last applied
@@ -1546,22 +1547,20 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15461547
fun ({I, _, _} = E, {L0, LstIdx}) ->
15471548
{ok, L} = ra_log:write_sparse(E, LstIdx, L0),
15481549
{L, I}
1549-
end, {Log00, LastIndex}, Data),
1550+
end, {Log00, LastIndex}, ChunkOrEntries),
15501551
State = update_term(Term, State0#{log => Log0}),
15511552
{receive_snapshot, State, [{reply, Reply}]};
15521553
next ->
15531554
SnapState0 = ra_log:snapshot_state(Log00),
1554-
{ok, SnapState, Effs0} =
1555-
ra_snapshot:accept_chunk(Data, Num, ChunkFlag,
1556-
SnapState0),
1555+
SnapState = ra_snapshot:accept_chunk(ChunkOrEntries, Num, SnapState0),
15571556
Log0 = ra_log:set_snapshot_state(SnapState, Log00),
15581557
State = update_term(Term, State0#{log => Log0}),
1559-
{receive_snapshot, State, [{reply, Reply} | Effs0]};
1558+
{receive_snapshot, State, [{reply, Reply}]};
15601559
last ->
15611560
SnapState0 = ra_log:snapshot_state(Log00),
1562-
{ok, SnapState, Effs0} =
1563-
ra_snapshot:accept_chunk(Data, Num, ChunkFlag,
1564-
SnapState0),
1561+
{SnapState, MacState, LiveIndexes, Effs0} =
1562+
ra_snapshot:complete_accept(ChunkOrEntries, Num, Machine,
1563+
SnapState0),
15651564
Log0 = ra_log:set_snapshot_state(SnapState, Log00),
15661565
%% if the machine version of the snapshot is higher
15671566
%% we also need to update the current effective machine configuration
@@ -1579,9 +1578,9 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15791578
Cfg0
15801579
end,
15811580
%% this is the last chunk so we can "install" it
1582-
{#{cluster := ClusterIds},
1583-
MacState, Log, Effs} = ra_log:install_snapshot({SnapIndex, SnapTerm},
1584-
EffMacMod, Log0),
1581+
{ok, Log, Effs} = ra_log:install_snapshot({SnapIndex, SnapTerm},
1582+
EffMacMod,
1583+
LiveIndexes, Log0),
15851584
OldServerIds = maps:map(fun (_, V) ->
15861585
maps:with([voter_status], V)
15871586
end, Cluster),

src/ra_server_proc.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1912,6 +1912,8 @@ send_snapshots(Id, Term, {_, ToNode} = To, ChunkSize,
19121912
ra_snapshot:begin_read(SnapState, Context),
19131913

19141914
%% only send the snapshot if the target server can accept it
1915+
%% TODO: grab the last_applied index also and use this to floor
1916+
%% the live indexes
19151917
TheirMacVer = erpc:call(ToNode, ra_machine, version, [Machine]),
19161918

19171919
%% rpc the check what their

src/ra_snapshot.erl

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
-include("ra.hrl").
1010

11+
-include_lib("stdlib/include/assert.hrl").
1112
-type file_err() :: ra_lib:file_err().
1213

1314
%% alias
@@ -33,7 +34,8 @@
3334
complete_snapshot/4,
3435

3536
begin_accept/2,
36-
accept_chunk/4,
37+
accept_chunk/3,
38+
complete_accept/4,
3739
abort_accept/1,
3840

3941
context/2,
@@ -70,6 +72,7 @@
7072
-record(accept, {%% the next expected chunk
7173
next = 1 :: non_neg_integer(),
7274
state :: term(),
75+
machine_version :: non_neg_integer(),
7376
idxterm :: ra_idxterm()}).
7477

7578
-record(?MODULE,
@@ -503,26 +506,30 @@ complete_snapshot(IdxTerm, checkpoint, _LiveIndexes,
503506

504507
-spec begin_accept(meta(), state()) ->
505508
{ok, state()}.
506-
begin_accept(#{index := Idx, term := Term} = Meta,
509+
begin_accept(#{index := Idx,
510+
machine_version := SnapMacVer,
511+
term := Term} = Meta,
507512
#?MODULE{module = Mod,
508513
snapshot_directory = Dir} = State) ->
509514
SnapDir = make_snapshot_dir(Dir, Idx, Term),
510515
ok = ra_lib:make_dir(SnapDir),
511516
{ok, AcceptState} = Mod:begin_accept(SnapDir, Meta),
512517
{ok, State#?MODULE{accepting = #accept{idxterm = {Idx, Term},
518+
machine_version = SnapMacVer,
513519
state = AcceptState}}}.
514520

515-
-spec accept_chunk(term(), non_neg_integer(), chunk_flag(), state()) ->
516-
{ok, state(), [effect()]}.
517-
accept_chunk(Chunk, Num, last,
518-
#?MODULE{uid = _UId,
519-
module = Mod,
520-
snapshot_directory = Dir,
521-
current = Current,
522-
pending = Pending,
523-
accepting = #accept{next = Num,
524-
idxterm = {_Idx, _} = IdxTerm,
525-
state = AccState}} = State) ->
521+
-spec complete_accept(Chunk :: term(), Num :: non_neg_integer(),
522+
Machine :: ra_machine:machine(), state()) ->
523+
{state(), MacState :: term(), ra_seq:state(), [effect()]}.
524+
complete_accept(Chunk, Num, Machine,
525+
#?MODULE{uid = UId,
526+
module = Mod,
527+
snapshot_directory = Dir,
528+
current = Current,
529+
pending = Pending,
530+
accepting = #accept{next = Num,
531+
idxterm = {Idx, Term} = IdxTerm,
532+
state = AccState}} = State0) ->
526533
%% last chunk
527534
ok = Mod:complete_accept(Chunk, AccState),
528535
%% run validate here?
@@ -536,25 +543,44 @@ accept_chunk(Chunk, Num, last,
536543
Eff = {bg_work,
537544
fun() -> [delete(Dir, Del) || Del <- Dels] end,
538545
fun (_) -> ok end},
546+
State = State0#?MODULE{accepting = undefined,
547+
%% reset any pending snapshot writes
548+
pending = undefined,
549+
current = IdxTerm},
550+
{ok, #{machine_version := SnapMacVer}, MacState} = recover(State),
551+
SnapMacMod = ra_machine:which_module(Machine, SnapMacVer),
552+
LiveIndexes = ra_machine:live_indexes(SnapMacMod, MacState),
553+
SnapDir = make_snapshot_dir(Dir, Idx, Term),
554+
ok = write_indexes(SnapDir, LiveIndexes),
555+
%% delete accepting marker file
556+
AcceptMarker = filename:join(SnapDir, <<"accepting">>),
557+
_ = prim_file:delete(AcceptMarker),
558+
%% assert accepting marker is no longer there
559+
?assertNot(filelib:is_file(AcceptMarker)),
560+
SmallestIdx = case ra_seq:first(LiveIndexes) of
561+
undefined ->
562+
Idx + 1;
563+
I ->
564+
I
565+
end,
566+
ok = ra_log_snapshot_state:insert(?ETSTBL, UId, Idx, SmallestIdx,
567+
LiveIndexes),
568+
{State, MacState, LiveIndexes, [Eff]}.
539569

540-
{ok, State#?MODULE{accepting = undefined,
541-
%% reset any pending snapshot writes
542-
pending = undefined,
543-
current = IdxTerm}, [Eff]};
544-
accept_chunk(Chunk, Num, next,
545-
#?MODULE{module = Mod,
546-
accepting =
547-
#accept{state = AccState0,
548-
next = Num} = Accept} = State) ->
570+
-spec accept_chunk(Chunk :: term(), Num :: non_neg_integer(), state()) ->
571+
state().
572+
accept_chunk(Chunk, Num, #?MODULE{module = Mod,
573+
accepting =
574+
#accept{state = AccState0,
575+
next = Num} = Accept} = State) ->
549576
{ok, AccState} = Mod:accept_chunk(Chunk, AccState0),
550-
{ok, State#?MODULE{accepting = Accept#accept{state = AccState,
551-
next = Num + 1}},
552-
[]};
553-
accept_chunk(_Chunk, Num, _ChunkFlag,
577+
State#?MODULE{accepting = Accept#accept{state = AccState,
578+
next = Num + 1}};
579+
accept_chunk(_Chunk, Num,
554580
#?MODULE{accepting = #accept{next = Next}} = State)
555581
when Next > Num ->
556582
%% this must be a resend - we can just ignore it
557-
{ok, State, []}.
583+
State.
558584

559585
-spec abort_accept(state()) -> state().
560586
abort_accept(#?MODULE{accepting = undefined} = State) ->

test/ra_kv_SUITE.erl

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -73,40 +73,56 @@ basics(_Config) ->
7373
?assertMatch({ok, #{machine := #{num_keys := 2}}, KvId},
7474
ra:member_overview(KvId)),
7575
ra_log_wal:force_roll_over(ra_log_wal),
76-
timer:sleep(1000),
76+
%% wait for rollover processing
77+
ra_log_wal:last_writer_seq(ra_log_wal, <<>>),
78+
%% wait for segment writer to process
79+
ra_log_segment_writer:await(ra_log_segment_writer),
80+
%% promt ra_kv to take a snapshot
7781
ok = ra:aux_command(KvId, take_snapshot),
78-
timer:sleep(1000),
82+
%% wait for snapshot to complete
83+
ok = ra_lib:retry(
84+
fun () ->
85+
{ok, #{log := #{snapshot_index := SnapIdx,
86+
num_segments := NumSegments,
87+
last_index := LastIdx}}, _} =
88+
ra:member_overview(KvId),
89+
SnapIdx == LastIdx andalso NumSegments == 2
90+
end, 100, 100),
91+
%% restart server to test recovery
7992
ok = ra:stop_server(default, KvId),
8093
ok = ra:restart_server(default, KvId),
8194
{ok, #{index := LastIdx}} = ra_kv:put(KvId, <<"k3">>, <<"k3">>, 5000),
82-
ct:pal("overview after ~p", [ra:member_overview(KvId)]),
83-
{ok, #{machine := #{live_indexes := Live}}, _} = ra:member_overview(KvId),
95+
{ok, #{machine := #{live_indexes := Live},
96+
log := #{last_index := KvIdLastIdx}}, _} = ra:member_overview(KvId),
8497
{ok, {Reads, _}} = ra_server_proc:read_entries(KvId, [LastIdx | Live],
8598
undefined, 1000),
86-
ct:pal("ReadRes ~p", [Reads]),
87-
88-
% debugger:start(),
89-
% int:i(ra_log),
90-
% int:i(ra_snapshot),
91-
% int:i(ra_server_proc),
92-
% int:break(ra_server_proc, 1922),
93-
% int:break(ra_log, 873),
94-
% int:break(ra_log, 1002),
95-
% int:break(ra_log, 1328),
99+
?assertEqual(3, map_size(Reads)),
100+
% ct:pal("ReadRes ~p", [Reads]),
96101
KvId2 = {kv2, node()},
97102
ok = ra_kv:add_member(?SYS, KvId2, KvId),
98-
timer:sleep(1000),
103+
ok = ra_lib:retry(
104+
fun () ->
105+
{ok, #{log := #{last_index := Last}}, _} =
106+
ra:member_overview(KvId2),
107+
Last >= KvIdLastIdx
108+
end, 100, 100),
99109
{ok, {Reads2, _}} = ra_server_proc:read_entries(KvId2, [LastIdx | Live],
100110
undefined, 1000),
101-
ct:pal("ReadRes2 ~p", [Reads2]),
102-
ct:pal("overview ~p", [ra:member_overview(KvId2)]),
103111
?assertEqual(3, map_size(Reads2)),
104112
ra_log_wal:force_roll_over(ra_log_wal),
105-
timer:sleep(1000),
113+
ra_log_wal:last_writer_seq(ra_log_wal, <<>>),
114+
ra_log_segment_writer:await(ra_log_segment_writer),
106115
{ok, {Reads3, _}} = ra_server_proc:read_entries(KvId2, [LastIdx | Live],
107116
undefined, 1000),
108117
ct:pal("ReadRes3 ~p", [Reads3]),
109-
ct:pal("overview3 ~p", [ra:member_overview(KvId2)]),
118+
% ct:pal("overview3 ~p", [ra:member_overview(KvId2)]),
110119
?assertEqual(3, map_size(Reads3)),
111120

121+
%% TODO: test recovery of kv
122+
ok = ra:stop_server(default, KvId2),
123+
ok = ra:restart_server(default, KvId2),
124+
{ok, {Reads4, _}} = ra_server_proc:read_entries(KvId2, [LastIdx | Live],
125+
undefined, 1000),
126+
127+
?assertEqual(3, map_size(Reads4)),
112128
ok.

0 commit comments

Comments
 (0)