Skip to content

Commit f81afc1

Browse files
committed
Fix multiple issues around interrupted snapshot replication
with sparse entries
1 parent c9b2ccd commit f81afc1

File tree

10 files changed

+257
-68
lines changed

10 files changed

+257
-68
lines changed

src/ra.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,10 @@ start_in(DataDir) ->
155155
ra_env:configure_logger(logger),
156156
LogFile = filename:join(DataDir, "ra.log"),
157157
SaslFile = filename:join(DataDir, "ra_sasl.log"),
158-
logger:set_primary_config(level, debug),
158+
logger:remove_handler(ra_handler),
159+
ok = logger:set_primary_config(level, debug),
159160
Config = #{config => #{file => LogFile}},
160-
logger:add_handler(ra_handler, logger_std_h, Config),
161+
ok = logger:add_handler(ra_handler, logger_std_h, Config),
161162
application:load(sasl),
162163
application:set_env(sasl, sasl_error_logger, {file, SaslFile}),
163164
application:stop(sasl),

src/ra_log.erl

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
append_sync/2,
2222
write_sync/2,
2323
fold/5,
24+
fold/6,
2425
sparse_read/2,
2526
partial_read/3,
2627
execute_read_plan/4,
@@ -491,7 +492,7 @@ write([{Idx, _, _} | _], #?MODULE{cfg = #cfg{uid = UId},
491492
[UId, Idx, Range])),
492493
{error, {integrity_error, Msg}}.
493494

494-
-spec write_sparse(log_entry(), ra:index(), state()) ->
495+
-spec write_sparse(log_entry(), option(ra:index()), state()) ->
495496
{ok, state()} | {error, wal_down | gap_detected}.
496497
write_sparse({Idx, Term, _} = Entry, PrevIdx0,
497498
#?MODULE{cfg = #cfg{uid = UId,
@@ -533,11 +534,18 @@ write_sparse({Idx, Term, _} = Entry, PrevIdx0,
533534
-spec fold(FromIdx :: ra_index(), ToIdx :: ra_index(),
534535
fun((log_entry(), Acc) -> Acc), Acc, state()) ->
535536
{Acc, state()} when Acc :: term().
537+
fold(From0, To0, Fun, Acc0, State) ->
538+
fold(From0, To0, Fun, Acc0, State, error).
539+
540+
-spec fold(FromIdx :: ra_index(), ToIdx :: ra_index(),
541+
fun((log_entry(), Acc) -> Acc), Acc, state(),
542+
MissingKeyStrategy :: error | return) ->
543+
{Acc, state()} when Acc :: term().
536544
fold(From0, To0, Fun, Acc0,
537545
#?MODULE{cfg = Cfg,
538546
mem_table = Mt,
539547
range = {StartIdx, EndIdx},
540-
reader = Reader0} = State)
548+
reader = Reader0} = State, MissingKeyStrat)
541549
when To0 >= From0 andalso
542550
To0 >= StartIdx ->
543551

@@ -550,22 +558,27 @@ fold(From0, To0, Fun, Acc0,
550558
case MtOverlap of
551559
{undefined, {RemStart, RemEnd}} ->
552560
{Reader, Acc} = ra_log_segments:fold(RemStart, RemEnd, Fun,
553-
Acc0, Reader0),
561+
Acc0, Reader0,
562+
MissingKeyStrat),
554563
{Acc, State#?MODULE{reader = Reader}};
555564
{{MtStart, MtEnd}, {RemStart, RemEnd}} ->
556565
{Reader, Acc1} = ra_log_segments:fold(RemStart, RemEnd, Fun,
557-
Acc0, Reader0),
558-
Acc = ra_mt:fold(MtStart, MtEnd, Fun, Acc1, Mt),
566+
Acc0, Reader0,
567+
MissingKeyStrat),
568+
Acc = ra_mt:fold(MtStart, MtEnd, Fun, Acc1, Mt, MissingKeyStrat),
559569
NumRead = MtEnd - MtStart + 1,
560570
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, NumRead),
561571
{Acc, State#?MODULE{reader = Reader}};
562572
{{MtStart, MtEnd}, undefined} ->
563-
Acc = ra_mt:fold(MtStart, MtEnd, Fun, Acc0, Mt),
573+
Acc = ra_mt:fold(MtStart, MtEnd, Fun, Acc0, Mt, MissingKeyStrat),
574+
%% TODO: if the fold is short-circuited (MissingKeyStrat == return)
575+
%% NumRead over-counts the entries actually read from the mem table;
576+
%% it doesn't massively matter so leaving for now
564577
NumRead = MtEnd - MtStart + 1,
565578
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, NumRead),
566579
{Acc, State}
567580
end;
568-
fold(_From, _To, _Fun, Acc, State) ->
581+
fold(_From, _To, _Fun, Acc, State, _) ->
569582
{Acc, State}.
570583

571584
%% @doc Reads a list of indexes.

src/ra_log_segment.erl

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
append/4,
1313
sync/1,
1414
fold/6,
15+
fold/7,
1516
is_modified/1,
1617
read_sparse/4,
1718
read_sparse_no_checks/4,
@@ -300,7 +301,22 @@ fold(#state{cfg = #cfg{mode = read} = Cfg,
300301
cache = Cache,
301302
index = Index},
302303
FromIdx, ToIdx, Fun, AccFun, Acc) ->
303-
fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc).
304+
fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc,
305+
error).
306+
307+
-spec fold(state(),
308+
FromIdx :: ra_index(),
309+
ToIdx :: ra_index(),
310+
fun((binary()) -> term()),
311+
fun(({ra_index(), ra_term(), term()}, Acc) -> Acc), Acc,
312+
MissingKeyStrat :: error | return) ->
313+
Acc when Acc :: term().
314+
fold(#state{cfg = #cfg{mode = read} = Cfg,
315+
cache = Cache,
316+
index = Index},
317+
FromIdx, ToIdx, Fun, AccFun, Acc, MissingKeyStrat) ->
318+
fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc,
319+
MissingKeyStrat).
304320

305321
-spec is_modified(state()) -> boolean().
306322
is_modified(#state{cfg = #cfg{fd = Fd},
@@ -403,10 +419,10 @@ term_query(#state{index = Index}, Idx) ->
403419
_ -> undefined
404420
end.
405421

406-
fold0(_Cfg, _Cache, Idx, FinalIdx, _, _Fun, _AccFun, Acc)
422+
fold0(_Cfg, _Cache, Idx, FinalIdx, _, _Fun, _AccFun, Acc, _)
407423
when Idx > FinalIdx ->
408424
Acc;
409-
fold0(Cfg, Cache0, Idx, FinalIdx, Index, Fun, AccFun, Acc0) ->
425+
fold0(Cfg, Cache0, Idx, FinalIdx, Index, Fun, AccFun, Acc0, MissingKeyStrat) ->
410426
case Index of
411427
#{Idx := {Term, Offset, Length, Crc} = IdxRec} ->
412428
case pread(Cfg, Cache0, Offset, Length) of
@@ -415,7 +431,8 @@ fold0(Cfg, Cache0, Idx, FinalIdx, Index, Fun, AccFun, Acc0) ->
415431
case validate_checksum(Crc, Data) of
416432
true ->
417433
Acc = AccFun({Idx, Term, Fun(Data)}, Acc0),
418-
fold0(Cfg, Cache, Idx+1, FinalIdx, Index, Fun, AccFun, Acc);
434+
fold0(Cfg, Cache, Idx+1, FinalIdx,
435+
Index, Fun, AccFun, Acc, MissingKeyStrat);
419436
false ->
420437
%% CRC check failures are irrecoverable
421438
exit({ra_log_segment_crc_check_failure, Idx, IdxRec,
@@ -426,8 +443,10 @@ fold0(Cfg, Cache0, Idx, FinalIdx, Index, Fun, AccFun, Acc0) ->
426443
exit({ra_log_segment_unexpected_eof, Idx, IdxRec,
427444
Cfg#cfg.filename})
428445
end;
429-
_ ->
430-
exit({missing_key, Idx, Cfg#cfg.filename})
446+
_ when MissingKeyStrat == error ->
447+
exit({missing_key, Idx, Cfg#cfg.filename});
448+
_ when MissingKeyStrat == return ->
449+
Acc0
431450
end.
432451

433452
-spec range(state()) -> option({ra_index(), ra_index()}).

src/ra_log_segments.erl

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
range/1,
2222
num_open_segments/1,
2323
update_first_index/2,
24-
fold/5,
24+
fold/6,
2525
sparse_read/3,
2626
read_plan/2,
2727
exec_read_plan/6,
@@ -285,14 +285,15 @@ compaction_conf(#?STATE{cfg = #cfg{compaction_conf = Conf}}) ->
285285
num_open_segments(#?STATE{open_segments = Open}) ->
286286
ra_flru:size(Open).
287287

288-
-spec fold(ra_index(), ra_index(), fun(), term(), state()) ->
288+
-spec fold(ra_index(), ra_index(), fun(), term(), state(),
289+
MissingKeyStrategy :: error | return) ->
289290
{state(), term()}.
290291
fold(FromIdx, ToIdx, Fun, Acc,
291-
#?STATE{cfg = #cfg{} = Cfg} = State0)
292+
#?STATE{cfg = #cfg{} = Cfg} = State0, MissingKeyStrat)
292293
when ToIdx >= FromIdx ->
293294
ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, ToIdx - FromIdx + 1),
294-
segment_fold(State0, FromIdx, ToIdx, Fun, Acc);
295-
fold(_FromIdx, _ToIdx, _Fun, Acc, #?STATE{} = State) ->
295+
segment_fold(State0, FromIdx, ToIdx, Fun, Acc, MissingKeyStrat);
296+
fold(_FromIdx, _ToIdx, _Fun, Acc, #?STATE{} = State, _Strat) ->
296297
{State, Acc}.
297298

298299
-spec sparse_read(state(), [ra_index()], [log_entry()]) ->
@@ -453,15 +454,15 @@ segment_fold_plan(SegRefs, {_ReqStart, ReqEnd} = ReqRange, Acc) ->
453454
segment_fold(#?STATE{segment_refs = SegRefs,
454455
open_segments = OpenSegs,
455456
cfg = Cfg} = State,
456-
RStart, REnd, Fun, Acc) ->
457+
RStart, REnd, Fun, Acc, MissingKeyStrat) ->
457458
Plan = segment_fold_plan(SegRefs, {RStart, REnd}, []),
458459
{Op, A} =
459460
lists:foldl(
460461
fun ({Fn, {Start, End}}, {Open0, Ac0}) ->
461462
{Seg, Open} = get_segment(Cfg, Open0, Fn),
462463
{Open, ra_log_segment:fold(Seg, Start, End,
463464
fun binary_to_term/1,
464-
Fun, Ac0)}
465+
Fun, Ac0, MissingKeyStrat)}
465466
end, {OpenSegs, Acc}, Plan),
466467
{State#?MODULE{open_segments = Op}, A}.
467468

src/ra_mt.erl

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
%% @hidden
77
-module(ra_mt).
88

9-
-include_lib("stdlib/include/assert.hrl").
109
-include("ra.hrl").
1110

1211
-export([
@@ -22,6 +21,7 @@
2221
lookup_term/2,
2322
tid_for/3,
2423
fold/5,
24+
fold/6,
2525
get_items/2,
2626
record_flushed/3,
2727
set_first/2,
@@ -249,12 +249,6 @@ lookup_term(Idx, #?MODULE{tid = Tid,
249249
Term ->
250250
Term
251251
end.
252-
% when ?IN_RANGE(Idx, Seq) ->
253-
% ets:lookup_element(Tid, Idx, 2);
254-
% lookup_term(Idx, #?MODULE{prev = #?MODULE{} = Prev}) ->
255-
% lookup_term(Idx, Prev);
256-
% lookup_term(_Idx, _State) ->
257-
% undefined.
258252

259253
-spec tid_for(ra:index(), ra_term(), state()) ->
260254
undefined | ets:tid().
@@ -269,16 +263,28 @@ tid_for(Idx, Term, State) ->
269263
tid_for(Idx, Term, State#?MODULE.prev)
270264
end.
271265

266+
-spec fold(ra:index(), ra:index(),
267+
fun(), term(), state(), MissingKeyStrategy :: error | return) ->
268+
term().
269+
fold(From, To, Fun, Acc, State, MissingKeyStrat)
270+
when is_atom(MissingKeyStrat) andalso
271+
To >= From ->
272+
case lookup(From, State) of
273+
undefined when MissingKeyStrat == error ->
274+
error({missing_key, From, Acc});
275+
undefined when MissingKeyStrat == return ->
276+
Acc;
277+
E ->
278+
fold(From + 1, To, Fun, Fun(E, Acc),
279+
State, MissingKeyStrat)
280+
end;
281+
fold(_From, _To, _Fun, Acc, _State, _Strat) ->
282+
Acc.
283+
272284
-spec fold(ra:index(), ra:index(), fun(), term(), state()) ->
273285
term().
274-
fold(To, To, Fun, Acc, State) ->
275-
E = lookup(To, State),
276-
Fun(E, Acc);
277-
fold(From, To, Fun, Acc, State)
278-
when To > From ->
279-
E = lookup(From, State),
280-
?assert(E =/= undefined),
281-
fold(From + 1, To, Fun, Fun(E, Acc), State).
286+
fold(From, To, Fun, Acc, State) ->
287+
fold(From, To, Fun, Acc, State, error).
282288

283289
-spec get_items([ra:index()], state()) ->
284290
{[log_entry()],

src/ra_server.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,9 +452,11 @@ recover(#{cfg := #cfg{log_id = LogId,
452452
FromScan = CommitIndex + 1,
453453
{ToScan, _} = ra_log:last_index_term(Log0),
454454
?DEBUG("~ts: scanning for cluster changes ~b:~b ", [LogId, FromScan, ToScan]),
455+
%% if we're recovering after a partial sparse write phase some indexes in
456+
%% this range may be missing and a strict fold would fail, hence `return`
455457
{State, Log} = ra_log:fold(FromScan, ToScan,
456458
fun cluster_scan_fun/2,
457-
State1, Log0),
459+
State1, Log0, return),
458460

459461
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0),
460462
State#{log => Log,
@@ -1557,8 +1559,6 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15571559
{ok, L} = ra_log:write_sparse(E, LstIdx, L0),
15581560
{L, I}
15591561
end, {Log00, LastIdx}, ChunkOrEntries),
1560-
?DEBUG("~ts: receiving snapshot log last index ~p",
1561-
[LogId, ra_log:last_index_term(Log)]),
15621562
State = update_term(Term, State0#{log => Log}),
15631563
{receive_snapshot, State, [{reply, Reply}]};
15641564
next ->

0 commit comments

Comments
 (0)