Skip to content

Commit a905053

Browse files
committed
wip
1 parent 642b72f commit a905053

File tree

5 files changed

+271
-40
lines changed

5 files changed

+271
-40
lines changed

src/ra_log_wal.erl

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
-record(batch_writer, {smallest_live_idx :: ra_index(),
5959
tid :: ets:tid(),
6060
uid :: term(),
61-
range :: ra:range(),
61+
seq :: ra_seq:state(),
6262
term :: ra_term(),
6363
old :: undefined | #batch_writer{}
6464
}).
@@ -491,39 +491,39 @@ write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SmallestIndex,
491491
end.
492492

493493

494-
handle_msg({append, {UId, Pid} = Id, MtTid, PrevIdx0, Idx, Term, Entry},
494+
handle_msg({append, {UId, Pid} = Id, MtTid, ExpectedPrevIdx, Idx, Term, Entry},
495495
#state{conf = Conf,
496496
writers = Writers} = State0) ->
497497
SmallestIdx = smallest_live_index(Conf, UId),
498498
%% detect if truncating flag should be set
499499
Trunc = Idx == SmallestIdx,
500500

501-
case maps:find(UId, Writers) of
501+
case maps:get(UId, Writers, undefined) of
502502
_ when Idx < SmallestIdx ->
503503
%% the smallest live index for the last snapshot is higher than
504504
%% this index, just drop it
505505
LastIdx = SmallestIdx - 1,
506506
State0#state{writers = Writers#{UId => {in_seq, LastIdx}}};
507-
{ok, {_, PrevIdx}}
508-
when PrevIdx0 =< PrevIdx orelse
507+
{_, PrevIdx}
508+
when ExpectedPrevIdx =< PrevIdx orelse
509509
Trunc ->
510510
%% if the passed in previous index is less than the last written
511511
%% index (gap detection) _or_ it is a truncation
512512
%% then we can proceed and write the entry
513513
write_data(Id, MtTid, Idx, Term, Entry, Trunc, SmallestIdx, State0);
514-
error ->
514+
undefined ->
515515
%% no state for the UId is known so go ahead and write
516516
write_data(Id, MtTid, Idx, Term, Entry, false, SmallestIdx, State0);
517-
{ok, {out_of_seq, _}} ->
517+
{out_of_seq, _} ->
518518
% writer is out of seq, simply drop the write
519519
% TODO: capture metric for dropped writes?
520520
State0;
521-
{ok, {in_seq, PrevIdx}} ->
521+
{in_seq, PrevIdx} ->
522522
% writer was in seq but has sent an out of seq entry
523523
% notify writer
524524
?DEBUG("WAL in ~ts: requesting resend from `~w`, "
525-
"last idx ~b idx received ~b",
526-
[Conf#conf.system, UId, PrevIdx, Idx]),
525+
"last idx ~b idx received (~b,~b)",
526+
[Conf#conf.system, UId, PrevIdx, ExpectedPrevIdx, Idx]),
527527
Pid ! {ra_log_event, {resend_write, PrevIdx + 1}},
528528
State0#state{writers = Writers#{UId => {out_of_seq, PrevIdx}}}
529529
end;
@@ -538,32 +538,28 @@ incr_batch(#batch{num_writes = Writes,
538538
waiting = Waiting0,
539539
pending = Pend} = Batch,
540540
UId, Pid, MT_TID = MtTid,
541-
Idx, TERM = Term, Data, SmallestIdx) ->
541+
Idx, TERM = Term, Data, SmallestLiveIdx) ->
542542
Waiting = case Waiting0 of
543543
#{Pid := #batch_writer{term = TERM,
544544
tid = MT_TID,
545-
range = Range0
546-
} = W} ->
545+
seq = Seq0} = W} ->
547546
%% The Tid and term is the same so add to current batch_writer
548-
Range = ra_range:extend(Idx, ra_range:truncate(SmallestIdx - 1,
549-
Range0)),
547+
Range = ra_seq:append(Idx, Seq0),
550548
%% TODO: range needs to become a ra_seq so that we can
551549
%% capture sparse writes correctly
552-
Waiting0#{Pid => W#batch_writer{range = Range,
553-
smallest_live_idx = SmallestIdx,
554-
term = Term
555-
}};
550+
Waiting0#{Pid => W#batch_writer{seq = Range,
551+
smallest_live_idx = SmallestLiveIdx,
552+
term = Term}};
556553
_ ->
557554
%% The tid is different, open a new batch writer for the
558555
%% new tid and term
559556
PrevBatchWriter = maps:get(Pid, Waiting0, undefined),
560-
Writer = #batch_writer{smallest_live_idx = SmallestIdx,
557+
Writer = #batch_writer{smallest_live_idx = SmallestLiveIdx,
561558
tid = MtTid,
562-
range = ra_range:new(Idx),
559+
seq = [Idx],
563560
uid = UId,
564561
term = Term,
565-
old = PrevBatchWriter
566-
},
562+
old = PrevBatchWriter},
567563
Waiting0#{Pid => Writer}
568564
end,
569565

@@ -699,7 +695,7 @@ complete_batch(#state{batch = #batch{waiting = Waiting,
699695
complete_batch_writer(Pid, #batch_writer{smallest_live_idx = SmallestIdx,
700696
tid = MtTid,
701697
uid = UId,
702-
range = Range,
698+
seq = Range,
703699
term = Term,
704700
old = undefined}, Ranges) ->
705701
Pid ! {ra_log_event, {written, Term, Range}},
@@ -972,23 +968,24 @@ should_roll_wal(#state{conf = #conf{max_entries = MaxEntries},
972968
smallest_live_index(#conf{ra_log_snapshot_state_tid = Tid}, ServerUId) ->
973969
ra_log_snapshot_state:smallest(Tid, ServerUId).
974970

975-
update_ranges(Ranges, UId, MtTid, SmallestIdx, {Start, _} = AddRange) ->
971+
update_ranges(Ranges, UId, MtTid, SmallestIdx, AddSeq) ->
976972
case Ranges of
977-
#{UId := [{MtTid, Range0} | Rem]} ->
973+
#{UId := [{MtTid, Seq0} | Rem]} ->
978974
%% SmallestIdx might have moved so we truncate the old range first
979975
%% before extending
980-
Range1 = ra_range:truncate(SmallestIdx - 1, Range0),
976+
Seq1 = ra_seq:floor(SmallestIdx, Seq0),
981977
%% limit the old range by the start of the added range as in some resend
982978
%% cases we may have got back before the prior range.
983-
Range = ra_range:add(AddRange, ra_range:limit(Start, Range1)),
984-
Ranges#{UId => [{MtTid, Range} | Rem]};
985-
#{UId := [{OldMtTid, OldMtRange} | Rem]} ->
979+
Seq = ra_seq:add(AddSeq, Seq1),
980+
Ranges#{UId => [{MtTid, Seq} | Rem]};
981+
#{UId := [{OldMtTid, OldMtSeq} | Rem]} ->
986982
%% new Tid, need to add a new range record for this
987-
Ranges#{UId => [{MtTid, AddRange},
988-
ra_range:truncate(SmallestIdx - 1, {OldMtTid, OldMtRange})
983+
Ranges#{UId => [{MtTid, AddSeq},
984+
{OldMtTid,
985+
ra_seq:floor(SmallestIdx, OldMtSeq)}
989986
| Rem]};
990987
_ ->
991-
Ranges#{UId => [{MtTid, AddRange}]}
988+
Ranges#{UId => [{MtTid, AddSeq}]}
992989
end.
993990

994991
recover_entry(Names, UId, {Idx, _, _} = Entry, SmallestIdx,
@@ -1005,7 +1002,7 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SmallestIdx,
10051002
case ra_mt:insert(Entry, Mt0) of
10061003
{ok, Mt1} ->
10071004
Ranges = update_ranges(Ranges0, UId, ra_mt:tid(Mt1),
1008-
SmallestIdx, ra_range:new(Idx)),
1005+
SmallestIdx, [Idx]),
10091006
{ok, State#recovery{ranges = Ranges,
10101007
writers = Writers#{UId => {in_seq, Idx}},
10111008
tables = Tables#{UId => Mt1}}};
@@ -1037,7 +1034,7 @@ recover_entry(Names, UId, {Idx, Term, _}, SmallestIdx,
10371034
tables = Tables#{UId => Mt0}}};
10381035
Tid ->
10391036
Ranges = update_ranges(Ranges0, UId, Tid,
1040-
SmallestIdx, ra_range:new(Idx)),
1037+
SmallestIdx, [Idx]),
10411038
{ok, State#recovery{ranges = Ranges,
10421039
writers = Writers#{UId => {in_seq, Idx}},
10431040
tables = Tables#{UId => Mt0}}}

src/ra_range.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@
2525

2626
-export_type([range/0]).
2727

28+
-define(IS_RANGE(R), ((is_tuple(R) andalso
29+
tuple_size(R) == 2 andalso
30+
is_integer(element(1, R)) andalso
31+
is_integer(element(2, R))) orelse
32+
R == undefined)).
33+
34+
2835
-spec new(ra:index()) -> range().
2936
new(Start) when is_integer(Start) ->
3037
{Start, Start}.
@@ -71,14 +78,17 @@ limit(CeilExcl, Range)
7178
-spec truncate(ra:index(), range()) -> range().
7279
truncate(UpToIncl, {_Start, End})
7380
when is_integer(UpToIncl) andalso
81+
is_integer(End) andalso
7482
UpToIncl >= End ->
7583
undefined;
7684
truncate(UpToIncl, {Start, End})
7785
when is_integer(UpToIncl) andalso
86+
is_integer(Start) andalso
7887
UpToIncl >= Start ->
7988
{UpToIncl + 1, End};
8089
truncate(UpToIncl, Range)
81-
when is_integer(UpToIncl) ->
90+
when is_integer(UpToIncl) andalso
91+
?IS_RANGE(Range) ->
8292
Range.
8393

8494
size(undefined) ->

src/ra_seq.erl

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,126 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2017-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
16
-module(ra_seq).
27

38
%% open type
9+
%% sequences are ordered high -> low but ranges are ordered
10+
%% {low, high} so a typical sequence could look like
11+
%% [55, {20, 52}, 3]
412
-type state() :: [ra:index() | ra:range()].
513

614
-export_type([state/0]).
715

816

17+
-export([
18+
append/2,
19+
from_list/1,
20+
floor/2,
21+
limit/2,
22+
add/2,
23+
fold/3
24+
]).
925

26+
%% Appends Idx to the head of the sequence.
%% Idx must be an integer that is strictly greater than the highest index
%% already in the sequence; any other input fails with function_clause.
%% Sequences are ordered high -> low, so the head is always the most
%% recently appended (highest) entry.
-spec append(ra:index(), state()) -> state().
append(Idx, [IdxN1, IdxN2 | Rem])
  when is_integer(Idx) andalso
       Idx =:= IdxN1 + 1 andalso
       Idx =:= IdxN2 + 2 ->
    %% the two most recent entries are plain indexes that directly precede
    %% Idx so all three can be compacted into a single range.
    %% Exact comparison (=:=) keeps non-integer values out of the sequence.
    [{IdxN2, Idx} | Rem];
append(Idx, [{IdxN, IdxN1} | Rem])
  when is_integer(Idx) andalso
       Idx =:= IdxN1 + 1 ->
    %% Idx directly follows the range at the head: extend the range
    [{IdxN, Idx} | Rem];
append(Idx, [])
  when is_integer(Idx) ->
    [Idx];
append(Idx, [Prev | _] = Seq)
  when is_integer(Idx) andalso
       ((is_tuple(Prev) andalso
         Idx > element(2, Prev)) orelse
        Idx > Prev) ->
    %% not contiguous with the head (or only two contiguous indexes so far):
    %% store as a plain index
    [Idx | Seq].
45+
46+
%% Builds a sequence from a list of indexes.
%% The input does not need to be ordered. Duplicate indexes are collapsed
%% (lists:usort/1) because append/2 only accepts strictly increasing
%% indexes and would otherwise fail with function_clause.
-spec from_list([ra:index()]) -> state().
from_list(L) ->
    lists:foldl(fun append/2, [], lists:usort(L)).
49+
50+
%% Returns the sequence with every index lower than Floor removed;
%% Floor itself is kept (inclusive lower bound).
-spec floor(ra:index(), state()) -> state().
floor(Floor, Seq0) ->
    %% TODO: assert that Seq0 is a well-formed (appendable) sequence,
    %% for now this is simply assumed
    floor0(Floor, Seq0, []).
55+
56+
57+
%% Returns the sequence with every index higher than CeilIdx removed;
%% CeilIdx itself is kept (inclusive upper bound).
%% Sequences are ordered high -> low so entries are dropped from the head
%% until the first entry that is at or below the ceiling is found.
-spec limit(integer(), state()) -> state().
limit(CeilIdx, [Last | Rem])
  when is_integer(Last) andalso
       Last > CeilIdx ->
    %% plain index above the ceiling: drop it and keep looking
    limit(CeilIdx, Rem);
limit(CeilIdx, [{_, _} = T | Rem]) ->
    %% ra_range:limit/2 names its first argument CeilExcl (exclusive),
    %% hence the + 1 to keep CeilIdx itself
    case ra_range:limit(CeilIdx + 1, T) of
        undefined ->
            %% presumably nothing of the range is below the ceiling:
            %% drop it and continue with the remainder
            limit(CeilIdx, Rem);
        {I, I} ->
            %% a single element range is stored as a plain index
            [I | Rem];
        {I, I2} when I == I2 - 1 ->
            %% a two element range is stored as two plain indexes,
            %% highest first
            [I2, I | Rem];
        NewRange ->
            [NewRange | Rem]
    end;
limit(_CeilIdx, Seq) ->
    %% empty sequence or the head is already at or below the ceiling
    Seq.
74+
75+
%% Adds all indexes of the first sequence on top of the second one.
%% Anything in the second sequence that is at or above the lowest index of
%% the first sequence is discarded before the indexes are appended.
-spec add(state(), state()) -> state().
add([], Base) ->
    Base;
add(New, Base) ->
    %% sequences are ordered high -> low so the last element holds the
    %% lowest index
    Lowest = case lists:last(New) of
                 {Low, _High} -> Low;
                 Low -> Low
             end,
    Trimmed = limit(Lowest - 1, Base),
    fold(fun append/2, Trimmed, New).
84+
85+
86+
%% Folds Fun over every individual index of the sequence, expanding ranges
%% into their indexes. The sequence is stored high -> low and is folded
%% from the right, so Fun is called with the indexes in ascending order.
-spec fold(fun ((ra:index(), Acc) -> Acc), Acc, state())
      -> Acc when Acc :: term().
fold(Fun, Acc0, Seq) ->
    lists:foldr(
      fun ({S, E}, Acc) ->
              fold_range(Fun, Acc, S, E);
          (Idx, Acc) ->
              Fun(Idx, Acc)
      end, Acc0, Seq).

%% Applies Fun to each index of the inclusive range S..E in ascending order
%% without building an intermediate list of the whole range.
fold_range(Fun, Acc, S, E) when S =< E ->
    fold_range(Fun, Fun(S, Acc), S + 1, E);
fold_range(_Fun, Acc, _S, _E) ->
    Acc.
96+
97+
%% internal functions
98+
99+
%% Walks the sequence from its highest entry downwards, collecting (in
%% reverse) every entry that is at or above Floor. The walk ends at the
%% first entry that lies below the floor or when the sequence is exhausted,
%% at which point the collected entries are put back in high -> low order.
floor0(Floor, [Idx | Tail], Kept)
  when is_integer(Idx), Idx >= Floor ->
    floor0(Floor, Tail, [Idx | Kept]);
floor0(Floor, [{_, _} = Range | Tail], Kept) ->
    %% ra_range:truncate/2 removes everything up to and including its
    %% first argument, hence Floor - 1 to keep Floor itself
    case ra_range:truncate(Floor - 1, Range) of
        undefined ->
            %% the whole range lies below the floor
            lists:reverse(Kept);
        {Single, Single} ->
            %% one index left: store it as a plain index
            floor0(Floor, Tail, [Single | Kept]);
        {Low, High} when Low == High - 1 ->
            %% two indexes left: store them as plain indexes, Kept is
            %% reversed at the end so the lower one goes in front here
            floor0(Floor, Tail, [Low, High | Kept]);
        Remaining ->
            floor0(Floor, Tail, [Remaining | Kept])
    end;
floor0(_Floor, _Rest, Kept) ->
    lists:reverse(Kept).
116+
117+
% first_index(Seq) ->
118+
% last_index(lists:reverse(Seq)).
119+
120+
% last_index([{_, I} | _]) ->
121+
% I;
122+
% last_index([I | _])
123+
% when is_integer(I) ->
124+
% I;
125+
% last_index([]) ->
126+
% undefined.

test/ra_log_wal_SUITE.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,14 @@ sparse_writes(Config) ->
159159
{ok, Pid} = ra_log_wal:start_link(Conf),
160160
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 11, 12, 1, "value"),
161161
ok = await_written(WriterId, 1, {12, 12}),
162-
debugger:start(),
163-
int:i(ra_log_wal),
164-
int:break(ra_log_wal, 975),
162+
% debugger:start(),
163+
% int:i(ra_log_wal),
164+
% int:break(ra_log_wal, 975),
165165
timer:sleep(1000),
166+
%% write a "sparse write" at index 15 but reference 12 as the last
167+
%% one
166168
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 15, 1, "value2"),
167-
timer:sleep(200000),
169+
% timer:sleep(200000),
168170
ok = await_written(WriterId, 1, {15, 15}),
169171
ra_log_wal:force_roll_over(Pid),
170172
receive

0 commit comments

Comments
 (0)