7171 #{writer_id () => binary ()}}.
7272
7373-record (conf , {dir :: file :filename_all (),
74+ system :: atom (),
7475 segment_writer = ra_log_segment_writer :: atom () | pid (),
7576 compute_checksums = false :: boolean (),
7677 max_size_bytes :: non_neg_integer (),
120121 }).
121122
122123-type state () :: # state {}.
123- -type wal_conf () :: #{name := atom (), % % the name to register the wal as
124- names := ra_system : names (),
124+ -type wal_conf () :: #{names := ra_system : names (),
125+ system := atom (),
125126 dir := file :filename_all (),
126127 max_size_bytes => non_neg_integer (),
127128 max_entries => non_neg_integer (),
128- segment_writer => atom () | pid (),
129129 compute_checksums => boolean (),
130130 pre_allocate => boolean (),
131131 sync_method => sync | datasync ,
@@ -222,7 +222,9 @@ force_roll_over(Wal) ->
222222 {ok , pid ()} |
223223 {error , {already_started , pid ()}} |
224224 {error , wal_checksum_validation_failure }.
225- start_link (#{name := Name } = Config )
225+ start_link (#{dir := _ ,
226+ system := _ ,
227+ names := #{wal := Name }} = Config )
226228 when is_atom (Name ) ->
227229 WalMaxBatchSize = maps :get (max_batch_size , Config ,
228230 ? WAL_DEFAULT_MAX_BATCH_SIZE ),
@@ -241,22 +243,22 @@ start_link(#{name := Name} = Config)
241243-spec init (wal_conf ()) ->
242244 {ok , state ()} |
243245 {stop , wal_checksum_validation_failure } | {stop , term ()}.
244- init (#{dir := Dir } = Conf0 ) ->
246+ init (#{system := System ,
247+ dir := Dir } = Conf0 ) ->
245248 #{max_size_bytes := MaxWalSize ,
246249 max_entries := MaxEntries ,
247250 recovery_chunk_size := RecoveryChunkSize ,
248- segment_writer := SegWriter ,
249251 compute_checksums := ComputeChecksums ,
250252 pre_allocate := PreAllocate ,
251253 sync_method := SyncMethod ,
252254 garbage_collect := Gc ,
253255 min_heap_size := MinHeapSize ,
254256 min_bin_vheap_size := MinBinVheapSize ,
255257 names := #{wal := WalName ,
258+ segment_writer := SegWriter ,
256259 open_mem_tbls := MemTablesName } = Names } =
257260 merge_conf_defaults (Conf0 ),
258- ? NOTICE (" WAL: ~ts init, mem-tables table name: ~w " ,
259- [WalName , MemTablesName ]),
261+ ? NOTICE (" WAL in ~ts initialising with name ~ts " , [System , WalName ]),
260262 process_flag (trap_exit , true ),
261263 % given ra_log_wal is effectively a fan-in sink it is likely that it will
262264 % at times receive large number of messages from a large number of
@@ -266,6 +268,7 @@ init(#{dir := Dir} = Conf0) ->
266268 process_flag (min_heap_size , MinHeapSize ),
267269 CRef = ra_counters :new (WalName , ? COUNTER_FIELDS ),
268270 Conf = # conf {dir = Dir ,
271+ system = System ,
269272 segment_writer = SegWriter ,
270273 compute_checksums = ComputeChecksums ,
271274 max_size_bytes = max (? WAL_MIN_SIZE , MaxWalSize ),
@@ -285,8 +288,8 @@ init(#{dir := Dir} = Conf0) ->
285288 ok = ra_log_segment_writer :await (SegWriter ),
286289 {ok , Result }
287290 catch _ :Err :Stack ->
288- ? ERROR (" WAL: failed to initialise with ~p , stack ~p " ,
289- [Err , Stack ]),
291+ ? ERROR (" WAL in ~ts failed to initialise with ~p , stack ~p " ,
292+ [System , Err , Stack ]),
290293 {stop , Err }
291294 end .
292295
@@ -304,8 +307,9 @@ handle_batch(Ops, #state{conf = #conf{explicit_gc = Gc}} = State0) ->
304307 % % process all ops
305308 {ok , Actions , complete_batch (State )}.
306309
307- terminate (Reason , State ) ->
308- ? DEBUG (" wal: terminating with ~W " , [Reason , 20 ]),
310+ terminate (Reason , # state {conf = # conf {system = System }} = State ) ->
311+
312+ ? DEBUG (" WAL in ~ts : terminating with ~0P " , [System , Reason , 20 ]),
309313 _ = cleanup (State ),
310314 ok .
311315
@@ -337,7 +341,8 @@ handle_op({info, {'EXIT', _, Reason}}, _State) ->
337341 % % this is here for testing purposes only
338342 throw ({stop , Reason }).
339343
340- recover_wal (Dir , # conf {segment_writer = SegWriter ,
344+ recover_wal (Dir , # conf {system = System ,
345+ segment_writer = SegWriter ,
341346 mem_tables_tid = MemTblsTid } = Conf ) ->
342347 % ensure configured directory exists
343348 ok = ra_lib :make_dir (Dir ),
@@ -363,7 +368,8 @@ recover_wal(Dir, #conf{segment_writer = SegWriter,
363368 WalFiles = lists :sort (Files ),
364369 AllWriters =
365370 [begin
366- ? DEBUG (" wal: recovering ~ts , Mode ~s " , [F , Mode ]),
371+ ? DEBUG (" WAL in ~ts : recovering ~ts , Mode ~s " ,
372+ [System , F , Mode ]),
367373 Fd = open_at_first_record (filename :join (Dir , F )),
368374 {Time , # recovery {ranges = Ranges ,
369375 writers = Writers }} =
@@ -372,16 +378,17 @@ recover_wal(Dir, #conf{segment_writer = SegWriter,
372378 ok = ra_log_segment_writer :accept_mem_tables (SegWriter , Ranges , F ),
373379
374380 close_existing (Fd ),
375- ? DEBUG (" wal : recovered ~ts time taken ~b ms - Writer state recovered ~p " ,
376- [F , Time div 1000 , Writers ]),
381+ ? DEBUG (" WAL in ~ts : recovered ~ts time taken ~b ms - Writer state recovered ~p " ,
382+ [System , F , Time div 1000 , Writers ]),
377383 Writers
378384 end || F <- WalFiles ],
379385
380386 FinalWriters = lists :foldl (fun (New , Acc ) ->
381387 maps :merge (Acc , New )
382388 end , #{}, AllWriters ),
383389
384- ? DEBUG (" wal: final writer state recovered ~p " , [FinalWriters ]),
390+ ? DEBUG (" WAL in ~ts : final writer state recovered ~p " ,
391+ [System , FinalWriters ]),
385392
386393 FileNum = extract_file_num (lists :reverse (WalFiles )),
387394 State = roll_over (# state {conf = Conf ,
@@ -492,9 +499,9 @@ handle_msg({append, {UId, Pid} = Id, MtTid, Idx, Term, Entry},
492499 {ok , {in_seq , PrevIdx }} ->
493500 % writer was in seq but has sent an out of seq entry
494501 % notify writer
495- ? DEBUG (" WAL: requesting resend from `~w `, "
502+ ? DEBUG (" WAL in ~ts : requesting resend from `~w `, "
496503 " last idx ~b idx received ~b " ,
497- [UId , PrevIdx , Idx ]),
504+ [Conf # conf . system , UId , PrevIdx , Idx ]),
498505 Pid ! {ra_log_event , {resend_write , PrevIdx + 1 }},
499506 State0 # state {writers = Writers #{UId => {out_of_seq , PrevIdx }}}
500507 end ;
@@ -549,13 +556,14 @@ complete_batch_and_roll(#state{} = State0) ->
549556
550557roll_over (# state {wal = Wal0 , file_num = Num0 ,
551558 conf = # conf {dir = Dir ,
559+ system = System ,
552560 segment_writer = SegWriter ,
553561 max_size_bytes = MaxBytes } = Conf0 } = State0 ) ->
554562 counters :add (Conf0 # conf .counter , ? C_WAL_FILES , 1 ),
555563 Num = Num0 + 1 ,
556564 Fn = ra_lib :zpad_filename (" " , " wal" , Num ),
557565 NextFile = filename :join (Dir , Fn ),
558- ? DEBUG (" wal : opening new file ~ts " , [Fn ]),
566+ ? DEBUG (" WAL in ~ts : opening new file ~ts " , [System , Fn ]),
559567 % % if this is the first wal since restart randomise the first
560568 % % max wal size to reduce the likelihood that each erlang node will
561569 % % flush mem tables at the same time
@@ -608,7 +616,8 @@ make_tmp(File) ->
608616 ok = file :close (Fd ),
609617 Tmp .
610618
611- maybe_pre_allocate (# conf {pre_allocate = true } = Conf , Fd , Max0 ) ->
619+ maybe_pre_allocate (# conf {system = System ,
620+ pre_allocate = true } = Conf , Fd , Max0 ) ->
612621 Max = Max0 - ? HEADER_SIZE ,
613622 case file :allocate (Fd , ? HEADER_SIZE , Max ) of
614623 ok ->
@@ -619,8 +628,9 @@ maybe_pre_allocate(#conf{pre_allocate = true} = Conf, Fd, Max0) ->
619628 {error , _ } ->
620629 % % fallocate may not be supported, fall back to fsync instead
621630 % % of fdatasync
622- ? INFO (" wal: preallocation may not be supported by the file system"
623- " falling back to fsync instead of fdatasync" , []),
631+ ? INFO (" WAL in ~ts : preallocation may not be supported by the file system"
632+ " falling back to fsync instead of fdatasync" ,
633+ [System ]),
624634 Conf # conf {pre_allocate = false }
625635 end ;
626636maybe_pre_allocate (Conf , _Fd , _Max0 ) ->
@@ -792,11 +802,12 @@ recover_records(#conf{names = Names} = Conf, Fd,
792802 recover_records (Conf , Fd , Rest , Cache ,
793803 State0 # recovery {writers = Writers });
794804 error ->
795- ? DEBUG (" WAL: record failed CRC check. If this is the last record"
796- " recovery can resume" , []),
805+ System = Conf # conf .system ,
806+ ? DEBUG (" WAL in ~ts : record failed CRC check. If this is the last record"
807+ " recovery can resume" , [System ]),
797808 % % if this is the last entry in the wal we can just drop the
798809 % % record;
799- ok = is_last_record (Fd , Rest ),
810+ ok = is_last_record (Fd , Rest , Conf ),
800811 State0
801812 end ;
802813 false ->
@@ -827,11 +838,12 @@ recover_records(#conf{names = Names} = Conf, Fd,
827838 ok ->
828839 recover_records (Conf , Fd , Rest , Cache , State0 );
829840 error ->
830- ? DEBUG (" WAL: record failed CRC check. If this is the last record"
831- " recovery can resume" , []),
841+ System = Conf # conf .system ,
842+ ? DEBUG (" WAL in ~ts : record failed CRC check. If this is the last record"
843+ " recovery can resume" , [System ]),
832844 % % if this is the last entry in the wal we can just drop the
833845 % % record;
834- ok = is_last_record (Fd , Rest ),
846+ ok = is_last_record (Fd , Rest , Conf ),
835847 State0
836848 end ;
837849 _ ->
@@ -860,20 +872,20 @@ recover_snap_idx(Conf, UId, Trunc, CurIdx) ->
860872 snap_idx (Conf , UId )
861873 end .
862874
863- is_last_record (_Fd , <<0 :104 , _ /binary >>) ->
875+ is_last_record (_Fd , <<0 :104 , _ /binary >>, _ ) ->
864876 ok ;
865- is_last_record (Fd , Rest ) ->
877+ is_last_record (Fd , Rest , Conf ) ->
866878 case byte_size (Rest ) < 13 of
867879 true ->
868880 case read_wal_chunk (Fd , 256 ) of
869881 <<>> ->
870882 ok ;
871883 Next ->
872- is_last_record (Fd , <<Rest /binary , Next /binary >>)
884+ is_last_record (Fd , <<Rest /binary , Next /binary >>, Conf )
873885 end ;
874886 false ->
875- ? ERROR (" WAL: record failed CRC check during recovery. "
876- " Unable to recover WAL data safely" , []),
887+ ? ERROR (" WAL in ~ts : record failed CRC check during recovery. "
888+ " Unable to recover WAL data safely" , [Conf # conf . system ]),
877889 throw (wal_checksum_validation_failure )
878890
879891 end .
0 commit comments