Skip to content

Commit 9ca16bb

Browse files
committed
Extend ra_log_snapshot_state to support live indexes.
Instead of working off the snapshot index to reduce writes to the WAL and segments, there is a new "smallest index" that needs to be populated. For normal snapshotting machines this will be snapshot index + 1, but when a machine reports live indexes it will be the smallest of those.
1 parent a2364fc commit 9ca16bb

File tree

10 files changed

+150
-80
lines changed

10 files changed

+150
-80
lines changed

src/ra_directory.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,10 @@ overview(System) when is_atom(System) ->
189189
end,
190190
ets:tab2list(ra_state)),
191191
States = maps:from_list(Rows),
192-
Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)),
192+
Snaps = lists:foldl(
193+
fun (T, Acc) ->
194+
Acc#{element(1, T) => erlang:delete_element(1, T)}
195+
end, #{}, ets:tab2list(ra_log_snapshot_state)),
193196
lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) ->
194197
{S, V} = maps:get(ServerName, States, {undefined, undefined}),
195198
Acc#{ServerName =>

src/ra_log.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,7 @@ delete_everything(#?MODULE{cfg = #cfg{uid = UId,
10981098
%% if there is a snapshot process pending it could cause the directory
10991099
%% deletion to fail, best kill the snapshot process first
11001100
ok = ra_log_ets:delete_mem_tables(Names, UId),
1101-
catch ets:delete(ra_log_snapshot_state, UId),
1101+
catch ra_log_snapshot_state:delete(ra_log_snapshot_state, UId),
11021102
try ra_lib:recursive_delete(Dir) of
11031103
ok -> ok
11041104
catch

src/ra_log_segment_writer.erl

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ get_overview(#state{data_dir = Dir,
263263

264264
flush_mem_table_ranges({ServerUId, TidRanges0},
265265
#state{system = System} = State) ->
266-
SnapIdx = snap_idx(ServerUId),
266+
SmallestIdx = smallest_live_idx(ServerUId),
267267
%% TidRanges arrive here sorted new -> old.
268268

269269
%% truncate and limit all ranges to create a contiguous non-overlapping
@@ -272,14 +272,14 @@ flush_mem_table_ranges({ServerUId, TidRanges0},
272272
%% processing
273273
TidRanges = lists:foldl(
274274
fun ({T, Range0}, []) ->
275-
case ra_range:truncate(SnapIdx, Range0) of
275+
case ra_range:truncate(SmallestIdx - 1, Range0) of
276276
undefined ->
277277
[];
278278
Range ->
279279
[{T, Range}]
280280
end;
281281
({T, Range0}, [{_T, {Start, _}} | _] = Acc) ->
282-
Range1 = ra_range:truncate(SnapIdx, Range0),
282+
Range1 = ra_range:truncate(SmallestIdx - 1, Range0),
283283
case ra_range:limit(Start, Range1) of
284284
undefined ->
285285
Acc;
@@ -352,15 +352,10 @@ flush_mem_table_range(ServerUId, {Tid, {StartIdx0, EndIdx}},
352352
end.
353353

354354
start_index(ServerUId, StartIdx0) ->
355-
max(snap_idx(ServerUId) + 1, StartIdx0).
355+
max(smallest_live_idx(ServerUId), StartIdx0).
356356

357-
snap_idx(ServerUId) ->
358-
case ets:lookup(ra_log_snapshot_state, ServerUId) of
359-
[{_, SnapIdx}] ->
360-
SnapIdx;
361-
_ ->
362-
-1
363-
end.
357+
smallest_live_idx(ServerUId) ->
358+
ra_log_snapshot_state:smallest(ra_log_snapshot_state, ServerUId).
364359

365360
send_segments(System, ServerUId, TidRanges, SegRefs) ->
366361
case ra_directory:pid_of(System, ServerUId) of

src/ra_log_snapshot_state.erl

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
-module(ra_log_snapshot_state).
2+
3+
-export([
4+
insert/5,
5+
delete/2,
6+
smallest/2,
7+
snapshot/2
8+
]).
9+
10+
%% Writes the snapshot state row for `UId' into `Table'.
%% The stored tuple is `{UId, SnapIdx, SmallestIdx, LiveIndexes}', so the
%% snapshot index lives at position 2 and the smallest index at position 3.
%% Only a binary uid, integer indexes and a list of live indexes match the
%% guard; any other argument shape fails with `function_clause'.
-spec insert(ets:table(), ra:uid(), -1 | ra:index(), ra:index(), ra_seq:state()) ->
    ok.
insert(Table, UId, SnapIdx, SmallestIdx, LiveIndexes)
  when is_binary(UId),
       is_integer(SnapIdx),
       is_integer(SmallestIdx),
       is_list(LiveIndexes) ->
    Row = {UId, SnapIdx, SmallestIdx, LiveIndexes},
    true = ets:insert(Table, Row),
    ok.
19+
20+
%% Removes whatever is stored under `UId' in `Table'.
%% Returns `ok' both when a row was removed and when none existed.
-spec delete(ets:table(), ra:uid()) -> ok.
delete(Table, UId) ->
    %% ets:delete/2 returns true even if the key is absent, so this match
    %% only fails (badarg) when the table itself is not accessible
    true = ets:delete(Table, UId),
    ok.
23+
24+
%% Reads the smallest index recorded for `UId': the third element of its
%% row in `Table'. When `Table' holds no row for `UId' the result is 0.
-spec smallest(ets:table(), ra:uid()) ->
    ra:index().
smallest(Table, UId) when is_binary(UId) ->
    SmallestPos = 3,
    WhenMissing = 0,
    ets:lookup_element(Table, UId, SmallestPos, WhenMissing).
28+
29+
%% Reads the snapshot index recorded for `UId': the second element of its
%% row in `Table'. When `Table' holds no row for `UId' the result is -1.
-spec snapshot(ets:table(), ra:uid()) ->
    ra:index() | -1.
snapshot(Table, UId) when is_binary(UId) ->
    SnapshotPos = 2,
    WhenMissing = -1,
    ets:lookup_element(Table, UId, SnapshotPos, WhenMissing).
33+
34+
%%% ===================
35+
%%% Internal unit tests
36+
%%% ===================
37+
38+
-ifdef(TEST).
39+
-include_lib("eunit/include/eunit.hrl").
40+
41+
%% Exercises the whole module API against a private, unnamed table:
%% defaults for an unknown uid, values read back after insert, a re-insert
%% with a different smallest index, and the defaults again after delete.
basics_test() ->
    UId = atom_to_binary(?FUNCTION_NAME, utf8),
    T = ets:new(?FUNCTION_NAME, [set]),
    %% nothing stored yet: smallest/2 falls back to 0, snapshot/2 to -1
    ?assertEqual(0, smallest(T, UId)),
    ?assertEqual(-1, snapshot(T, UId)),
    ok = insert(T, UId, 50, 51, []),
    ?assertEqual(51, smallest(T, UId)),
    ?assertEqual(50, snapshot(T, UId)),
    %% the smallest index is stored as given by the caller, so it may be
    %% lower than the snapshot index when live indexes are reported
    ok = insert(T, UId, 50, 45, [49, 45]),
    ?assertEqual(45, smallest(T, UId)),
    ?assertEqual(50, snapshot(T, UId)),
    %% deleting the row brings both lookups back to their defaults
    ok = delete(T, UId),
    ?assertEqual(0, smallest(T, UId)),
    ?assertEqual(-1, snapshot(T, UId)),
    %% do not leave the table behind for later tests in the same process
    true = ets:delete(T),
    ok.
49+
50+
-endif.

src/ra_log_wal.erl

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
% tables and segment notification
5555
-type writer_id() :: {ra_uid(), pid()}.
5656

57-
-record(batch_writer, {snap_idx :: ra_index(),
57+
-record(batch_writer, {smallest_live_idx :: ra_index(),
5858
tid :: ets:tid(),
5959
uid :: term(),
6060
range :: ra:range(),
@@ -140,7 +140,7 @@
140140
-export_type([wal_conf/0]).
141141

142142
-type wal_command() ::
143-
{append | truncate, writer_id(), ra_index(), ra_term(), term()}.
143+
{append, writer_id(), ra_index(), ra_term(), term()}.
144144

145145
-type wal_op() :: {cast, wal_command()} |
146146
{call, from(), wal_command()}.
@@ -428,7 +428,7 @@ serialize_header(UId, Trunc, {Next, Cache} = WriterCache) ->
428428
{Next + 1, Cache#{UId => BinId}}}
429429
end.
430430

431-
write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SnapIdx,
431+
write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SmallestIndex,
432432
#state{conf = #conf{counter = Counter,
433433
compute_checksums = ComputeChecksum} = _Cfg,
434434
batch = Batch0,
@@ -441,7 +441,7 @@ write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SnapIdx,
441441
case should_roll_wal(State0) of
442442
true ->
443443
State = complete_batch_and_roll(State0),
444-
write_data(Id, MtTid, Idx, Term, Data0, Trunc, SnapIdx, State);
444+
write_data(Id, MtTid, Idx, Term, Data0, Trunc, SmallestIndex, State);
445445
false ->
446446
EntryData = case Data0 of
447447
{ttb, Bin} ->
@@ -466,7 +466,7 @@ write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SnapIdx,
466466
<<Checksum:32/integer, EntryDataLen:32/unsigned>> |
467467
Entry],
468468
Batch = incr_batch(Batch0, UId, Pid, MtTid,
469-
Idx, Term, Record, SnapIdx),
469+
Idx, Term, Record, SmallestIndex),
470470
counters:add(Counter, ?C_BYTES_WRITTEN, DataSize),
471471
State0#state{batch = Batch,
472472
wal = Wal#wal{writer_name_cache = Cache,
@@ -479,19 +479,22 @@ write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SnapIdx,
479479
handle_msg({append, {UId, Pid} = Id, MtTid, Idx, Term, Entry},
480480
#state{conf = Conf,
481481
writers = Writers} = State0) ->
482-
SnapIdx = snap_idx(Conf, UId),
482+
SmallestIdx = smallest_live_index(Conf, UId),
483483
%% detect if truncating flag should be set
484-
Trunc = Idx == SnapIdx + 1,
484+
Trunc = Idx == SmallestIdx,
485+
485486
case maps:find(UId, Writers) of
486-
_ when Idx =< SnapIdx ->
487-
%% a snapshot already exists that is higher - just drop the write
488-
State0#state{writers = Writers#{UId => {in_seq, SnapIdx}}};
487+
_ when Idx < SmallestIdx ->
488+
%% the smallest live index for the last snapshot is higher than
489+
%% this index, just drop it
490+
PrevIdx = SmallestIdx - 1,
491+
State0#state{writers = Writers#{UId => {in_seq, PrevIdx}}};
489492
{ok, {_, PrevIdx}}
490493
when Idx =< PrevIdx + 1 orelse
491494
Trunc ->
492-
write_data(Id, MtTid, Idx, Term, Entry, Trunc, SnapIdx, State0);
495+
write_data(Id, MtTid, Idx, Term, Entry, Trunc, SmallestIdx, State0);
493496
error ->
494-
write_data(Id, MtTid, Idx, Term, Entry, false, SnapIdx, State0);
497+
write_data(Id, MtTid, Idx, Term, Entry, false, SmallestIdx, State0);
495498
{ok, {out_of_seq, _}} ->
496499
% writer is out of seq, simply drop the write
497500
% TODO: capture metric for dropped writes?
@@ -505,10 +508,6 @@ handle_msg({append, {UId, Pid} = Id, MtTid, Idx, Term, Entry},
505508
Pid ! {ra_log_event, {resend_write, PrevIdx + 1}},
506509
State0#state{writers = Writers#{UId => {out_of_seq, PrevIdx}}}
507510
end;
508-
handle_msg({truncate, Id, MtTid, Idx, Term, Entry},
509-
#state{conf = Conf} = State0) ->
510-
SnapIdx = snap_idx(Conf, Id),
511-
write_data(Id, MtTid, Idx, Term, Entry, true, SnapIdx, State0);
512511
handle_msg({query, Fun}, State) ->
513512
%% for testing
514513
_ = catch Fun(State),
@@ -520,23 +519,24 @@ incr_batch(#batch{num_writes = Writes,
520519
waiting = Waiting0,
521520
pending = Pend} = Batch,
522521
UId, Pid, MT_TID = MtTid,
523-
Idx, TERM = Term, Data, SnapIdx) ->
522+
Idx, TERM = Term, Data, SmallestIdx) ->
524523
Waiting = case Waiting0 of
525524
#{Pid := #batch_writer{term = TERM,
526525
tid = MT_TID,
527526
range = Range0
528527
} = W} ->
529528
%% The Tid and term is the same so add to current batch_writer
530-
Range = ra_range:extend(Idx, ra_range:truncate(SnapIdx, Range0)),
529+
Range = ra_range:extend(Idx, ra_range:truncate(SmallestIdx - 1,
530+
Range0)),
531531
Waiting0#{Pid => W#batch_writer{range = Range,
532-
snap_idx = SnapIdx,
532+
smallest_live_idx = SmallestIdx,
533533
term = Term
534534
}};
535535
_ ->
536536
%% The tid is different, open a new batch writer for the
537537
%% new tid and term
538538
PrevBatchWriter = maps:get(Pid, Waiting0, undefined),
539-
Writer = #batch_writer{snap_idx = SnapIdx,
539+
Writer = #batch_writer{smallest_live_idx = SmallestIdx,
540540
tid = MtTid,
541541
range = ra_range:new(Idx),
542542
uid = UId,
@@ -675,14 +675,14 @@ complete_batch(#state{batch = #batch{waiting = Waiting,
675675
end, Wal#wal.ranges, Waiting),
676676
State#state{wal = Wal#wal{ranges = Ranges}}.
677677

678-
complete_batch_writer(Pid, #batch_writer{snap_idx = SnapIdx,
678+
complete_batch_writer(Pid, #batch_writer{smallest_live_idx = SmallestIdx,
679679
tid = MtTid,
680680
uid = UId,
681681
range = Range,
682682
term = Term,
683683
old = undefined}, Ranges) ->
684684
Pid ! {ra_log_event, {written, Term, Range}},
685-
update_ranges(Ranges, UId, MtTid, SnapIdx, Range);
685+
update_ranges(Ranges, UId, MtTid, SmallestIdx, Range);
686686
complete_batch_writer(Pid, #batch_writer{old = #batch_writer{} = OldBw} = Bw,
687687
Ranges0) ->
688688
Ranges = complete_batch_writer(Pid, OldBw, Ranges0),
@@ -778,13 +778,13 @@ recover_records(#conf{names = Names} = Conf, Fd,
778778
case ra_directory:is_registered_uid(Names, UId) of
779779
true ->
780780
Cache = Cache0#{IdRef => {UId, <<1:1/unsigned, IdRef:22/unsigned>>}},
781-
SnapIdx = recover_snap_idx(Conf, UId, Trunc == 1, Idx),
781+
SmallestIdx = recover_smallest_idx(Conf, UId, Trunc == 1, Idx),
782782
case validate_checksum(Checksum, Idx, Term, EntryData) of
783-
ok when Idx > SnapIdx ->
783+
ok when Idx >= SmallestIdx ->
784784
State1 = handle_trunc(Trunc == 1, UId, Idx, State0),
785785
case recover_entry(Names, UId,
786786
{Idx, Term, binary_to_term(EntryData)},
787-
SnapIdx, State1) of
787+
SmallestIdx, State1) of
788788
{ok, State} ->
789789
recover_records(Conf, Fd, Rest, Cache, State);
790790
{retry, State} ->
@@ -793,12 +793,14 @@ recover_records(#conf{names = Names} = Conf, Fd,
793793
ok ->
794794
%% best use the snapshot index as the last
795795
%% writer index
796-
Writers = case State0#recovery.writers of
797-
#{UId := {in_seq, SnapIdx}} = W ->
798-
W;
799-
W ->
800-
W#{UId => {in_seq, SnapIdx}}
801-
end,
796+
% Writers = case State0#recovery.writers of
797+
% #{UId := {in_seq, SmallestIdx}} = W ->
798+
% W;
799+
% W ->
800+
% W#{UId => {in_seq, SmallestIdx}}
801+
% end,
802+
W = State0#recovery.writers,
803+
Writers = W#{UId => {in_seq, SmallestIdx - 1}},
802804
recover_records(Conf, Fd, Rest, Cache,
803805
State0#recovery{writers = Writers});
804806
error ->
@@ -823,13 +825,13 @@ recover_records(#conf{names = Names} = Conf, Fd,
823825
Cache, State0) ->
824826
case Cache of
825827
#{IdRef := {UId, _}} ->
826-
SnapIdx = recover_snap_idx(Conf, UId, Trunc == 1, Idx),
828+
SmallestIdx = recover_smallest_idx(Conf, UId, Trunc == 1, Idx),
827829
case validate_checksum(Checksum, Idx, Term, EntryData) of
828-
ok when Idx > SnapIdx ->
830+
ok when Idx >= SmallestIdx ->
829831
State1 = handle_trunc(Trunc == 1, UId, Idx, State0),
830832
case recover_entry(Names, UId,
831833
{Idx, Term, binary_to_term(EntryData)},
832-
SnapIdx, State1) of
834+
SmallestIdx, State1) of
833835
{ok, State} ->
834836
recover_records(Conf, Fd, Rest, Cache, State);
835837
{retry, State} ->
@@ -864,12 +866,12 @@ recover_records(Conf, Fd, Chunk, Cache, State) ->
864866
recover_records(Conf, Fd, Chunk0, Cache, State)
865867
end.
866868

867-
recover_snap_idx(Conf, UId, Trunc, CurIdx) ->
869+
recover_smallest_idx(Conf, UId, Trunc, CurIdx) ->
868870
case Trunc of
869871
true ->
870-
max(CurIdx-1, snap_idx(Conf, UId));
872+
max(CurIdx, smallest_live_index(Conf, UId));
871873
false ->
872-
snap_idx(Conf, UId)
874+
smallest_live_index(Conf, UId)
873875
end.
874876

875877
is_last_record(_Fd, <<0:104, _/binary>>, _) ->
@@ -946,29 +948,29 @@ should_roll_wal(#state{conf = #conf{max_entries = MaxEntries},
946948
Count + 1 > MaxEntries
947949
end.
948950

949-
snap_idx(#conf{ra_log_snapshot_state_tid = Tid}, ServerUId) ->
950-
ets:lookup_element(Tid, ServerUId, 2, -1).
951+
smallest_live_index(#conf{ra_log_snapshot_state_tid = Tid}, ServerUId) ->
952+
ra_log_snapshot_state:smallest(Tid, ServerUId).
951953

952-
update_ranges(Ranges, UId, MtTid, SnapIdx, {Start, _} = AddRange) ->
954+
update_ranges(Ranges, UId, MtTid, SmallestIdx, {Start, _} = AddRange) ->
953955
case Ranges of
954956
#{UId := [{MtTid, Range0} | Rem]} ->
955-
%% SnapIdx might have moved to we truncate the old range first
957+
%% SmallestIdx might have moved, so we truncate the old range first
956958
%% before extending
957-
Range1 = ra_range:truncate(SnapIdx, Range0),
959+
Range1 = ra_range:truncate(SmallestIdx - 1, Range0),
958960
%% limit the old range by the add end start as in some resend
959961
%% cases we may have got back before the prior range.
960962
Range = ra_range:add(AddRange, ra_range:limit(Start, Range1)),
961963
Ranges#{UId => [{MtTid, Range} | Rem]};
962964
#{UId := [{OldMtTid, OldMtRange} | Rem]} ->
963965
%% new Tid, need to add a new range record for this
964966
Ranges#{UId => [{MtTid, AddRange},
965-
ra_range:truncate(SnapIdx, {OldMtTid, OldMtRange})
967+
ra_range:truncate(SmallestIdx - 1, {OldMtTid, OldMtRange})
966968
| Rem]};
967969
_ ->
968970
Ranges#{UId => [{MtTid, AddRange}]}
969971
end.
970972

971-
recover_entry(Names, UId, {Idx, _, _} = Entry, SnapIdx,
973+
recover_entry(Names, UId, {Idx, _, _} = Entry, SmallestIdx,
972974
#recovery{mode = initial,
973975
ranges = Ranges0,
974976
writers = Writers,
@@ -982,7 +984,7 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SnapIdx,
982984
case ra_mt:insert(Entry, Mt0) of
983985
{ok, Mt1} ->
984986
Ranges = update_ranges(Ranges0, UId, ra_mt:tid(Mt1),
985-
SnapIdx, ra_range:new(Idx)),
987+
SmallestIdx, ra_range:new(Idx)),
986988
{ok, State#recovery{ranges = Ranges,
987989
writers = Writers#{UId => {in_seq, Idx}},
988990
tables = Tables#{UId => Mt1}}};
@@ -991,7 +993,7 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SnapIdx,
991993
{ok, Mt1} = ra_log_ets:new_mem_table_please(Names, UId, Mt0),
992994
{retry, State#recovery{tables = Tables#{UId => Mt1}}}
993995
end;
994-
recover_entry(Names, UId, {Idx, Term, _}, SnapIdx,
996+
recover_entry(Names, UId, {Idx, Term, _}, SmallestIdx,
995997
#recovery{mode = post_boot,
996998
ranges = Ranges0,
997999
writers = Writers,
@@ -1014,7 +1016,7 @@ recover_entry(Names, UId, {Idx, Term, _}, SnapIdx,
10141016
tables = Tables#{UId => Mt0}}};
10151017
Tid ->
10161018
Ranges = update_ranges(Ranges0, UId, Tid,
1017-
SnapIdx, ra_range:new(Idx)),
1019+
SmallestIdx, ra_range:new(Idx)),
10181020
{ok, State#recovery{ranges = Ranges,
10191021
writers = Writers#{UId => {in_seq, Idx}},
10201022
tables = Tables#{UId => Mt0}}}

src/ra_seq.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-module(ra_seq).
2+
3+
%% open type
4+
-type state() :: [ra:index() | ra:range()].
5+
6+
-export_type([state/0]).
7+
8+
9+

0 commit comments

Comments
 (0)