Skip to content

Commit 41fa3e6

Browse files
committed
only flush live indexes in seg writer
1 parent b6c488a commit 41fa3e6

File tree

5 files changed

+96
-18
lines changed

5 files changed

+96
-18
lines changed

src/ra_kv.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121

2222
put/4,
2323
get/3,
24-
query_get/3
24+
query_get/3,
25+
take_snapshot/1
2526
]).
2627

2728

@@ -47,7 +48,9 @@
4748
command/0]).
4849

4950
%% mgmt
50-
-spec start_cluster(atom(), atom(), map()) ->
51+
-spec start_cluster(System :: atom(),
52+
ClusterName :: atom(),
53+
Config :: #{members := [ra_server_id()]}) ->
5154
{ok, [ra_server_id()], [ra_server_id()]} |
5255
{error, cluster_not_formed}.
5356
start_cluster(System, ClusterName, #{members := ServerIds})
@@ -134,6 +137,10 @@ query_get(ClusterName, Key, #?STATE{keys = Keys}) ->
134137
{error, not_found}
135138
end.
136139

140+
%% Sends the `take_snapshot' aux command to ServerId via ra:aux_command/2
%% and returns whatever that call returns.
%% NOTE(review): the spec promises `ok'; confirm that the aux handler
%% really replies with `ok' for this command.
-spec take_snapshot(ra_server_id()) -> ok.
141+
take_snapshot(ServerId) ->
142+
ra:aux_command(ServerId, take_snapshot).
143+
137144
%% state machine
138145

139146
init(_) ->
@@ -154,6 +161,7 @@ live_indexes(#?STATE{keys = Keys}) ->
154161
[Idx | Acc]
155162
end, [], Keys).
156163

164+
157165
-record(aux, {}).
158166

159167
init_aux(_) ->

src/ra_log_segment_writer.erl

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,28 +264,41 @@ get_overview(#state{data_dir = Dir,
264264
flush_mem_table_ranges({ServerUId, TidSeqs0},
265265
#state{system = System} = State) ->
266266
SmallestIdx = smallest_live_idx(ServerUId),
267-
%% TidRanges arrive here sorted new -> old.
267+
LiveIndexes = live_indexes(ServerUId),
268+
LastLive = ra_seq:last(LiveIndexes),
269+
%% TidSeqs arrive here sorted new -> old.
268270

269-
%% truncate and limit all ranges to create a contiguous non-overlapping
271+
%% TODO: use live indexes from ra_log_snapshot_state table to only
272+
%% write live entries below the snapshot index
273+
274+
%% truncate and limit all seqs to create a contiguous non-overlapping
270275
%% list of tid ranges to flush to disk
271-
%% now TidRanges are sorted old -> new, i.e the correct order of
272-
%% processing
273276
TidSeqs = lists:foldl(
274277
fun ({T, Seq0}, []) ->
275278
case ra_seq:floor(SmallestIdx, Seq0) of
276279
[] ->
277280
[];
281+
Seq when LiveIndexes == []->
282+
[{T, Seq}];
278283
Seq ->
279-
[{T, Seq}]
284+
L = ra_seq:in_range(ra_seq:range(Seq),
285+
LiveIndexes),
286+
287+
[{T, ra_seq:add(ra_seq:floor(LastLive + 1, Seq), L)}]
280288
end;
281289
({T, Seq0}, [{_T, PrevSeq} | _] = Acc) ->
282290
Start = ra_seq:first(PrevSeq),
283291
Seq1 = ra_seq:floor(SmallestIdx, Seq0),
284292
case ra_seq:limit(Start, Seq1) of
285293
[] ->
286294
Acc;
295+
Seq when LiveIndexes == [] ->
296+
[{T, Seq} | Acc];
287297
Seq ->
288-
[{T, Seq} | Acc]
298+
L = ra_seq:in_range(ra_seq:range(Seq),
299+
LiveIndexes),
300+
[{T, ra_seq:add(ra_seq:floor(LastLive + 1, Seq), L)}
301+
| Acc]
289302
end
290303
end, [], TidSeqs0),
291304

@@ -357,6 +370,9 @@ start_index(ServerUId, StartIdx0) ->
357370
%% Looks up the value that ra_log_snapshot_state:smallest/2 holds for
%% ServerUId in the ra_log_snapshot_state table.
smallest_live_idx(ServerUId) ->
358371
ra_log_snapshot_state:smallest(ra_log_snapshot_state, ServerUId).
359372

373+
%% Looks up the live indexes that ra_log_snapshot_state:live_indexes/2
%% holds for ServerUId in the ra_log_snapshot_state table.
live_indexes(ServerUId) ->
374+
ra_log_snapshot_state:live_indexes(ra_log_snapshot_state, ServerUId).
375+
360376
send_segments(System, ServerUId, TidRanges, SegRefs) ->
361377
case ra_directory:pid_of(System, ServerUId) of
362378
undefined ->

src/ra_log_snapshot_state.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
insert/5,
55
delete/2,
66
smallest/2,
7+
live_indexes/2,
78
snapshot/2
89
]).
910

@@ -26,6 +27,11 @@ delete(Table, UId) ->
2627
smallest(Table, UId) when is_binary(UId) ->
2728
ets:lookup_element(Table, UId, 3, 0).
2829

30+
%% Returns the live indexes stored for UId (element 4 of its row in
%% Table), or [] when Table has no row for UId.
%% The result is a sequence, not a single index: the segment writer
%% passes it straight to ra_seq:last/1 and ra_seq:in_range/2, hence the
%% ra_seq:state() return type.
-spec live_indexes(ets:table(), ra:uid()) ->
31+
ra_seq:state().
32+
live_indexes(Table, UId) when is_binary(UId) ->
33+
ets:lookup_element(Table, UId, 4, []).
34+
2935
-spec snapshot(ets:table(), ra:uid()) ->
3036
ra:index() | -1.
3137
snapshot(Table, UId) when is_binary(UId) ->

src/ra_seq.erl

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,19 @@ limit(CeilIdx, [{_, _} = T | Rem]) ->
8989
limit(_CeilIdx, Seq) ->
9090
Seq.
9191

92-
-spec add(state(), state()) -> state().
93-
add([], Seq2) ->
94-
Seq2;
95-
add(Seq1, Seq2) ->
96-
Fst = case lists:last(Seq1) of
97-
{I, _} -> I;
98-
I -> I
99-
end,
100-
fold(fun append/2, limit(Fst - 1, Seq2), Seq1).
92+
%% @doc Adds two sequences together where To is
93+
%% the "lower" sequence.
%% If either argument is the empty sequence the other one is returned
%% unchanged. Otherwise To is passed through limit/2 and every index of
%% Add is then folded into the result with append/2.
94+
-spec add(Add :: state(), To :: state()) -> state().
95+
add([], To) ->
96+
To;
97+
add(Add, []) ->
98+
Add;
99+
add(Add, To) ->
100+
Fst = first(Add),
101+
% To is cut off with limit/2 using Fst - 1 as the ceiling index,
102+
% presumably so that nothing in To overlaps the indexes of Add,
103+
% before the indexes of Add are appended on top of it
104+
fold(fun append/2, limit(Fst - 1, To), Add).
101105

102106
-spec fold(fun ((ra:index(), Acc) -> Acc), Acc, state()) ->
103107
Acc when Acc :: term().
@@ -199,6 +203,8 @@ range(Seq) ->
199203
ra_range:new(first(Seq), last(Seq)).
200204

201205

206+
-spec in_range(ra:range(), state()) ->
207+
state().
202208
in_range(_Range, []) ->
203209
[];
204210
in_range(undefined, _) ->

test/ra_log_segment_writer_SUITE.erl

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ all_tests() ->
4141
my_segments,
4242
upgrade_segment_name_format,
4343
skip_entries_lower_than_snapshot_index,
44-
skip_all_entries_lower_than_snapshot_index
44+
skip_all_entries_lower_than_snapshot_index,
45+
live_indexes_1
4546
].
4647

4748
groups() ->
@@ -839,6 +840,47 @@ skip_all_entries_lower_than_snapshot_index(Config) ->
839840
ok = gen_server:stop(TblWriterPid),
840841
ok.
841842

843+
%% Flushes a single mem table holding indexes 1..6 while the snapshot
%% state table lists [4, 2] as live indexes, then checks which entries
%% ended up in the written segment: 2, 4, 5 and 6 must be readable and
%% 3 must be missing.
live_indexes_1(Config) ->
844+
Dir = ?config(wal_dir, Config),
845+
UId = ?config(uid, Config),
846+
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
847+
name => ?SEGWR,
848+
data_dir => Dir}),
849+
% single batch of entries with indexes 1..6
850+
Entries = [{1, 42, a},
851+
{2, 42, b},
852+
{3, 43, c},
853+
{4, 43, d},
854+
{5, 43, e},
855+
{6, 43, f}
856+
],
857+
Mt = make_mem_table(UId, Entries),
858+
Ranges = #{UId => [{ra_mt:tid(Mt), [ra_mt:range(Mt)]}]},
859+
%% update snapshot state table
%% NOTE(review): presumably snapshot index 4, smallest live index 2 and
%% live indexes [4, 2] - confirm against ra_log_snapshot_state:insert/5
860+
ra_log_snapshot_state:insert(ra_log_snapshot_state, UId, 4, 2, [4, 2]),
861+
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
862+
make_wal(Config, "w1.wal")),
863+
receive
864+
{ra_log_event, {segments, _Tid, [{Fn, {2, 6}}]}} ->
865+
SegmentFile = filename:join(?config(server_dir, Config), Fn),
866+
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
867+
% assert that only the live indexes (2 and 4) and the entries above
868+
% them (5 and 6) have been written; index 3 must be missing
869+
ok = gen_server:stop(TblWriterPid),
870+
?assertExit({missing_key, 3}, read_sparse(Seg, [2, 3, 4])),
871+
[
872+
{2, _, _},
873+
{4, _, _},
874+
{5, _, _},
875+
{6, _, _}
876+
] = read_sparse(Seg, [2, 4, 5, 6])
877+
after 3000 ->
878+
% no segments event arrived in time: drain the mailbox, stop the
% writer and fail the test
flush(),
879+
ok = gen_server:stop(TblWriterPid),
880+
throw(ra_log_event_timeout)
881+
end,
882+
ok.
883+
842884
%%% Internal
843885

844886
fake_mem_table(UId, Dir, Entries) ->

0 commit comments

Comments
 (0)