Skip to content

Commit ae8cbf2

Browse files
committed
first cut phase 1 compaction impl
1 parent d704338 commit ae8cbf2

11 files changed

+129
-914
lines changed

src/ra_log.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ init(#{uid := UId,
216216
%% to get the actual value
217217
{ok, LiveIndexes} = ra_snapshot:indexes(
218218
ra_snapshot:current_snapshot_dir(SnapshotState)),
219-
ct:pal("log init live indexes ~p", [LiveIndexes]),
220219

221220
AccessPattern = maps:get(initial_access_pattern, Conf, sequential),
222221
{ok, Mt0} = ra_log_ets:mem_table_please(Names, UId),

src/ra_log_segment_writer.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ flush_mem_table_ranges({ServerUId, TidSeqs0},
273273
TidSeqs = lists:foldl(
274274
fun ({T, Seq0}, []) ->
275275
case ra_seq:floor(SmallestIdx, Seq0) of
276-
undefined ->
276+
[] ->
277277
[];
278278
Seq ->
279279
[{T, Seq}]
@@ -282,7 +282,7 @@ flush_mem_table_ranges({ServerUId, TidSeqs0},
282282
Start = ra_seq:first(PrevSeq),
283283
Seq1 = ra_seq:floor(SmallestIdx, Seq0),
284284
case ra_seq:limit(Start, Seq1) of
285-
undefined ->
285+
[] ->
286286
Acc;
287287
Seq ->
288288
[{T, Seq} | Acc]

src/ra_log_wal.erl

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -381,26 +381,29 @@ recover_wal(Dir, #conf{system = System,
381381
end || File <- Files0,
382382
filename:extension(File) == ".wal"],
383383
WalFiles = lists:sort(Files),
384-
AllWriters =
385-
[begin
386-
?DEBUG("WAL in ~ts: recovering ~ts, Mode ~s",
387-
[System, F, Mode]),
388-
Fd = open_at_first_record(filename:join(Dir, F)),
389-
{Time, #recovery{ranges = Ranges,
390-
writers = Writers}} =
391-
timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end),
392-
393-
ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, F),
394-
395-
close_existing(Fd),
396-
?DEBUG("WAL in ~ts: recovered ~ts time taken ~bms - recovered ~b writers",
397-
[System, F, Time div 1000, map_size(Writers)]),
398-
Writers
399-
end || F <- WalFiles],
400-
401-
FinalWriters = lists:foldl(fun (New, Acc) ->
402-
maps:merge(Acc, New)
403-
end, #{}, AllWriters),
384+
FinalWriters =
385+
lists:foldl(fun (F, Writers0) ->
386+
?DEBUG("WAL in ~ts: recovering ~ts, Mode ~s",
387+
[System, F, Mode]),
388+
Fd = open_at_first_record(filename:join(Dir, F)),
389+
{Time, #recovery{ranges = Ranges,
390+
writers = Writers}} =
391+
timer:tc(fun () ->
392+
recover_wal_chunks(Conf, Fd,
393+
Writers0, Mode)
394+
end),
395+
396+
ok = ra_log_segment_writer:accept_mem_tables(SegWriter,
397+
Ranges, F),
398+
close_existing(Fd),
399+
?DEBUG("WAL in ~ts: recovered ~ts time taken ~bms - recovered ~b writers",
400+
[System, F, Time div 1000, map_size(Writers)]),
401+
Writers
402+
end, #{}, WalFiles),
403+
404+
% FinalWriters = lists:foldl(fun (New, Acc) ->
405+
% maps:merge(Acc, New)
406+
% end, #{}, AllWriters),
404407

405408
?DEBUG("WAL in ~ts: final writers recovered ~b",
406409
[System, map_size(FinalWriters)]),
@@ -781,9 +784,10 @@ dump_records(<<_:1/unsigned, 1:1/unsigned, _:22/unsigned,
781784
dump_records(<<>>, Entries) ->
782785
Entries.
783786

784-
recover_wal_chunks(#conf{} = Conf, Fd, Mode) ->
787+
recover_wal_chunks(#conf{} = Conf, Fd, Writers, Mode) ->
785788
Chunk = read_wal_chunk(Fd, Conf#conf.recovery_chunk_size),
786-
recover_records(Conf, Fd, Chunk, #{}, #recovery{mode = Mode}).
789+
recover_records(Conf, Fd, Chunk, #{}, #recovery{mode = Mode,
790+
writers = Writers}).
787791
% All zeros indicates end of a pre-allocated wal file
788792
recover_records(_, _Fd, <<0:1/unsigned, 0:1/unsigned, 0:22/unsigned,
789793
IdDataLen:16/unsigned, _:IdDataLen/binary,
@@ -824,10 +828,11 @@ recover_records(#conf{names = Names} = Conf, Fd,
824828
% W ->
825829
% W#{UId => {in_seq, SmallestIdx}}
826830
% end,
827-
W = State0#recovery.writers,
828-
Writers = W#{UId => {in_seq, SmallestIdx - 1}},
831+
Writers = State0#recovery.writers,
832+
% Writers = W#{UId => {in_seq, SmallestIdx - 1}},
829833
recover_records(Conf, Fd, Rest, Cache,
830-
State0#recovery{writers = Writers});
834+
State0#recovery{writers =
835+
maps:remove(UId, Writers)});
831836
error ->
832837
System = Conf#conf.system,
833838
?DEBUG("WAL in ~ts: record failed CRC check. If this is the last record"
@@ -1004,7 +1009,16 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SmallestIdx,
10041009
{ok, M} = ra_log_ets:mem_table_please(Names, UId),
10051010
M
10061011
end,
1007-
case ra_mt:insert(Entry, Mt0) of
1012+
%% always recover using insert_sparse as there is nothing in the wal
1013+
%% data to indicate whether an entry was originally written sparsely. This
1014+
%% way we recover all writes, so it should be ok for all types of writes
1015+
PrevIdx = case Writers of
1016+
#{UId := {in_seq, I}} ->
1017+
I;
1018+
_ ->
1019+
undefined
1020+
end,
1021+
case ra_mt:insert_sparse(Entry, PrevIdx, Mt0) of
10081022
{ok, Mt1} ->
10091023
Ranges = update_ranges(Ranges0, UId, ra_mt:tid(Mt1),
10101024
SmallestIdx, [Idx]),
@@ -1014,7 +1028,8 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SmallestIdx,
10141028
{error, overwriting} ->
10151029
%% create successor memtable
10161030
{ok, Mt1} = ra_log_ets:new_mem_table_please(Names, UId, Mt0),
1017-
{retry, State#recovery{tables = Tables#{UId => Mt1}}}
1031+
{retry, State#recovery{tables = Tables#{UId => Mt1},
1032+
writers = maps:remove(UId, Writers)}}
10181033
end;
10191034
recover_entry(Names, UId, {Idx, Term, _}, SmallestIdx,
10201035
#recovery{mode = post_boot,
@@ -1049,6 +1064,7 @@ handle_trunc(false, _UId, _Idx, State) ->
10491064
State;
10501065
handle_trunc(true, UId, Idx, #recovery{mode = Mode,
10511066
ranges = Ranges0,
1067+
writers = Writers,
10521068
tables = Tbls} = State) ->
10531069
case Tbls of
10541070
#{UId := Mt0} when Mode == initial ->
@@ -1065,9 +1081,10 @@ handle_trunc(true, UId, Idx, #recovery{mode = Mode,
10651081
end,
10661082

10671083
State#recovery{tables = Tbls#{UId => Mt},
1084+
writers = maps:remove(UId, Writers),
10681085
ranges = Ranges};
10691086
_ ->
1070-
State
1087+
State#recovery{writers = maps:remove(UId, Writers)}
10711088
end.
10721089

10731090
named_cast(To, Msg) when is_pid(To) ->

src/ra_mt.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,13 @@ insert({Idx, _, _} = _Entry,
120120
end.
121121

122122
-spec insert_sparse(log_entry(), undefined | ra:index(), state()) ->
123-
{ok, state()} | {error, gap_detected | limit_reached}.
123+
{ok, state()} | {error, overwriting | gap_detected | limit_reached}.
124124
insert_sparse({Idx, _, _} = Entry, LastIdx,
125-
#?MODULE{tid = Tid,
126-
indexes = Seq} = State) ->
127-
case ra_seq:last(Seq) == LastIdx of
125+
#?MODULE{tid = Tid,
126+
indexes = Seq} = State) ->
127+
LastSeq = ra_seq:last(Seq),
128+
IsOverwriting = Idx =< LastSeq andalso is_integer(LastSeq),
129+
case LastSeq == LastIdx andalso not IsOverwriting of
128130
true ->
129131
case ra_seq:length(Seq) > ?MAX_MEMTBL_ENTRIES of
130132
true ->
@@ -134,7 +136,12 @@ insert_sparse({Idx, _, _} = Entry, LastIdx,
134136
{ok, State#?MODULE{indexes = ra_seq:append(Idx, Seq)}}
135137
end;
136138
false ->
137-
{error, gap_detected}
139+
case IsOverwriting of
140+
true ->
141+
{error, overwriting};
142+
false ->
143+
{error, gap_detected}
144+
end
138145
end.
139146

140147
-spec stage(log_entry(), state()) ->

src/ra_server.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,6 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15321532
?DEBUG("~ts: receiving snapshot chunk: ~b / ~w, index ~b, term ~b",
15331533
[LogId, Num, ChunkFlag, SnapIndex, SnapTerm]),
15341534
SnapState0 = ra_log:snapshot_state(Log0),
1535-
ct:pal("SnapStat0 ~p", [SnapState0]),
15361535
{ok, SnapState, Effs0} = ra_snapshot:accept_chunk(Data, Num, ChunkFlag,
15371536
SnapState0),
15381537
Reply = #install_snapshot_result{term = CurTerm,

test/ra_log_memory.erl

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
-include("src/ra.hrl").
4343

44+
-dialyzer({nowarn_function, install_snapshot/4}).
45+
4446
-type ra_log_memory_meta() :: #{atom() => term()}.
4547

4648
-record(state, {last_index = 0 :: ra_index(),
@@ -50,7 +52,7 @@
5052
meta = #{} :: ra_log_memory_meta(),
5153
snapshot :: option({ra_snapshot:meta(), term()})}).
5254

53-
-type ra_log_memory_state() :: #state{} | ra_log:state().
55+
-opaque ra_log_memory_state() :: #state{} | ra_log:state().
5456

5557
-export_type([ra_log_memory_state/0]).
5658

@@ -170,17 +172,18 @@ last_written(#state{last_written = LastWritten}) ->
170172

171173
-spec handle_event(ra_log:event_body(), ra_log_memory_state()) ->
172174
{ra_log_memory_state(), list()}.
173-
handle_event({written, Term, {_From, Idx} = Range0}, State0) ->
175+
handle_event({written, Term, Seq0}, State0) when is_list(Seq0) ->
176+
Idx = ra_seq:last(Seq0),
174177
case fetch_term(Idx, State0) of
175178
{Term, State} ->
176179
{State#state{last_written = {Idx, Term}}, []};
177180
_ ->
178-
case ra_range:limit(Idx, Range0) of
179-
undefined ->
181+
case ra_seq:limit(Idx - 1, Seq0) of
182+
[] ->
180183
% if the term doesn't match we just ignore it
181184
{State0, []};
182-
Range ->
183-
handle_event({written, Term, Range}, State0)
185+
Seq ->
186+
handle_event({written, Term, Seq}, State0)
184187
end
185188
end;
186189
handle_event(_Evt, State0) ->
@@ -210,6 +213,8 @@ fetch_term(Idx, #state{entries = Log} = State) ->
210213

211214
flush(_Idx, Log) -> Log.
212215

216+
-spec install_snapshot(ra_idxterm(), term(), module(), ra_log_memory_state()) ->
217+
{ra_snapshot:meta(), term(), ra_log_memory_state(), list()}.
213218
install_snapshot({Index, Term}, Data, _MacMod,
214219
#state{entries = Log0} = State0)
215220
when is_tuple(Data) ->

test/ra_log_meta_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ init_per_group(_, Config) ->
3838
Config.
3939

4040
end_per_group(_, Config) ->
41+
application:stop(ra),
4142
Config.
4243

4344
init_per_testcase(TestCase, Config) ->

0 commit comments

Comments
 (0)