@@ -22,12 +22,15 @@ use chrono::DateTime;
22
22
use chrono:: Local ;
23
23
use hyperactor:: Actor ;
24
24
use hyperactor:: ActorRef ;
25
+ use hyperactor:: Bind ;
25
26
use hyperactor:: Context ;
26
27
use hyperactor:: HandleClient ;
27
28
use hyperactor:: Handler ;
28
29
use hyperactor:: Instance ;
29
30
use hyperactor:: Named ;
31
+ use hyperactor:: OncePortRef ;
30
32
use hyperactor:: RefClient ;
33
+ use hyperactor:: Unbind ;
31
34
use hyperactor:: channel;
32
35
use hyperactor:: channel:: ChannelAddr ;
33
36
use hyperactor:: channel:: ChannelRx ;
@@ -39,9 +42,6 @@ use hyperactor::channel::TxStatus;
39
42
use hyperactor:: clock:: Clock ;
40
43
use hyperactor:: clock:: RealClock ;
41
44
use hyperactor:: data:: Serialized ;
42
- use hyperactor:: message:: Bind ;
43
- use hyperactor:: message:: Bindings ;
44
- use hyperactor:: message:: Unbind ;
45
45
use hyperactor_telemetry:: env;
46
46
use hyperactor_telemetry:: log_file_path;
47
47
use serde:: Deserialize ;
@@ -235,6 +235,24 @@ impl fmt::Display for Aggregator {
235
235
}
236
236
}
237
237
238
+ /// Messages that can be sent to the LogClientActor remotely.
239
+ #[ derive(
240
+ Debug ,
241
+ Clone ,
242
+ Serialize ,
243
+ Deserialize ,
244
+ Named ,
245
+ Handler ,
246
+ HandleClient ,
247
+ RefClient ,
248
+ Bind ,
249
+ Unbind
250
+ ) ]
251
+ pub enum LogFlushMessage {
252
+ /// Flush the log
253
+ ForceSyncFlush { version : u64 } ,
254
+ }
255
+
238
256
/// Messages that can be sent to the LogClientActor remotely.
239
257
#[ derive(
240
258
Debug ,
@@ -260,7 +278,11 @@ pub enum LogMessage {
260
278
} ,
261
279
262
280
/// Flush the log
263
- Flush { } ,
281
+ Flush {
282
+ /// Indicate if the current flush is synced or non-synced.
283
+ /// If synced, a version number is available. Otherwise, none.
284
+ sync_version : Option < u64 > ,
285
+ } ,
264
286
}
265
287
266
288
/// Messages that can be sent to the LogClient locally.
@@ -279,6 +301,16 @@ pub enum LogClientMessage {
279
301
/// The time window in seconds to aggregate logs. If None, aggregation is disabled.
280
302
aggregate_window_sec : Option < u64 > ,
281
303
} ,
304
+
305
+ /// Synchronously flush all the logs from all the procs. This is for client to call.
306
+ StartSyncFlush {
307
+ /// Expect these many procs to ack the flush message.
308
+ expected_procs : usize ,
309
+ /// Return once we have received the acks from all the procs
310
+ reply : OncePortRef < ( ) > ,
311
+ /// Return to the caller the current flush version
312
+ version : OncePortRef < u64 > ,
313
+ } ,
282
314
}
283
315
284
316
/// Trait for sending logs
@@ -352,7 +384,7 @@ impl LogSender for LocalLogSender {
352
384
// send will make sure message is delivered
353
385
if TxStatus :: Active == * self . status . borrow ( ) {
354
386
// Do not use tx.send, it will block the allocator as the child process state is unknown.
355
- self . tx . post ( LogMessage :: Flush { } ) ;
387
+ self . tx . post ( LogMessage :: Flush { sync_version : None } ) ;
356
388
} else {
357
389
tracing:: debug!(
358
390
"log sender {} is not active, skip sending flush message" ,
@@ -558,7 +590,9 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
558
590
Named ,
559
591
Handler ,
560
592
HandleClient ,
561
- RefClient
593
+ RefClient ,
594
+ Bind ,
595
+ Unbind
562
596
) ]
563
597
pub enum LogForwardMessage {
564
598
/// Receive the log from the parent process and forward ti to the client.
@@ -568,18 +602,6 @@ pub enum LogForwardMessage {
568
602
SetMode { stream_to_client : bool } ,
569
603
}
570
604
571
- impl Bind for LogForwardMessage {
572
- fn bind ( & mut self , _bindings : & mut Bindings ) -> anyhow:: Result < ( ) > {
573
- Ok ( ( ) )
574
- }
575
- }
576
-
577
- impl Unbind for LogForwardMessage {
578
- fn unbind ( & self , _bindings : & mut Bindings ) -> anyhow:: Result < ( ) > {
579
- Ok ( ( ) )
580
- }
581
- }
582
-
583
605
/// A log forwarder that receives the log from its parent process and forward it back to the client
584
606
#[ derive( Debug ) ]
585
607
#[ hyperactor:: export(
@@ -647,17 +669,32 @@ impl Actor for LogForwardActor {
647
669
#[ hyperactor:: forward( LogForwardMessage ) ]
648
670
impl LogForwardMessageHandler for LogForwardActor {
649
671
async fn forward ( & mut self , ctx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
650
- if let Ok ( LogMessage :: Log {
651
- hostname,
652
- pid,
653
- output_target,
654
- payload,
655
- } ) = self . rx . recv ( ) . await
656
- {
657
- if self . stream_to_client {
658
- self . logging_client_ref
659
- . log ( ctx, hostname, pid, output_target, payload)
660
- . await ?;
672
+ match self . rx . recv ( ) . await {
673
+ Ok ( LogMessage :: Flush { sync_version } ) => {
674
+ match sync_version {
675
+ None => {
676
+ // no need to do anything. The previous messages have already been sent to the client.
677
+ // Client will flush based on its own frequency.
678
+ }
679
+ version => {
680
+ self . logging_client_ref . flush ( ctx, version) . await ?;
681
+ }
682
+ }
683
+ }
684
+ Ok ( LogMessage :: Log {
685
+ hostname,
686
+ pid,
687
+ output_target,
688
+ payload,
689
+ } ) => {
690
+ if self . stream_to_client {
691
+ self . logging_client_ref
692
+ . log ( ctx, hostname, pid, output_target, payload)
693
+ . await ?;
694
+ }
695
+ }
696
+ Err ( e) => {
697
+ return Err ( e. into ( ) ) ;
661
698
}
662
699
}
663
700
@@ -696,6 +733,60 @@ fn deserialize_message_lines(
696
733
anyhow:: bail!( "Failed to deserialize message as either String or Vec<u8>" )
697
734
}
698
735
736
+ /// An actor that send flush message to the log forwarder actor.
737
+ /// The reason we need an extra actor instead of reusing the log forwarder actor
738
+ /// is because the log forwarder can be blocked on the rx.recv() that listens on the new log lines.
739
+ /// Thus, we need to create anew channel as a tx to send the flush message to the log forwarder
740
+ /// So we do not get into a deadlock.
741
+ #[ derive( Debug ) ]
742
+ #[ hyperactor:: export(
743
+ spawn = true ,
744
+ handlers = [ LogFlushMessage { cast = true } ] ,
745
+ ) ]
746
+ pub struct LogFlushActor {
747
+ tx : ChannelTx < LogMessage > ,
748
+ }
749
+
750
+ #[ async_trait]
751
+ impl Actor for LogFlushActor {
752
+ type Params = ( ) ;
753
+
754
+ async fn new ( _: ( ) ) -> Result < Self , anyhow:: Error > {
755
+ let log_channel: ChannelAddr = match std:: env:: var ( BOOTSTRAP_LOG_CHANNEL ) {
756
+ Ok ( channel) => channel. parse ( ) ?,
757
+ Err ( err) => {
758
+ tracing:: debug!(
759
+ "log forwarder actor failed to read env var {}: {}" ,
760
+ BOOTSTRAP_LOG_CHANNEL ,
761
+ err
762
+ ) ;
763
+ // TODO: this should error out; it can only happen with local proc; we need to fix it.
764
+ ChannelAddr :: any ( ChannelTransport :: Unix )
765
+ }
766
+ } ;
767
+ let tx = channel:: dial :: < LogMessage > ( log_channel) ?;
768
+
769
+ Ok ( Self { tx } )
770
+ }
771
+ }
772
+
773
+ #[ async_trait]
774
+ #[ hyperactor:: forward( LogFlushMessage ) ]
775
+ impl LogFlushMessageHandler for LogFlushActor {
776
+ async fn force_sync_flush (
777
+ & mut self ,
778
+ _cx : & Context < Self > ,
779
+ version : u64 ,
780
+ ) -> Result < ( ) , anyhow:: Error > {
781
+ self . tx
782
+ . send ( LogMessage :: Flush {
783
+ sync_version : Some ( version) ,
784
+ } )
785
+ . await
786
+ . map_err ( anyhow:: Error :: from)
787
+ }
788
+ }
789
+
699
790
/// A client to receive logs from remote processes
700
791
#[ derive( Debug ) ]
701
792
#[ hyperactor:: export(
@@ -707,6 +798,11 @@ pub struct LogClientActor {
707
798
aggregators : HashMap < OutputTarget , Aggregator > ,
708
799
last_flush_time : SystemTime ,
709
800
next_flush_deadline : Option < SystemTime > ,
801
+
802
+ // For flush sync barrier
803
+ current_flush_version : u64 ,
804
+ current_flush_port : Option < OncePortRef < ( ) > > ,
805
+ current_unflushed_procs : usize ,
710
806
}
711
807
712
808
impl LogClientActor {
@@ -736,6 +832,12 @@ impl LogClientActor {
736
832
OutputTarget :: Stderr => eprintln ! ( "{}" , message) ,
737
833
}
738
834
}
835
+
836
+ fn flush_internal ( & mut self ) {
837
+ self . print_aggregators ( ) ;
838
+ self . last_flush_time = RealClock . system_time_now ( ) ;
839
+ self . next_flush_deadline = None ;
840
+ }
739
841
}
740
842
741
843
#[ async_trait]
@@ -754,6 +856,9 @@ impl Actor for LogClientActor {
754
856
aggregators,
755
857
last_flush_time : RealClock . system_time_now ( ) ,
756
858
next_flush_deadline : None ,
859
+ current_flush_version : 0 ,
860
+ current_flush_port : None ,
861
+ current_unflushed_procs : 0 ,
757
862
} )
758
863
}
759
864
}
@@ -805,20 +910,26 @@ impl LogMessageHandler for LogClientActor {
805
910
let new_deadline = self . last_flush_time + Duration :: from_secs ( window) ;
806
911
let now = RealClock . system_time_now ( ) ;
807
912
if new_deadline <= now {
808
- self . flush ( cx ) . await ? ;
913
+ self . flush_internal ( ) ;
809
914
} else {
810
915
let delay = new_deadline. duration_since ( now) ?;
811
916
match self . next_flush_deadline {
812
917
None => {
813
918
self . next_flush_deadline = Some ( new_deadline) ;
814
- cx. self_message_with_delay ( LogMessage :: Flush { } , delay) ?;
919
+ cx. self_message_with_delay (
920
+ LogMessage :: Flush { sync_version : None } ,
921
+ delay,
922
+ ) ?;
815
923
}
816
924
Some ( deadline) => {
817
925
// Some early log lines have alrady triggered the flush.
818
926
if new_deadline < deadline {
819
927
// This can happen if the user has adjusted the aggregation window.
820
928
self . next_flush_deadline = Some ( new_deadline) ;
821
- cx. self_message_with_delay ( LogMessage :: Flush { } , delay) ?;
929
+ cx. self_message_with_delay (
930
+ LogMessage :: Flush { sync_version : None } ,
931
+ delay,
932
+ ) ?;
822
933
}
823
934
}
824
935
}
@@ -829,10 +940,45 @@ impl LogMessageHandler for LogClientActor {
829
940
Ok ( ( ) )
830
941
}
831
942
832
- async fn flush ( & mut self , _cx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
833
- self . print_aggregators ( ) ;
834
- self . last_flush_time = RealClock . system_time_now ( ) ;
835
- self . next_flush_deadline = None ;
943
+ async fn flush (
944
+ & mut self ,
945
+ cx : & Context < Self > ,
946
+ sync_version : Option < u64 > ,
947
+ ) -> Result < ( ) , anyhow:: Error > {
948
+ match sync_version {
949
+ None => {
950
+ self . flush_internal ( ) ;
951
+ }
952
+ Some ( version) => {
953
+ if version != self . current_flush_version {
954
+ tracing:: error!(
955
+ "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully" ,
956
+ version,
957
+ self . current_flush_version
958
+ ) ;
959
+ return Ok ( ( ) ) ;
960
+ }
961
+
962
+ if self . current_unflushed_procs == 0 || self . current_flush_port . is_none ( ) {
963
+ // This is a serious issue; it's better to error out.
964
+ anyhow:: bail!( "found no ongoing flush request" ) ;
965
+ }
966
+ self . current_unflushed_procs -= 1 ;
967
+
968
+ tracing:: debug!(
969
+ "ack sync flush: version {}; remaining procs: {}" ,
970
+ self . current_flush_version,
971
+ self . current_unflushed_procs
972
+ ) ;
973
+
974
+ if self . current_unflushed_procs == 0 {
975
+ self . flush_internal ( ) ;
976
+ let reply = self . current_flush_port . take ( ) . unwrap ( ) ;
977
+ self . current_flush_port = None ;
978
+ reply. send ( cx, ( ) ) . map_err ( anyhow:: Error :: from) ?;
979
+ }
980
+ }
981
+ }
836
982
837
983
Ok ( ( ) )
838
984
}
@@ -853,6 +999,34 @@ impl LogClientMessageHandler for LogClientActor {
853
999
self . aggregate_window_sec = aggregate_window_sec;
854
1000
Ok ( ( ) )
855
1001
}
1002
+
1003
+ async fn start_sync_flush (
1004
+ & mut self ,
1005
+ cx : & Context < Self > ,
1006
+ expected_procs_flushed : usize ,
1007
+ reply : OncePortRef < ( ) > ,
1008
+ version : OncePortRef < u64 > ,
1009
+ ) -> Result < ( ) , anyhow:: Error > {
1010
+ if self . current_unflushed_procs > 0 || self . current_flush_port . is_some ( ) {
1011
+ tracing:: warn!(
1012
+ "found unfinished ongoing flush: version {}; {} unflushed procs" ,
1013
+ self . current_flush_version,
1014
+ self . current_unflushed_procs,
1015
+ ) ;
1016
+ }
1017
+
1018
+ self . current_flush_version += 1 ;
1019
+ tracing:: debug!(
1020
+ "start sync flush with version {}" ,
1021
+ self . current_flush_version
1022
+ ) ;
1023
+ self . current_flush_port = Some ( reply. clone ( ) ) ;
1024
+ self . current_unflushed_procs = expected_procs_flushed;
1025
+ version
1026
+ . send ( cx, self . current_flush_version )
1027
+ . map_err ( anyhow:: Error :: from) ?;
1028
+ Ok ( ( ) )
1029
+ }
856
1030
}
857
1031
858
1032
#[ cfg( test) ]
0 commit comments