Skip to content

Commit fb457b1

Browse files
committed
Fix ra_log:commit/1 to roll back mt changes.
Else subsequent writes would crash the entire process.
1 parent 08ac5d1 commit fb457b1

File tree

8 files changed

+166
-19
lines changed

8 files changed

+166
-19
lines changed

src/ra_log.erl

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -388,9 +388,12 @@ commit_tx(#?MODULE{cfg = #cfg{uid = UId,
388388
wal = Wal} = Cfg,
389389
tx = {true, TxRange},
390390
range = Range,
391+
pending = Pend0,
391392
mem_table = Mt1} = State) ->
392-
{Entries, Mt} = ra_mt:commit(Mt1),
393-
Tid = ra_mt:tid(Mt),
393+
%% TODO: staged could contain entries from previous? I don't think that is
394+
%% ever the case as that would mean overwriting withing a single append batch
395+
Entries = ra_mt:staged(Mt1),
396+
Tid = ra_mt:tid(Mt1),
394397
WriterId = {UId, self()},
395398
PrevIdx = previous_wal_index(State),
396399
{WalCommands, Num, _} =
@@ -403,15 +406,21 @@ commit_tx(#?MODULE{cfg = #cfg{uid = UId,
403406

404407
case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of
405408
{ok, Pid} ->
409+
%% commit after send to WAL, else abort
410+
{_, Mt} = ra_mt:commit(Mt1),
406411
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),
407412
{ok, State#?MODULE{tx = false,
408413
range = ra_range:add(TxRange, Range),
409414
last_wal_write = {Pid, now_ms(), LastIdx},
410415
mem_table = Mt}};
411416
{error, wal_down} ->
417+
{Idx, _, _} = hd(Entries),
418+
Mt = ra_mt:abort(Mt1),
412419
%% TODO: review this - still need to return the state here
413-
{error, wal_down, State#?MODULE{tx = false,
414-
mem_table = Mt}}
420+
{error, wal_down,
421+
State#?MODULE{tx = false,
422+
pending = ra_seq:limit(Idx - 1, Pend0),
423+
mem_table = Mt}}
415424
end;
416425
commit_tx(#?MODULE{tx = false} = State) ->
417426
State.
@@ -815,7 +824,7 @@ handle_event({written, Term, WrittenSeq},
815824
{State#?MODULE{last_written_index_term = {LastWrittenIdx, Term},
816825
pending = Pend}, []};
817826
{error, not_prefix} ->
818-
?DEBUG("~ts: ~p not prefix of ~p",
827+
?DEBUG("~ts: ~w not prefix of ~w",
819828
[Cfg#cfg.log_id, WrittenSeq, Pend0]),
820829
{resend_pending(State0), []}
821830
end;

src/ra_log_wal.erl

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -543,18 +543,28 @@ incr_batch(#batch{num_writes = Writes,
543543
#{Pid := #batch_writer{term = TERM,
544544
tid = MT_TID,
545545
seq = Seq0} = W} ->
546-
%% The Tid and term is the same so add to current batch_writer
547-
Seq = ra_seq:append(Idx, Seq0),
546+
%% The Tid and term is the same so add to
547+
%% current batch_writer
548+
Seq = case Idx > ra_seq:last(Seq0) of
549+
true ->
550+
ra_seq:append(Idx, Seq0);
551+
false ->
552+
%% this is a rewrite / resend
553+
%% we need to limit the seq before
554+
%% appending
555+
ra_seq:append(Idx, ra_seq:limit(Idx - 1,
556+
Seq0))
557+
end,
548558
Waiting0#{Pid => W#batch_writer{seq = Seq,
549559
smallest_live_idx = SmallestLiveIdx,
550560
term = Term}};
551561
_ ->
552-
%% The tid is different, open a new batch writer for the
553-
%% new tid and term
562+
%% The tid or term is different
563+
%% open a new batch writer for the new tid and term
554564
PrevBatchWriter = maps:get(Pid, Waiting0, undefined),
555565
Writer = #batch_writer{smallest_live_idx = SmallestLiveIdx,
556566
tid = MtTid,
557-
seq = [Idx],
567+
seq = ra_seq:append(Idx, []),
558568
uid = UId,
559569
term = Term,
560570
old = PrevBatchWriter},
@@ -968,9 +978,9 @@ should_roll_wal(#state{conf = #conf{max_entries = MaxEntries},
968978
smallest_live_index(#conf{ra_log_snapshot_state_tid = Tid}, ServerUId) ->
969979
ra_log_snapshot_state:smallest(Tid, ServerUId).
970980

971-
update_ranges(Ranges, UId, MtTid, _SmallestIdx, AddSeq) ->
981+
update_ranges(Ranges, UId, MtTid = MT_TID, _SmallestIdx, AddSeq) ->
972982
case Ranges of
973-
#{UId := [{MtTid, Seq0} | Seqs]} ->
983+
#{UId := [{MT_TID, Seq0} | Seqs]} ->
974984
%% limit the old range by the add end start as in some resend
975985
%% cases we may have got back before the prior range.
976986
Seq = ra_seq:add(AddSeq, Seq0),

src/ra_mt.erl

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
insert_sparse/3,
1717
stage/2,
1818
commit/1,
19-
% abort/1,
19+
abort/1,
2020
lookup/2,
2121
lookup_term/2,
2222
tid_for/3,
@@ -182,6 +182,19 @@ stage({Idx, _, _} = _Entry,
182182
exit({unexpected_sparse_stage, Idx, Seq})
183183
end.
184184

185+
-spec staged(state()) -> [log_entry()].
186+
staged(#?MODULE{staged = undefined}) ->
187+
[];
188+
staged(#?MODULE{staged = {_, Staged0},
189+
prev = Prev0}) ->
190+
PrevStaged = case Prev0 of
191+
undefined ->
192+
[];
193+
_ ->
194+
staged(Prev0)
195+
end,
196+
PrevStaged ++ lists:reverse(Staged0).
197+
185198
-spec commit(state()) -> {[log_entry()], state()}.
186199
commit(#?MODULE{staged = undefined} = State) ->
187200
{[], State};
@@ -200,6 +213,16 @@ commit(#?MODULE{tid = Tid,
200213
{PrevStaged ++ Staged, State#?MODULE{staged = undefined,
201214
prev = Prev}}.
202215

216+
-spec abort(state()) -> state().
217+
abort(#?MODULE{staged = undefined} = State) ->
218+
State;
219+
abort(#?MODULE{indexes = Seq,
220+
staged = {_, Staged0}} = State) ->
221+
{Idx, _, _} = lists:last(Staged0),
222+
State#?MODULE{staged = undefined,
223+
indexes = ra_seq:limit(Idx - 1, Seq)}.
224+
225+
203226
-spec lookup(ra:index(), state()) ->
204227
log_entry() | undefined.
205228
lookup(Idx, #?MODULE{staged = {FstStagedIdx, Staged}})
@@ -375,11 +398,11 @@ range(_State) ->
375398
tid(#?MODULE{tid = Tid}) ->
376399
Tid.
377400

378-
-spec staged(state()) -> [log_entry()].
379-
staged(#?MODULE{staged = {_, Staged}}) ->
380-
Staged;
381-
staged(#?MODULE{staged = undefined}) ->
382-
[].
401+
% -spec staged(state()) -> [log_entry()].
402+
% staged(#?MODULE{staged = {_, Staged}}) ->
403+
% Staged;
404+
% staged(#?MODULE{staged = undefined}) ->
405+
% [].
383406

384407
-spec is_active(ets:tid(), state()) -> boolean().
385408
is_active(Tid, State) ->

src/ra_seq.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ limit(_CeilIdxIncl, Seq) ->
9393

9494
%% @doc adds two sequences together where To is
9595
%% the "lower" sequence
96+
%% TODO: optimise to avoid the fold which could be expensive
97+
%% for very large sequences containing large ranges
9698
-spec add(Add :: state(), To :: state()) -> state().
9799
add([], To) ->
98100
To;

src/ra_server.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1709,6 +1709,9 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
17091709
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapIndex),
17101710
%% it was the last snapshot chunk so we can revert back to
17111711
%% follower status
1712+
%% TODO: consider waiting until all pending WAL writes (sparse)
1713+
%% have been confirmed written by the WAL or segment writer until
1714+
%% returning to follower state
17121715
{follower, persist_last_applied(State), [{reply, Reply} |
17131716
Effs0 ++ Effs ++
17141717
SnapInstalledEffs]}

test/ra_log_2_SUITE.erl

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ all_tests() ->
2222
resend_write_lost_in_wal_crash,
2323
resend_after_written_event_lost_in_wal_crash,
2424
resend_write_after_tick,
25+
snapshot_before_written,
2526
handle_overwrite,
2627
handle_overwrite_append,
2728
receive_segment,
@@ -52,7 +53,7 @@ all_tests() ->
5253
wal_down_read_availability,
5354
wal_down_append_throws,
5455
wal_down_write_returns_error_wal_down,
55-
56+
wal_down_stage,
5657
detect_lost_written_range,
5758
snapshot_installation,
5859
snapshot_written_after_installation,
@@ -141,6 +142,39 @@ end_per_testcase(_, Config) ->
141142
-define(N2, {n2, node()}).
142143
% -define(N3, {n3, node()}).
143144

145+
snapshot_before_written(Config) ->
146+
%% snapshot_written comes in before written event for lower entries.
147+
148+
Log0 = write_n(1, 6, 1,
149+
ra_log_init(Config, #{min_snapshot_interval => 0})),
150+
receive
151+
{ra_log_event, {written, 1, _}} -> ok
152+
after 2000 ->
153+
exit(written_timeout)
154+
end,
155+
%% write 10 entries
156+
Log1 = write_n(6, 20, 1, Log0),
157+
SnapIdx = 10,
158+
%% do snapshot in
159+
{Log2, Effs} = ra_log:update_release_cursor(SnapIdx, #{}, macctx(),
160+
[10, 4], Log1),
161+
run_effs(Effs),
162+
{Log3, Effs3} = receive
163+
{ra_log_event, {snapshot_written, {10, 1}, _,
164+
snapshot, _} = Evt} ->
165+
ra_log:handle_event(Evt, Log2)
166+
after 5000 ->
167+
flush(),
168+
exit(snapshot_written_timeout)
169+
end,
170+
run_effs(Effs3),
171+
172+
_Log4 = assert_log_events(Log3,
173+
fun (L) ->
174+
{19, 1} == ra_log:last_written(L)
175+
end),
176+
ok.
177+
144178
handle_overwrite(Config) ->
145179
Log0 = ra_log_init(Config),
146180
{ok, Log1} = ra_log:write([{1, 1, "value"},
@@ -154,9 +188,11 @@ handle_overwrite(Config) ->
154188
% ensure immediate truncation
155189
{1, 2} = ra_log:last_index_term(Log3),
156190
{ok, Log4} = ra_log:write([{2, 2, "value"}], Log3),
191+
157192
% simulate the first written event coming after index 20 has already
158193
% been written in a new term
159194
{Log, _} = ra_log:handle_event({written, 1, [2, 1]}, Log4),
195+
ct:pal("Log ~p", [Log]),
160196
% ensure last written has not been incremented
161197
{0, 0} = ra_log:last_written(Log),
162198
{2, 2} = ra_log:last_written(
@@ -1253,6 +1289,27 @@ wal_down_write_returns_error_wal_down(Config) ->
12531289
{error, wal_down} = ra_log:write([{1, 1, hi}], Log0),
12541290
ok.
12551291

1292+
wal_down_stage(Config) ->
1293+
[SupPid] = [P || {ra_log_wal_sup, P, _, _}
1294+
<- supervisor:which_children(ra_log_sup)],
1295+
1296+
Log0 = ra_log:begin_tx(ra_log_init(Config)),
1297+
Log1 = ra_log:append({2, 1, ho},
1298+
ra_log:append({1, 1, hi}, Log0)),
1299+
ct:pal("o1 ~p", [ra_log:overview(Log1)]),
1300+
ok = supervisor:terminate_child(SupPid, ra_log_wal),
1301+
{error, wal_down, Log2} = ra_log:commit_tx(Log1),
1302+
{ok, _} = supervisor:restart_child(SupPid, ra_log_wal),
1303+
%% check it is possible to write the next entries
1304+
?assertEqual(1, ra_log:next_index(Log2)),
1305+
Log3 = ra_log:append({1, 1, hi}, ra_log:begin_tx(Log2)),
1306+
{ok, Log4} = ra_log:commit_tx(Log3),
1307+
ra_log_wal:force_roll_over(ra_log_wal),
1308+
ct:pal("o4 ~p", [ra_log:overview(Log4)]),
1309+
Log = deliver_all_log_events(Log4, 200),
1310+
ct:pal("o ~p", [ra_log:overview(Log)]),
1311+
ok.
1312+
12561313
detect_lost_written_range(Config) ->
12571314
Log0 = ra_log_init(Config, #{wal => ra_log_wal}),
12581315
{0, 0} = ra_log:last_index_term(Log0),

test/ra_log_wal_SUITE.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ all() ->
2424
all_tests() ->
2525
[
2626
basic_log_writes,
27+
rewrite,
2728
sparse_writes,
2829
sparse_write_same_batch,
2930
sparse_write_overwrite,
@@ -156,6 +157,22 @@ basic_log_writes(Config) ->
156157
meck:unload(),
157158
ok.
158159

160+
rewrite(Config) ->
161+
meck:new(ra_log_segment_writer, [passthrough]),
162+
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
163+
Conf = ?config(wal_conf, Config),
164+
{_UId, _} = WriterId = ?config(writer_id, Config),
165+
Tid = ets:new(?FUNCTION_NAME, []),
166+
{ok, Pid} = ra_log_wal:start_link(Conf),
167+
suspend_process(Pid),
168+
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 1, "value"),
169+
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 1, "value"),
170+
erlang:resume_process(Pid),
171+
flush(),
172+
proc_lib:stop(Pid),
173+
meck:unload(),
174+
ok.
175+
159176
sparse_writes(Config) ->
160177
meck:new(ra_log_segment_writer, [passthrough]),
161178
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),

test/ra_mt_SUITE.erl

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ all_tests() ->
3636
successor,
3737
successor_below,
3838
stage_commit,
39+
stage_abort,
3940
range_overlap,
4041
stage_commit_2,
4142
perf,
@@ -464,6 +465,31 @@ stage_commit(_Config) ->
464465
|| I <- lists:seq(1, 10)],
465466
ok.
466467

468+
stage_abort(_Config) ->
469+
Tid = ets:new(t1, [set, public]),
470+
{ok, Mt0} = ra_mt:insert({0, 0, hi}, ra_mt:init(Tid)),
471+
?assertMatch({0, 0}, ra_mt:range(Mt0)),
472+
Mt1 = lists:foldl(
473+
fun (I, Acc) ->
474+
element(2, ra_mt:stage({I, 1, <<"banana">>}, Acc))
475+
end, Mt0, lists:seq(1, 10)),
476+
?assertMatch({0, 10}, ra_mt:range(Mt1)),
477+
[{I, _, _} = ra_mt:lookup(I, Mt1)
478+
|| I <- lists:seq(1, 10)],
479+
Mt2 = ra_mt:abort(Mt1),
480+
?assertMatch({0, 0}, ra_mt:range(Mt2)),
481+
Mt3 = lists:foldl(
482+
fun (I, Acc) ->
483+
element(2, ra_mt:stage({I, 1, <<"banana">>}, Acc))
484+
end, Mt2, lists:seq(1, 10)),
485+
{Entries, Mt4}= ra_mt:commit(Mt3),
486+
?assertMatch({0, 10}, ra_mt:range(Mt4)),
487+
?assertEqual(10, length(Entries)),
488+
?assertMatch([{1, 1, _} | _], Entries),
489+
[{I, _, _} = ra_mt:lookup(I, Mt4)
490+
|| I <- lists:seq(1, 10)],
491+
ok.
492+
467493
range_overlap(_Config) ->
468494
Tid = ets:new(t1, [set, public]),
469495
Mt0 = ra_mt:init(Tid),

0 commit comments

Comments
 (0)