4646 rotation_size :: non_neg_integer (),
4747 rotation_num_files :: pos_integer (),
4848 rotation_compress :: boolean (),
49- rotation_check_interval :: non_neg_integer ()
49+ rotation_check_interval :: non_neg_integer (),
50+ encr_state :: any ()
5051 }).
5152
5253start_link (Name , Path ) ->
@@ -96,17 +97,22 @@ init([Name, Path, Opts]) ->
9697
9798 {ok , State }.
9899
99- handle_call (sync , From , # state {worker = Worker } = State ) ->
100- NewState = flush_buffer (State ),
101-
100+ do_work (Worker , Call , From , Timeout ) ->
102101 Parent = self (),
103102 proc_lib :spawn_link (
104103 fun () ->
105- gen_server :reply (From , gen_server :call (Worker , sync , infinity )),
104+ gen_server :reply (From , gen_server :call (Worker , Call , Timeout )),
106105 erlang :unlink (Parent )
107- end ),
106+ end ).
108107
108+ handle_call (sync , From , # state {worker = Worker } = State ) ->
109+ NewState = flush_buffer (State ),
110+ do_work (Worker , sync , From , infinity ),
109111 {noreply , NewState };
112+ handle_call (notify_active_key_updt , From ,
113+ # state {worker = Worker } = State ) ->
114+ do_work (Worker , notify_active_key_updt , From , infinity ),
115+ {noreply , State };
110116handle_call (Request , _From , State ) ->
111117 {stop , {unexpected_call , Request }, State }.
112118
@@ -287,16 +293,18 @@ do_rotate_file(From0, To0, Compress) ->
287293 Error
288294 end .
289295
290- maybe_rotate_files (# worker_state {sink_name = Name ,
291- file_size = FileSize ,
292- rotation_size = RotSize } = State )
293- when RotSize =/= 0 , FileSize >= RotSize ->
296+ do_rotate_files (# worker_state {sink_name = Name } = State ) ->
294297 time_stat (Name , rotation_time ,
295298 fun () ->
296299 ok = rotate_files (State ),
297300 ok = maybe_compress_post_rotate (State ),
298301 open_log_file (State )
299- end );
302+ end ).
303+
304+ maybe_rotate_files (# worker_state {file_size = FileSize ,
305+ rotation_size = RotSize } = State )
306+ when RotSize =/= 0 , FileSize >= RotSize ->
307+ do_rotate_files (State );
300308maybe_rotate_files (State ) ->
301309 State .
302310
@@ -428,16 +436,21 @@ spawn_worker(WorkerState) ->
428436 worker_init (WorkerState )
429437 end ).
430438
431- worker_init (# worker_state {rotation_check_interval = RotCheckInterval } = State0 ) ->
439+ worker_init (# worker_state {rotation_check_interval = RotCheckInterval ,
440+ path = Path } = State0 ) ->
432441 case RotCheckInterval > 0 of
433442 true ->
434443 erlang :send_after (RotCheckInterval , self (), check_file );
435444 false ->
436445 ok
437446 end ,
438- worker_loop (open_log_file (State0 )).
439447
440- worker_loop (State ) ->
448+ DS = ale :create_no_deks_snapshot (),
449+ {<<>>, EncrState } = ale :file_encrypt_init (filename :basename (Path ), DS ),
450+ worker_loop (
451+ open_log_file (State0 # worker_state {encr_state = EncrState })).
452+
453+ worker_loop (# worker_state {sink_name = SinkName } = State ) ->
441454 NewState =
442455 receive
443456 {write , Data0 , DataSize0 } ->
@@ -450,6 +463,13 @@ worker_loop(State) ->
450463 {'$gen_call' , From , sync } ->
451464 gen_server :reply (From , ok ),
452465 State ;
466+ {'$gen_call' , From , notify_active_key_updt } ->
467+ # worker_state {encr_state = EncrState } = State ,
468+ DS = ale :get_sink_ds (SinkName ),
469+ UpdtReq = not ale :file_encrypt_state_match (DS , EncrState ),
470+ UpdatedState = process_key_update_work (UpdtReq , DS , State ),
471+ gen_server :reply (From , ok ),
472+ UpdatedState ;
453473 Msg ->
454474 exit ({unexpected_msg , Msg })
455475 end ,
@@ -466,22 +486,46 @@ receive_more_writes(Data, DataSize) ->
466486 {Data , DataSize }
467487 end .
468488
469- write_data (Data , DataSize ,
489+ maybe_encrypt_data (InputData , EncrState ) ->
490+ ale :file_encrypt_chunk (InputData , EncrState ).
491+
492+ write_data (InputData , InputDataSize ,
470493 # worker_state {sink_name = Name ,
471494 file = File ,
472495 file_size = FileSize ,
473- parent = Parent } = State ) ->
474- broadcast_stat (Name , write_size , DataSize ),
475-
496+ parent = Parent ,
497+ encr_state = EncrState } = State ) ->
498+ {WriteData , NewEncrState } = maybe_encrypt_data (InputData , EncrState ),
499+ WriteDataSize = byte_size (WriteData ),
500+ broadcast_stat (Name , write_size , WriteDataSize ),
476501 time_stat (Name , write_time ,
477502 fun () ->
478- ok = file :write (File , Data )
503+ ok = file :write (File , WriteData )
479504 end ),
480505
481- Parent ! {written , DataSize },
482- NewState = State # worker_state {file_size = FileSize + DataSize },
506+ Parent ! {written , InputDataSize },
507+ NewState = State # worker_state {file_size = FileSize + WriteDataSize ,
508+ encr_state = NewEncrState },
483509 maybe_rotate_files (NewState ).
484510
511+ process_key_update_work (false = _UpdtReq , _DS , State ) ->
512+ State ;
513+ process_key_update_work (true = _UpdtReq , DS ,
514+ # worker_state {sink_name = Name ,
515+ path = Path } = State ) ->
516+ {Header , EncryptState } =
517+ ale :file_encrypt_init (filename :basename (Path ), DS ),
518+ NewState = do_rotate_files (State ),
519+ # worker_state {file = File ,
520+ file_size = 0 ,
521+ path = Path } = NewState ,
522+ time_stat (Name , write_time ,
523+ fun () ->
524+ ok = file :write (File , Header )
525+ end ),
526+ NewState # worker_state {file_size = byte_size (Header ),
527+ encr_state = EncryptState }.
528+
485529remove_unnecessary_log_files (LogFilePath , NumFiles ) ->
486530 Dir = filename :dirname (LogFilePath ),
487531 Name = filename :basename (LogFilePath ),
0 commit comments