@@ -22,12 +22,15 @@ use chrono::DateTime;
22
22
use chrono:: Local ;
23
23
use hyperactor:: Actor ;
24
24
use hyperactor:: ActorRef ;
25
+ use hyperactor:: Bind ;
25
26
use hyperactor:: Context ;
26
27
use hyperactor:: HandleClient ;
27
28
use hyperactor:: Handler ;
28
29
use hyperactor:: Instance ;
29
30
use hyperactor:: Named ;
31
+ use hyperactor:: OncePortRef ;
30
32
use hyperactor:: RefClient ;
33
+ use hyperactor:: Unbind ;
31
34
use hyperactor:: channel;
32
35
use hyperactor:: channel:: ChannelAddr ;
33
36
use hyperactor:: channel:: ChannelRx ;
@@ -39,9 +42,6 @@ use hyperactor::channel::TxStatus;
39
42
use hyperactor:: clock:: Clock ;
40
43
use hyperactor:: clock:: RealClock ;
41
44
use hyperactor:: data:: Serialized ;
42
- use hyperactor:: message:: Bind ;
43
- use hyperactor:: message:: Bindings ;
44
- use hyperactor:: message:: Unbind ;
45
45
use hyperactor_telemetry:: env;
46
46
use hyperactor_telemetry:: log_file_path;
47
47
use serde:: Deserialize ;
@@ -235,6 +235,24 @@ impl fmt::Display for Aggregator {
235
235
}
236
236
}
237
237
238
+ /// Messages that can be sent to the LogClientActor remotely.
239
+ #[ derive(
240
+ Debug ,
241
+ Clone ,
242
+ Serialize ,
243
+ Deserialize ,
244
+ Named ,
245
+ Handler ,
246
+ HandleClient ,
247
+ RefClient ,
248
+ Bind ,
249
+ Unbind
250
+ ) ]
251
+ pub enum LogFlushMessage {
252
+ /// Flush the log
253
+ ForceSyncFlush { version : u64 } ,
254
+ }
255
+
238
256
/// Messages that can be sent to the LogClientActor remotely.
239
257
#[ derive(
240
258
Debug ,
@@ -260,7 +278,11 @@ pub enum LogMessage {
260
278
} ,
261
279
262
280
/// Flush the log
263
- Flush { } ,
281
+ Flush {
282
+ /// Indicate if the current flush is synced or non-synced.
283
+ /// If synced, a version number is available. Otherwise, none.
284
+ sync_version : Option < u64 > ,
285
+ } ,
264
286
}
265
287
266
288
/// Messages that can be sent to the LogClient locally.
@@ -279,6 +301,16 @@ pub enum LogClientMessage {
279
301
/// The time window in seconds to aggregate logs. If None, aggregation is disabled.
280
302
aggregate_window_sec : Option < u64 > ,
281
303
} ,
304
+
305
+ /// Synchronously flush all the logs from all the procs. This is for client to call.
306
+ StartSyncFlush {
307
+ /// Expect these many procs to ack the flush message.
308
+ expected_procs : usize ,
309
+ /// Return once we have received the acks from all the procs
310
+ reply : OncePortRef < ( ) > ,
311
+ /// Return to the caller the current flush version
312
+ version : OncePortRef < u64 > ,
313
+ } ,
282
314
}
283
315
284
316
/// Trait for sending logs
@@ -352,7 +384,7 @@ impl LogSender for LocalLogSender {
352
384
// send will make sure message is delivered
353
385
if TxStatus :: Active == * self . status . borrow ( ) {
354
386
// Do not use tx.send, it will block the allocator as the child process state is unknown.
355
- self . tx . post ( LogMessage :: Flush { } ) ;
387
+ self . tx . post ( LogMessage :: Flush { sync_version : None } ) ;
356
388
} else {
357
389
tracing:: debug!(
358
390
"log sender {} is not active, skip sending flush message" ,
@@ -547,7 +579,9 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
547
579
Named ,
548
580
Handler ,
549
581
HandleClient ,
550
- RefClient
582
+ RefClient ,
583
+ Bind ,
584
+ Unbind
551
585
) ]
552
586
pub enum LogForwardMessage {
553
587
/// Receive the log from the parent process and forward ti to the client.
@@ -557,18 +591,6 @@ pub enum LogForwardMessage {
557
591
SetMode { stream_to_client : bool } ,
558
592
}
559
593
560
- impl Bind for LogForwardMessage {
561
- fn bind ( & mut self , _bindings : & mut Bindings ) -> anyhow:: Result < ( ) > {
562
- Ok ( ( ) )
563
- }
564
- }
565
-
566
- impl Unbind for LogForwardMessage {
567
- fn unbind ( & self , _bindings : & mut Bindings ) -> anyhow:: Result < ( ) > {
568
- Ok ( ( ) )
569
- }
570
- }
571
-
572
594
/// A log forwarder that receives the log from its parent process and forward it back to the client
573
595
#[ derive( Debug ) ]
574
596
#[ hyperactor:: export(
@@ -636,17 +658,32 @@ impl Actor for LogForwardActor {
636
658
#[ hyperactor:: forward( LogForwardMessage ) ]
637
659
impl LogForwardMessageHandler for LogForwardActor {
638
660
async fn forward ( & mut self , ctx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
639
- if let Ok ( LogMessage :: Log {
640
- hostname,
641
- pid,
642
- output_target,
643
- payload,
644
- } ) = self . rx . recv ( ) . await
645
- {
646
- if self . stream_to_client {
647
- self . logging_client_ref
648
- . log ( ctx, hostname, pid, output_target, payload)
649
- . await ?;
661
+ match self . rx . recv ( ) . await {
662
+ Ok ( LogMessage :: Flush { sync_version } ) => {
663
+ match sync_version {
664
+ None => {
665
+ // no need to do anything. The previous messages have already been sent to the client.
666
+ // Client will flush based on its own frequency.
667
+ }
668
+ version => {
669
+ self . logging_client_ref . flush ( ctx, version) . await ?;
670
+ }
671
+ }
672
+ }
673
+ Ok ( LogMessage :: Log {
674
+ hostname,
675
+ pid,
676
+ output_target,
677
+ payload,
678
+ } ) => {
679
+ if self . stream_to_client {
680
+ self . logging_client_ref
681
+ . log ( ctx, hostname, pid, output_target, payload)
682
+ . await ?;
683
+ }
684
+ }
685
+ Err ( e) => {
686
+ return Err ( e. into ( ) ) ;
650
687
}
651
688
}
652
689
@@ -685,6 +722,60 @@ fn deserialize_message_lines(
685
722
anyhow:: bail!( "Failed to deserialize message as either String or Vec<u8>" )
686
723
}
687
724
725
+ /// An actor that send flush message to the log forwarder actor.
726
+ /// The reason we need an extra actor instead of reusing the log forwarder actor
727
+ /// is because the log forwarder can be blocked on the rx.recv() that listens on the new log lines.
728
+ /// Thus, we need to create anew channel as a tx to send the flush message to the log forwarder
729
+ /// So we do not get into a deadlock.
730
+ #[ derive( Debug ) ]
731
+ #[ hyperactor:: export(
732
+ spawn = true ,
733
+ handlers = [ LogFlushMessage { cast = true } ] ,
734
+ ) ]
735
+ pub struct LogFlushActor {
736
+ tx : ChannelTx < LogMessage > ,
737
+ }
738
+
739
+ #[ async_trait]
740
+ impl Actor for LogFlushActor {
741
+ type Params = ( ) ;
742
+
743
+ async fn new ( _: ( ) ) -> Result < Self , anyhow:: Error > {
744
+ let log_channel: ChannelAddr = match std:: env:: var ( BOOTSTRAP_LOG_CHANNEL ) {
745
+ Ok ( channel) => channel. parse ( ) ?,
746
+ Err ( err) => {
747
+ tracing:: debug!(
748
+ "log forwarder actor failed to read env var {}: {}" ,
749
+ BOOTSTRAP_LOG_CHANNEL ,
750
+ err
751
+ ) ;
752
+ // TODO: this should error out; it can only happen with local proc; we need to fix it.
753
+ ChannelAddr :: any ( ChannelTransport :: Unix )
754
+ }
755
+ } ;
756
+ let tx = channel:: dial :: < LogMessage > ( log_channel) ?;
757
+
758
+ Ok ( Self { tx } )
759
+ }
760
+ }
761
+
762
+ #[ async_trait]
763
+ #[ hyperactor:: forward( LogFlushMessage ) ]
764
+ impl LogFlushMessageHandler for LogFlushActor {
765
+ async fn force_sync_flush (
766
+ & mut self ,
767
+ _cx : & Context < Self > ,
768
+ version : u64 ,
769
+ ) -> Result < ( ) , anyhow:: Error > {
770
+ self . tx
771
+ . send ( LogMessage :: Flush {
772
+ sync_version : Some ( version) ,
773
+ } )
774
+ . await
775
+ . map_err ( anyhow:: Error :: from)
776
+ }
777
+ }
778
+
688
779
/// A client to receive logs from remote processes
689
780
#[ derive( Debug ) ]
690
781
#[ hyperactor:: export(
@@ -696,6 +787,11 @@ pub struct LogClientActor {
696
787
aggregators : HashMap < OutputTarget , Aggregator > ,
697
788
last_flush_time : SystemTime ,
698
789
next_flush_deadline : Option < SystemTime > ,
790
+
791
+ // For flush sync barrier
792
+ current_flush_version : u64 ,
793
+ current_flush_port : Option < OncePortRef < ( ) > > ,
794
+ current_unflushed_procs : usize ,
699
795
}
700
796
701
797
impl LogClientActor {
@@ -725,6 +821,12 @@ impl LogClientActor {
725
821
OutputTarget :: Stderr => eprintln ! ( "{}" , message) ,
726
822
}
727
823
}
824
+
825
+ fn flush_internal ( & mut self ) {
826
+ self . print_aggregators ( ) ;
827
+ self . last_flush_time = RealClock . system_time_now ( ) ;
828
+ self . next_flush_deadline = None ;
829
+ }
728
830
}
729
831
730
832
#[ async_trait]
@@ -743,6 +845,9 @@ impl Actor for LogClientActor {
743
845
aggregators,
744
846
last_flush_time : RealClock . system_time_now ( ) ,
745
847
next_flush_deadline : None ,
848
+ current_flush_version : 0 ,
849
+ current_flush_port : None ,
850
+ current_unflushed_procs : 0 ,
746
851
} )
747
852
}
748
853
}
@@ -794,20 +899,26 @@ impl LogMessageHandler for LogClientActor {
794
899
let new_deadline = self . last_flush_time + Duration :: from_secs ( window) ;
795
900
let now = RealClock . system_time_now ( ) ;
796
901
if new_deadline <= now {
797
- self . flush ( cx ) . await ? ;
902
+ self . flush_internal ( ) ;
798
903
} else {
799
904
let delay = new_deadline. duration_since ( now) ?;
800
905
match self . next_flush_deadline {
801
906
None => {
802
907
self . next_flush_deadline = Some ( new_deadline) ;
803
- cx. self_message_with_delay ( LogMessage :: Flush { } , delay) ?;
908
+ cx. self_message_with_delay (
909
+ LogMessage :: Flush { sync_version : None } ,
910
+ delay,
911
+ ) ?;
804
912
}
805
913
Some ( deadline) => {
806
914
// Some early log lines have alrady triggered the flush.
807
915
if new_deadline < deadline {
808
916
// This can happen if the user has adjusted the aggregation window.
809
917
self . next_flush_deadline = Some ( new_deadline) ;
810
- cx. self_message_with_delay ( LogMessage :: Flush { } , delay) ?;
918
+ cx. self_message_with_delay (
919
+ LogMessage :: Flush { sync_version : None } ,
920
+ delay,
921
+ ) ?;
811
922
}
812
923
}
813
924
}
@@ -818,10 +929,38 @@ impl LogMessageHandler for LogClientActor {
818
929
Ok ( ( ) )
819
930
}
820
931
821
- async fn flush ( & mut self , _cx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
822
- self . print_aggregators ( ) ;
823
- self . last_flush_time = RealClock . system_time_now ( ) ;
824
- self . next_flush_deadline = None ;
932
+ async fn flush (
933
+ & mut self ,
934
+ cx : & Context < Self > ,
935
+ sync_version : Option < u64 > ,
936
+ ) -> Result < ( ) , anyhow:: Error > {
937
+ match sync_version {
938
+ None => {
939
+ self . flush_internal ( ) ;
940
+ }
941
+ Some ( version) => {
942
+ if version != self . current_flush_version {
943
+ tracing:: error!(
944
+ "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully" ,
945
+ version,
946
+ self . current_flush_version
947
+ ) ;
948
+ return Ok ( ( ) ) ;
949
+ }
950
+
951
+ if self . current_unflushed_procs == 0 || self . current_flush_port . is_none ( ) {
952
+ // This is a serious issue; it's better to error out.
953
+ anyhow:: bail!( "found no ongoing flush request" ) ;
954
+ }
955
+ self . current_unflushed_procs -= 1 ;
956
+ if self . current_unflushed_procs == 0 {
957
+ self . flush_internal ( ) ;
958
+ let reply = self . current_flush_port . take ( ) . unwrap ( ) ;
959
+ self . current_flush_port = None ;
960
+ reply. send ( cx, ( ) ) . map_err ( anyhow:: Error :: from) ?;
961
+ }
962
+ }
963
+ }
825
964
826
965
Ok ( ( ) )
827
966
}
@@ -842,6 +981,30 @@ impl LogClientMessageHandler for LogClientActor {
842
981
self . aggregate_window_sec = aggregate_window_sec;
843
982
Ok ( ( ) )
844
983
}
984
+
985
+ async fn start_sync_flush (
986
+ & mut self ,
987
+ cx : & Context < Self > ,
988
+ expected_procs_flushed : usize ,
989
+ reply : OncePortRef < ( ) > ,
990
+ version : OncePortRef < u64 > ,
991
+ ) -> Result < ( ) , anyhow:: Error > {
992
+ if self . current_unflushed_procs > 0 || self . current_flush_port . is_some ( ) {
993
+ tracing:: error!(
994
+ "found unfinished ongoing flush: version {}; {} unflushed procs" ,
995
+ self . current_flush_version,
996
+ self . current_unflushed_procs,
997
+ ) ;
998
+ }
999
+
1000
+ self . current_flush_version += 1 ;
1001
+ self . current_flush_port = Some ( reply. clone ( ) ) ;
1002
+ self . current_unflushed_procs = expected_procs_flushed;
1003
+ version
1004
+ . send ( cx, self . current_flush_version )
1005
+ . map_err ( anyhow:: Error :: from) ?;
1006
+ Ok ( ( ) )
1007
+ }
845
1008
}
846
1009
847
1010
#[ cfg( test) ]
0 commit comments