Skip to content

Commit c9b2ccd

Browse files
committed
Sparse write fixes
1 parent 5f09e81 commit c9b2ccd

File tree

5 files changed

+166
-55
lines changed

5 files changed

+166
-55
lines changed

src/ra_log.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,8 @@ write_sparse({Idx, Term, _} = Entry, PrevIdx0,
506506
Tid = ra_mt:tid(Mt),
507507
PrevIdx = case PrevIdx0 of
508508
undefined ->
509-
Idx - 1;
509+
%% this is likely to always be accepted
510+
0;
510511
_ ->
511512
PrevIdx0
512513
end,
@@ -707,6 +708,7 @@ last_written(#?MODULE{last_written_index_term = LWTI}) ->
707708
set_last_index(Idx, #?MODULE{cfg = Cfg,
708709
range = Range,
709710
snapshot_state = SnapState,
711+
mem_table = Mt0,
710712
last_written_index_term = {LWIdx0, _}} = State0) ->
711713
Cur = ra_snapshot:current(SnapState),
712714
case fetch_term(Idx, State0) of
@@ -716,10 +718,13 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
716718
{_, State} when element(1, Cur) =:= Idx ->
717719
{_, SnapTerm} = Cur,
718720
%% Idx is equal to the current snapshot
721+
{ok, Mt} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
722+
Cfg#cfg.uid, Mt0),
719723
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
720724
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx),
721725
{ok, State#?MODULE{range = ra_range:limit(Idx + 1, Range),
722726
last_term = SnapTerm,
727+
mem_table = Mt,
723728
last_written_index_term = Cur}};
724729
{Term, State1} ->
725730
LWIdx = min(Idx, LWIdx0),
@@ -729,10 +734,13 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
729734
%% to write to the mem table it will detect this and open
730735
%% a new one
731736
true = LWTerm =/= undefined,
737+
{ok, Mt} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
738+
Cfg#cfg.uid, Mt0),
732739
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
733740
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx),
734741
{ok, State2#?MODULE{range = ra_range:limit(Idx + 1, Range),
735742
last_term = Term,
743+
mem_table = Mt,
736744
last_written_index_term = {LWIdx, LWTerm}}}
737745
end.
738746

src/ra_mt.erl

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ insert({Idx, _, _} = _Entry,
124124
overwriting |
125125
gap_detected |
126126
limit_reached}.
127+
insert_sparse({Idx, _, _} = Entry, _LastIdx,
128+
#?MODULE{tid = Tid,
129+
indexes = []} = State) ->
130+
%% when the indexes is empty always accept the next entry
131+
true = ets:insert(Tid, Entry),
132+
{ok, State#?MODULE{indexes = ra_seq:append(Idx, [])}};
127133
insert_sparse({Idx, _, _} = Entry, LastIdx,
128134
#?MODULE{tid = Tid,
129135
indexes = Seq} = State) ->
@@ -195,23 +201,6 @@ commit(#?MODULE{tid = Tid,
195201
{PrevStaged ++ Staged, State#?MODULE{staged = undefined,
196202
prev = Prev}}.
197203

198-
% -spec abort(state()) -> state().
199-
% abort(#?MODULE{staged = undefined} = State) ->
200-
% State;
201-
% abort(#?MODULE{staged = {_, Staged},
202-
% indexes = Range,
203-
% prev = Prev0} = State) ->
204-
% Prev = case Prev0 of
205-
% undefined ->
206-
% Prev0;
207-
% _ ->
208-
% abort(Prev0)
209-
% end,
210-
% {Idx, _, _} = lists:last(Staged),
211-
% State#?MODULE{staged = undefined,
212-
% indexes = ra_range:limit(Idx, Range),
213-
% prev = Prev}.
214-
215204
-spec lookup(ra:index(), state()) ->
216205
log_entry() | undefined.
217206
lookup(Idx, #?MODULE{staged = {FstStagedIdx, Staged}})
@@ -330,8 +319,9 @@ delete({Op, Tid, Idx})
330319
DelSpec = [{{'$1', '_', '_'}, [{'<', '$1', Idx}], [true]}],
331320
ets:select_delete(Tid, DelSpec);
332321
delete({delete, Tid}) ->
322+
Sz= ets:info(Tid, size),
333323
true = ets:delete(Tid),
334-
0.
324+
Sz.
335325

336326
-spec range_overlap(ra:range(), state()) ->
337327
{Overlap :: ra:range(), Remainder :: ra:range()}.
@@ -354,14 +344,18 @@ range_overlap(ReqRange, #?MODULE{} = State) ->
354344
range(#?MODULE{indexes = Seq,
355345
prev = undefined}) ->
356346
ra_seq:range(Seq);
357-
range(#?MODULE{indexes = []}) ->
358-
undefined;
347+
range(#?MODULE{indexes = [],
348+
prev = Prev}) ->
349+
range(Prev);
359350
range(#?MODULE{indexes = Seq,
360351
prev = Prev}) ->
361-
End = ra_seq:last(Seq),
362-
Range = ra_seq:range(Seq),
363-
PrevRange = ra_range:limit(End, range(Prev)),
364-
ra_range:add(Range, PrevRange);
352+
{Start, End} = Range = ra_seq:range(Seq),
353+
case ra_range:limit(End, range(Prev)) of
354+
undefined ->
355+
Range;
356+
{PrevStart, _PrevEnd} ->
357+
ra_range:new(min(Start, PrevStart), End)
358+
end;
365359
range(_State) ->
366360
undefined.
367361

@@ -439,8 +433,12 @@ set_first(Idx, #?MODULE{tid = Tid,
439433
%% set_first/2 returned a range spec for
440434
%% prev and prev is now empty,
441435
%% upgrade to delete spec of whole tid
442-
case range(P) of
443-
undefined ->
436+
%% also upgrade if the outer seq is truncated
437+
%% by the set_first operation
438+
% case range_shallow(P) of
439+
case Idx >= ra_seq:first(Seq) orelse
440+
range_shallow(P) == undefined of
441+
true ->
444442
{[{delete, tid(P)} | Rem],
445443
prev(P)};
446444
_ ->
@@ -485,3 +483,5 @@ read_sparse([Next | Rem] = Indexes, State, Num, Acc) ->
485483
read_sparse(Rem, State, Num + 1, [Entry | Acc])
486484
end.
487485

486+
range_shallow(#?MODULE{indexes = Seq}) ->
487+
ra_seq:range(Seq).

src/ra_server.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,12 +1551,12 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15511551
%% reset last index to last applied
15521552
%% as we dont know for sure indexes after last applied
15531553
%% are of the right term
1554-
{LastIndex, _} = ra_log:last_index_term(Log00),
1554+
{LastIdx, _} = ra_log:last_index_term(Log00),
15551555
{Log, _} = lists:foldl(
15561556
fun ({I, _, _} = E, {L0, LstIdx}) ->
15571557
{ok, L} = ra_log:write_sparse(E, LstIdx, L0),
15581558
{L, I}
1559-
end, {Log00, LastIndex}, ChunkOrEntries),
1559+
end, {Log00, LastIdx}, ChunkOrEntries),
15601560
?DEBUG("~ts: receiving snapshot log last index ~p",
15611561
[LogId, ra_log:last_index_term(Log)]),
15621562
State = update_term(Term, State0#{log => Log}),

test/ra_log_2_SUITE.erl

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ all_tests() ->
3333
last_written_overwrite,
3434
last_written_overwrite_2,
3535
last_index_reset,
36+
write_sparse_after_index_reset,
37+
write_sparse_after_index_reset_segments,
3638
last_index_reset_before_written,
3739
recovery,
3840
recover_many,
@@ -721,19 +723,97 @@ last_written_overwrite_2(Config) ->
721723

722724
last_index_reset(Config) ->
723725
Log0 = ra_log_init(Config),
724-
Log1 = write_n(1, 5, 1, Log0),
726+
Log1 = write_n(1, 6, 1, Log0),
727+
Pred = fun (L) ->
728+
{5, 1} == ra_log:last_written(L)
729+
end,
730+
Log2 = assert_log_events(Log1, Pred, 2000),
731+
6 = ra_log:next_index(Log2),
732+
{5, 1} = ra_log:last_index_term(Log2),
733+
% reverts last index to a previous index
734+
% needs to be done if a new leader sends an empty AER
735+
{ok, Log3} = ra_log:set_last_index(3, Log2),
736+
{3, 1} = ra_log:last_written(Log3),
737+
4 = ra_log:next_index(Log3),
738+
{3, 1} = ra_log:last_index_term(Log3),
739+
O = ra_log:overview(Log3),
740+
ct:pal("o ~p", [O]),
741+
?assertMatch(#{range := {0, 3},
742+
%% we have a new mem table but the mem table does not know
743+
%% whatever the first index should be so reports the
744+
%% full previous range, this will be corrected after the
745+
%% next write at index 4
746+
mem_table_range := {0, 5}},
747+
O),
748+
{ok, Log} = ra_log:write([{4, 2, hi}], Log3),
749+
O2 = ra_log:overview(Log),
750+
ct:pal("o ~p", [O2]),
751+
?assertMatch(#{range := {0, 4},
752+
mem_table_range := {0, 4}},
753+
O2),
754+
ok.
755+
756+
write_sparse_after_index_reset(Config) ->
757+
Log0 = ra_log_init(Config),
758+
Log1 = write_n(1, 6, 1, Log0),
725759
Pred = fun (L) ->
726-
{4, 1} == ra_log:last_written(L)
760+
{5, 1} == ra_log:last_written(L)
727761
end,
728762
Log2 = assert_log_events(Log1, Pred, 2000),
729-
5 = ra_log:next_index(Log2),
730-
{4, 1} = ra_log:last_index_term(Log2),
763+
6 = ra_log:next_index(Log2),
764+
{5, 1} = ra_log:last_index_term(Log2),
731765
% reverts last index to a previous index
732766
% needs to be done if a new leader sends an empty AER
733767
{ok, Log3} = ra_log:set_last_index(3, Log2),
734768
{3, 1} = ra_log:last_written(Log3),
735769
4 = ra_log:next_index(Log3),
736770
{3, 1} = ra_log:last_index_term(Log3),
771+
O = ra_log:overview(Log3),
772+
ct:pal("o ~p", [O]),
773+
{ok, Log4} = ra_log:write_sparse({7, 1, hi}, undefined, Log3),
774+
{ok, Log} = ra_log:write_sparse({17, 1, hi}, 7, Log4),
775+
O2 = ra_log:overview(Log),
776+
ct:pal("o ~p", [O2]),
777+
?assertMatch(#{range := {0, 17},
778+
mem_table_range := {0, 17}},
779+
O2),
780+
ok.
781+
782+
write_sparse_after_index_reset_segments(Config) ->
783+
Log0 = ra_log_init(Config),
784+
Log1 = write_n(1, 6, 1, Log0),
785+
Pred = fun (L) ->
786+
{5, 1} == ra_log:last_written(L)
787+
end,
788+
Log2 = assert_log_events(Log1, Pred, 2000),
789+
6 = ra_log:next_index(Log2),
790+
{5, 1} = ra_log:last_index_term(Log2),
791+
ra_log_wal:force_roll_over(ra_log_wal),
792+
Log2b = deliver_all_log_events(Log2, 500),
793+
% reverts last index to a previous index
794+
% needs to be done if a new leader sends an empty AER
795+
{ok, Log3} = ra_log:set_last_index(3, Log2b),
796+
{3, 1} = ra_log:last_written(Log3),
797+
4 = ra_log:next_index(Log3),
798+
{3, 1} = ra_log:last_index_term(Log3),
799+
O = ra_log:overview(Log3),
800+
ct:pal("o ~p", [O]),
801+
{ok, Log4} = ra_log:write_sparse({7, 1, hi}, undefined, Log3),
802+
{ok, Log5} = ra_log:write_sparse({17, 1, hi}, 7, Log4),
803+
Log5b = deliver_all_log_events(Log5, 500),
804+
O2 = ra_log:overview(Log5b),
805+
?assertMatch(#{range := {0, 17},
806+
mem_table_range := {7, 17}},
807+
O2),
808+
809+
%% try overwrite again
810+
{ok, Log6} = ra_log:set_last_index(3, Log5b),
811+
{3, 1} = ra_log:last_index_term(Log6),
812+
{ok, Log7} = ra_log:write_sparse({7, 1, hi}, undefined, Log6),
813+
{ok, Log8} = ra_log:write_sparse({17, 1, hi}, 7, Log7),
814+
Log = deliver_all_log_events(Log8, 500),
815+
O5 = ra_log:overview(Log),
816+
ct:pal("o ~p", [O5]),
737817
ok.
738818

739819
last_index_reset_before_written(Config) ->

0 commit comments

Comments
 (0)