1717
1818-export ([
1919 write /6 ,
20+ write /7 ,
2021 write_batch /2 ,
2122 last_writer_seq /2 ,
2223 force_roll_over /1 ]).
140141-export_type ([wal_conf / 0 ]).
141142
142143-type wal_command () ::
143- {append , writer_id (), ra_index (), ra_term (), term ()}.
144+ {append , writer_id (), PrevIndex :: ra :index () | - 1 ,
145+ Index :: ra :index (), Term :: ra_term (), wal_cmd ()}.
144146
145147-type wal_op () :: {cast , wal_command ()} |
146148 {call , from (), wal_command ()}.
149151-spec write (atom () | pid (), writer_id (), ets :tid (), ra_index (), ra_term (),
150152 wal_cmd ()) ->
151153 {ok , pid ()} | {error , wal_down }.
152- write (Wal , {_ , _ } = From , MtTid , Idx , Term , Cmd )
154+ write (Wal , From , MtTid , Idx , Term , Cmd ) ->
155+ % % "normal opereation where we assume a contigious sequence
156+ % % this may be removed at some point
157+ write (Wal , From , MtTid , Idx - 1 , Idx , Term , Cmd ).
158+
159+ -spec write (atom () | pid (), writer_id (), ets :tid (),
160+ PrevIndex :: ra :index () | - 1 ,
161+ Index :: ra_index (),
162+ Term :: ra_term (),
163+ wal_cmd ()) ->
164+ {ok , pid ()} | {error , wal_down }.
165+ write (Wal , {_ , _ } = From , MtTid , PrevIdx , Idx , Term , Cmd )
153166 when is_integer (Idx ) andalso
154- is_integer (Term ) ->
155- named_cast (Wal , {append , From , MtTid , Idx , Term , Cmd }).
167+ is_integer (PrevIdx ) andalso
168+ is_integer (Term ) andalso
169+ PrevIdx < Idx ->
170+ named_cast (Wal , {append , From , MtTid , PrevIdx , Idx , Term , Cmd }).
156171
157172-spec write_batch (Wal :: atom () | pid (), [wal_command ()]) ->
158173 {ok , pid ()} | {error , wal_down }.
@@ -476,7 +491,7 @@ write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SmallestIndex,
476491 end .
477492
478493
479- handle_msg ({append , {UId , Pid } = Id , MtTid , Idx , Term , Entry },
494+ handle_msg ({append , {UId , Pid } = Id , MtTid , PrevIdx0 , Idx , Term , Entry },
480495 # state {conf = Conf ,
481496 writers = Writers } = State0 ) ->
482497 SmallestIdx = smallest_live_index (Conf , UId ),
@@ -487,13 +502,17 @@ handle_msg({append, {UId, Pid} = Id, MtTid, Idx, Term, Entry},
487502 _ when Idx < SmallestIdx ->
488503 % % the smallest live index for the last snapshot is higher than
489504 % % this index, just drop it
490- PrevIdx = SmallestIdx - 1 ,
491- State0 # state {writers = Writers #{UId => {in_seq , PrevIdx }}};
505+ LastIdx = SmallestIdx - 1 ,
506+ State0 # state {writers = Writers #{UId => {in_seq , LastIdx }}};
492507 {ok , {_ , PrevIdx }}
493- when Idx =< PrevIdx + 1 orelse
508+ when PrevIdx0 =< PrevIdx orelse
494509 Trunc ->
510+ % % if the passed in previous index is less than the last written
511+ % % index (gap detection) _or_ it is a truncation
512+ % % then we can proceed and write the entry
495513 write_data (Id , MtTid , Idx , Term , Entry , Trunc , SmallestIdx , State0 );
496514 error ->
515+ % % no state for the UId is known so go ahead and write
497516 write_data (Id , MtTid , Idx , Term , Entry , false , SmallestIdx , State0 );
498517 {ok , {out_of_seq , _ }} ->
499518 % writer is out of seq simply ignore drop the write
@@ -528,6 +547,8 @@ incr_batch(#batch{num_writes = Writes,
528547 % % The Tid and term is the same so add to current batch_writer
529548 Range = ra_range :extend (Idx , ra_range :truncate (SmallestIdx - 1 ,
530549 Range0 )),
550+ % % TODO: range nees to become a ra_seq so that we can
551+ % % capture sparse writes correctly
531552 Waiting0 #{Pid => W # batch_writer {range = Range ,
532553 smallest_live_idx = SmallestIdx ,
533554 term = Term
0 commit comments