Skip to content

Commit 3418318

Browse files
committed
fixes
1 parent ed35fdf commit 3418318

File tree

4 files changed

+83
-62
lines changed

4 files changed

+83
-62
lines changed

src/ra_kv.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
-module(ra_kv).
2-
32
-feature(maybe_expr, enable).
4-
53
-behaviour(ra_machine).
6-
-include("src/ra.hrl").
74

5+
-include("src/ra.hrl").
86
-include_lib("eunit/include/eunit.hrl").
97

108
-export([

src/ra_log.erl

Lines changed: 61 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
partial_read/3,
2727
execute_read_plan/4,
2828
read_plan_info/1,
29+
previous_wal_index/1,
2930
last_index_term/1,
3031
set_last_index/2,
3132
handle_event/2,
@@ -111,10 +112,10 @@
111112
snapshot_state :: ra_snapshot:state(),
112113
current_snapshot :: option(ra_idxterm()),
113114
last_resend_time :: option({integer(), WalPid :: pid() | undefined}),
114-
last_wal_write :: {pid(), Ms :: integer()},
115+
last_wal_write :: {pid(), Ms :: integer(), ra:index() | -1},
115116
reader :: ra_log_segments:state(),
116117
mem_table :: ra_mt:state(),
117-
tx = false :: boolean(),
118+
tx = false :: false | {true, ra:range()},
118119
pending = [] :: ra_seq:state(),
119120
live_indexes = [] :: ra_seq:state()
120121
}).
@@ -265,6 +266,17 @@ init(#{uid := UId,
265266
{SnapIdx, Range}})
266267
end
267268
end,
269+
LastWalIdx = case ra_log_wal:last_writer_seq(Wal, UId) of
270+
{ok, undefined} ->
271+
-1;
272+
{ok, Idx} ->
273+
Idx;
274+
{error, wal_down} ->
275+
?ERROR("~ts: ra_log:init/1 cannot complete as wal"
276+
" process is down.",
277+
[LogId]),
278+
exit(wal_down)
279+
end,
268280
Cfg = #cfg{directory = Dir,
269281
uid = UId,
270282
log_id = LogId,
@@ -282,7 +294,7 @@ init(#{uid := UId,
282294
mem_table = Mt,
283295
snapshot_state = SnapshotState,
284296
current_snapshot = ra_snapshot:current(SnapshotState),
285-
last_wal_write = {whereis(Wal), now_ms()},
297+
last_wal_write = {whereis(Wal), now_ms(), LastWalIdx},
286298
live_indexes = LiveIndexes
287299
},
288300
put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
@@ -316,18 +328,7 @@ init(#{uid := UId,
316328
{_, L} ->
317329
L
318330
end,
319-
LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of
320-
{ok, undefined} ->
321-
%% take last segref index
322-
max(SnapIdx, LastSegRefIdx);
323-
{ok, Idx} ->
324-
max(Idx, LastSegRefIdx);
325-
{error, wal_down} ->
326-
?ERROR("~ts: ra_log:init/1 cannot complete as wal"
327-
" process is down.",
328-
[State2#?MODULE.cfg#cfg.log_id]),
329-
exit(wal_down)
330-
end,
331+
LastWrittenIdx = lists:max([LastWalIdx, SnapIdx, LastSegRefIdx]),
331332
{LastWrittenTerm, State3} = case LastWrittenIdx of
332333
SnapIdx ->
333334
{SnapTerm, State2};
@@ -360,28 +361,32 @@ close(#?MODULE{cfg = #cfg{uid = _UId},
360361

361362
-spec begin_tx(state()) -> state().
362363
begin_tx(State) ->
363-
State#?MODULE{tx = true}.
364+
State#?MODULE{tx = {true, undefined}}.
364365

365366
-spec commit_tx(state()) -> {ok, state()} | {error, wal_down, state()}.
366367
commit_tx(#?MODULE{cfg = #cfg{uid = UId,
367368
wal = Wal} = Cfg,
368-
tx = true,
369+
tx = {true, TxRange},
370+
range = Range,
369371
mem_table = Mt1} = State) ->
370372
{Entries, Mt} = ra_mt:commit(Mt1),
371373
Tid = ra_mt:tid(Mt),
372374
WriterId = {UId, self()},
373-
{WalCommands, Num} =
374-
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N}) ->
375+
PrevIdx = previous_wal_index(State),
376+
{WalCommands, Num, _} =
377+
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N, Prev}) ->
375378
Cmd = {ttb, term_to_iovec(Cmd0)},
376-
WalC = {append, WriterId, Tid, Idx-1, Idx, Term, Cmd},
377-
{[WalC | WC], N+1}
378-
end, {[], 0}, Entries),
379+
WalC = {append, WriterId, Tid, Prev, Idx, Term, Cmd},
380+
{[WalC | WC], N+1, Idx}
381+
end, {[], 0, PrevIdx}, Entries),
382+
{_, LastIdx} = Range,
379383

380384
case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of
381385
{ok, Pid} ->
382386
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),
383387
{ok, State#?MODULE{tx = false,
384-
last_wal_write = {Pid, now_ms()},
388+
range = ra_range:add(TxRange, Range),
389+
last_wal_write = {Pid, now_ms(), LastIdx},
385390
mem_table = Mt}};
386391
{error, wal_down} ->
387392
%% still need to return the state here
@@ -421,7 +426,7 @@ append({Idx, Term, Cmd0} = Entry,
421426
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
422427
State#?MODULE{range = ra_range:extend(Idx, Range),
423428
last_term = Term,
424-
last_wal_write = {Pid, now_ms()},
429+
last_wal_write = {Pid, now_ms(), Idx},
425430
pending = ra_seq:append(Idx, Pend),
426431
mem_table = Mt};
427432
{error, wal_down} ->
@@ -438,15 +443,14 @@ append({Idx, Term, Cmd0} = Entry,
438443
end;
439444
append({Idx, Term, _Cmd} = Entry,
440445
#?MODULE{cfg = Cfg,
441-
range = Range,
442-
tx = true,
446+
tx = {true, TxRange},
443447
pending = Pend0,
444448
mem_table = Mt0} = State)
445-
when ?IS_NEXT_IDX(Idx, Range) ->
449+
when ?IS_NEXT_IDX(Idx, TxRange) ->
446450
case ra_mt:stage(Entry, Mt0) of
447451
{ok, Mt} ->
448452
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
449-
State#?MODULE{range = ra_range:extend(Idx, Range),
453+
State#?MODULE{tx = {true, ra_range:extend(Idx, TxRange)},
450454
last_term = Term,
451455
pending = ra_seq:append(Idx, Pend0),
452456
mem_table = Mt};
@@ -459,9 +463,10 @@ append({Idx, Term, _Cmd} = Entry,
459463
Cfg#cfg.uid, Mt0),
460464
append(Entry, State#?MODULE{mem_table = M0})
461465
end;
462-
append({Idx, _, _}, #?MODULE{range = Range}) ->
463-
Msg = lists:flatten(io_lib:format("tried writing ~b - current range ~w",
464-
[Idx, Range])),
466+
append({Idx, _, _}, #?MODULE{range = Range,
467+
tx = Tx}) ->
468+
Msg = lists:flatten(io_lib:format("tried writing ~b - current range ~w tx ~p",
469+
[Idx, Range, Tx])),
465470
exit({integrity_error, Msg}).
466471

467472
-spec write(Entries :: [log_entry()], State :: state()) ->
@@ -505,13 +510,7 @@ write_sparse({Idx, Term, _} = Entry, PrevIdx0,
505510
{ok, Mt} = ra_mt:insert_sparse(Entry, PrevIdx0, Mt0),
506511
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
507512
Tid = ra_mt:tid(Mt),
508-
PrevIdx = case PrevIdx0 of
509-
undefined ->
510-
%% this is likely to always be accepted
511-
0;
512-
_ ->
513-
PrevIdx0
514-
end,
513+
PrevIdx = previous_wal_index(State0),
515514
case ra_log_wal:write(Wal, {UId, self()}, Tid, PrevIdx, Idx,
516515
Term, Entry) of
517516
{ok, Pid} ->
@@ -526,7 +525,7 @@ write_sparse({Idx, Term, _} = Entry, PrevIdx0,
526525
{ok, State0#?MODULE{range = NewRange,
527526
last_term = Term,
528527
mem_table = Mt,
529-
last_wal_write = {Pid, now_ms()}}};
528+
last_wal_write = {Pid, now_ms(), Idx}}};
530529
{error, wal_down} = Err->
531530
Err
532531
end.
@@ -704,6 +703,15 @@ read_plan_info(#read_plan{read = Read,
704703
num_segments => NumSegments}.
705704

706705

706+
%% Returns the last index of the log's current range, or -1 when the range is
%% undefined. The result is a bare index, not an {Index, Term} pair.
-spec previous_wal_index(state()) -> ra_index() | -1.
707+
previous_wal_index(#?MODULE{range = Range}) ->
708+
case Range of
709+
undefined ->
710+
-1;
711+
{_, LastIdx} ->
712+
LastIdx
713+
end.
714+
707715
-spec last_index_term(state()) -> option(ra_idxterm()).
708716
last_index_term(#?MODULE{range = {_, LastIdx},
709717
last_term = LastTerm}) ->
@@ -972,6 +980,8 @@ handle_event({down, _Pid, _Info}, #?MODULE{} = State) ->
972980
{State, []}.
973981

974982
-spec next_index(state()) -> ra_index().
983+
next_index(#?MODULE{tx = {true, {_, Last}}}) ->
984+
Last + 1;
975985
next_index(#?MODULE{range = {_, LastIdx}}) ->
976986
LastIdx + 1;
977987
next_index(#?MODULE{current_snapshot = {SnapIdx, _}}) ->
@@ -1124,7 +1134,7 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg,
11241134
tick(Now, #?MODULE{cfg = #cfg{wal = Wal},
11251135
mem_table = Mt,
11261136
last_written_index_term = {LastWrittenIdx, _},
1127-
last_wal_write = {WalPid, Ms}} = State) ->
1137+
last_wal_write = {WalPid, Ms, _}} = State) ->
11281138
CurWalPid = whereis(Wal),
11291139
MtRange = ra_mt:range(Mt),
11301140
case Now > Ms + ?WAL_RESEND_TIMEOUT andalso
@@ -1262,7 +1272,7 @@ overview(#?MODULE{range = Range,
12621272
snapshot_state = SnapshotState,
12631273
current_snapshot = CurrSnap,
12641274
reader = Reader,
1265-
last_wal_write = {_LastPid, LastMs},
1275+
last_wal_write = {_LastPid, LastMs, LastWalIdx},
12661276
mem_table = Mt,
12671277
pending = Pend
12681278
} = State) ->
@@ -1291,6 +1301,7 @@ overview(#?MODULE{range = Range,
12911301
mem_table_range => ra_mt:range(Mt),
12921302
mem_table_info => ra_mt:info(Mt),
12931303
last_wal_write => LastMs,
1304+
last_wal_index => LastWalIdx,
12941305
num_pending => ra_seq:length(Pend)
12951306
}.
12961307

@@ -1351,15 +1362,15 @@ release_resources(MaxOpenSegments, AccessPattern,
13511362
%% only used by resend to wal functionality and doesn't update the mem table
13521363
wal_rewrite(#?MODULE{cfg = #cfg{uid = UId,
13531364
wal = Wal} = Cfg,
1354-
range = _Range} = State,
1365+
last_wal_write = {_, _, _LastWalIdx}} = State,
13551366
Tid, {Idx, Term, Cmd}) ->
13561367
case ra_log_wal:write(Wal, {UId, self()}, Tid, Idx, Term, Cmd) of
13571368
{ok, Pid} ->
13581369
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
13591370
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
13601371
State#?MODULE{%last_index = Idx,
13611372
last_term = Term,
1362-
last_wal_write = {Pid, now_ms()}
1373+
last_wal_write = {Pid, now_ms(), Idx}
13631374
};
13641375
{error, wal_down} ->
13651376
error(wal_down)
@@ -1372,14 +1383,15 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
13721383
mem_table = Mt0} = State,
13731384
[{FstIdx, _, _} | _] = Entries) ->
13741385
WriterId = {UId, self()},
1386+
PrevIdx = previous_wal_index(State),
13751387
%% all entries in a transaction are written to the same tid
13761388
Tid = ra_mt:tid(Mt0),
1377-
{WalCommands, Num, Pend} =
1378-
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N, P}) ->
1389+
{WalCommands, Num, LastIdx, Pend} =
1390+
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N, Prev, P}) ->
13791391
Cmd = {ttb, term_to_iovec(Cmd0)},
1380-
WalC = {append, WriterId, Tid, Idx-1, Idx, Term, Cmd},
1381-
{[WalC | WC], N+1, ra_seq:append(Idx, P)}
1382-
end, {[], 0, Pend0}, Entries),
1392+
WalC = {append, WriterId, Tid, Prev, Idx, Term, Cmd},
1393+
{[WalC | WC], N+1, Idx, ra_seq:append(Idx, P)}
1394+
end, {[], 0, PrevIdx, Pend0}, Entries),
13831395

13841396
[{_, _, _, _PrevIdx, LastIdx, LastTerm, _} | _] = WalCommands,
13851397
{_, Mt} = ra_mt:commit(Mt0),
@@ -1395,7 +1407,7 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
13951407
{ok, Pid} ->
13961408
{ok, State#?MODULE{range = NewRange,
13971409
last_term = LastTerm,
1398-
last_wal_write = {Pid, now_ms()},
1410+
last_wal_write = {Pid, now_ms(), LastIdx},
13991411
mem_table = Mt,
14001412
pending = Pend}};
14011413
{error, wal_down} = Err ->

test/ra_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,7 @@ wait_for_applied(Msg) ->
709709
false -> wait_for_applied(Msg)
710710
end
711711
after 10000 ->
712+
flush(),
712713
error({timeout_waiting_for_applied, Msg})
713714
end.
714715

test/ra_log_2_SUITE.erl

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ all_tests() ->
7575
write_config,
7676
sparse_write,
7777
overwritten_segment_is_cleared,
78-
overwritten_segment_is_cleared_on_init
78+
overwritten_segment_is_cleared_on_init,
79+
snapshot_installation_with_live_indexes
7980
].
8081

8182
groups() ->
@@ -830,11 +831,6 @@ write_sparse_re_init(Config) ->
830831

831832
ok.
832833

833-
write_sparse_after_snapshot_install(Config) ->
834-
835-
836-
ok.
837-
838834
write_sparse_after_index_reset(Config) ->
839835
Log0 = ra_log_init(Config),
840836
Log1 = write_n(1, 6, 1, Log0),
@@ -1371,19 +1367,33 @@ snapshot_installation_with_live_indexes(Config) ->
13711367

13721368
%% create snapshot chunk
13731369
Meta = meta(15, 2, [?N1]),
1374-
Chunk = create_snapshot_chunk(Config, Meta, #{}),
1370+
Chunk = create_snapshot_chunk(Config, Meta, [2, 9, 14], #{}),
13751371
SnapState0 = ra_log:snapshot_state(Log2),
13761372
{ok, SnapState1} = ra_snapshot:begin_accept(Meta, SnapState0),
13771373
Machine = {machine, ?MODULE, #{}},
1374+
1375+
%% write a sparse one
13781376
{SnapState, _, LiveIndexes, AEffs} = ra_snapshot:complete_accept(Chunk, 1, Machine,
13791377
SnapState1),
13801378
run_effs(AEffs),
1379+
{ok, Log2b} = ra_log:write_sparse({14, 2, <<>>}, 9, Log2),
13811380
{ok, Log3, Effs4} = ra_log:install_snapshot({15, 2}, ?MODULE, LiveIndexes,
1382-
ra_log:set_snapshot_state(SnapState, Log2)),
1381+
ra_log:set_snapshot_state(SnapState, Log2b)),
1382+
13831383

13841384
run_effs(Effs4),
1385+
ct:pal("o ~p", [ra_log:overview(Log3)]),
13851386
{15, _} = ra_log:last_index_term(Log3),
13861387
{15, _} = ra_log:last_written(Log3),
1388+
%% write the next index, bearing in mind the last index the WAL saw
1389+
%% was 14
1390+
{ok, Log4} = ra_log:write([{16, 2, <<>>}], Log3),
1391+
Log = assert_log_events(Log4,
1392+
fun (L) ->
1393+
LW = ra_log:last_written(L),
1394+
{16, 2} == LW
1395+
end),
1396+
ct:pal("o ~p", [ra_log:overview(Log)]),
13871397
ok.
13881398

13891399
snapshot_installation(Config) ->
@@ -2148,8 +2158,8 @@ meta(Idx, Term, Cluster) ->
21482158
cluster => Cluster,
21492159
machine_version => 1}.
21502160

2151-
create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) ->
2152-
create_snapshot_chunk(Config, #{index := Idx} = Meta, <<"9">>, Context).
2161+
create_snapshot_chunk(Config, Meta, Context) ->
2162+
create_snapshot_chunk(Config, Meta, <<"9">>, Context).
21532163

21542164
create_snapshot_chunk(Config, #{index := Idx} = Meta, MacState, Context) ->
21552165
OthDir = filename:join(?config(work_dir, Config), "snapshot_installation"),

0 commit comments

Comments
 (0)