@@ -22,12 +22,15 @@ use chrono::DateTime;
22
22
use chrono:: Local ;
23
23
use hyperactor:: Actor ;
24
24
use hyperactor:: ActorRef ;
25
+ use hyperactor:: Bind ;
25
26
use hyperactor:: Context ;
26
27
use hyperactor:: HandleClient ;
27
28
use hyperactor:: Handler ;
28
29
use hyperactor:: Instance ;
29
30
use hyperactor:: Named ;
31
+ use hyperactor:: OncePortRef ;
30
32
use hyperactor:: RefClient ;
33
+ use hyperactor:: Unbind ;
31
34
use hyperactor:: channel;
32
35
use hyperactor:: channel:: ChannelAddr ;
33
36
use hyperactor:: channel:: ChannelRx ;
@@ -39,9 +42,6 @@ use hyperactor::channel::TxStatus;
39
42
use hyperactor:: clock:: Clock ;
40
43
use hyperactor:: clock:: RealClock ;
41
44
use hyperactor:: data:: Serialized ;
42
- use hyperactor:: message:: Bind ;
43
- use hyperactor:: message:: Bindings ;
44
- use hyperactor:: message:: Unbind ;
45
45
use hyperactor_telemetry:: env;
46
46
use hyperactor_telemetry:: log_file_path;
47
47
use serde:: Deserialize ;
@@ -235,6 +235,24 @@ impl fmt::Display for Aggregator {
235
235
}
236
236
}
237
237
238
+ /// Messages that can be sent to the LogClientActor remotely.
239
+ #[ derive(
240
+ Debug ,
241
+ Clone ,
242
+ Serialize ,
243
+ Deserialize ,
244
+ Named ,
245
+ Handler ,
246
+ HandleClient ,
247
+ RefClient ,
248
+ Bind ,
249
+ Unbind
250
+ ) ]
251
+ pub enum LogFlushMessage {
252
+ /// Flush the log
253
+ ForceSyncFlush { } ,
254
+ }
255
+
238
256
/// Messages that can be sent to the LogClientActor remotely.
239
257
#[ derive(
240
258
Debug ,
@@ -260,7 +278,10 @@ pub enum LogMessage {
260
278
} ,
261
279
262
280
/// Flush the log
263
- Flush { } ,
281
+ Flush {
282
+ /// If true, force a flush sync barrier across all procs
283
+ synced : bool ,
284
+ } ,
264
285
}
265
286
266
287
/// Messages that can be sent to the LogClient locally.
@@ -279,6 +300,14 @@ pub enum LogClientMessage {
279
300
/// The time window in seconds to aggregate logs. If None, aggregation is disabled.
280
301
aggregate_window_sec : Option < u64 > ,
281
302
} ,
303
+
304
+ /// Synchronously flush all the logs from all the procs. This is for client to call.
305
+ StartSyncFlush {
306
+ /// Expect these many procs to ack the flush message.
307
+ expected_procs : usize ,
308
+ /// Return once we have received the acks from all the procs
309
+ reply : OncePortRef < ( ) > ,
310
+ } ,
282
311
}
283
312
284
313
/// Trait for sending logs
@@ -351,7 +380,9 @@ impl LogSender for LocalLogSender {
351
380
async fn flush ( & mut self ) -> anyhow:: Result < ( ) > {
352
381
// send will make sure message is delivered
353
382
if TxStatus :: Active == * self . status . borrow ( ) {
354
- match self . tx . send ( LogMessage :: Flush { } ) . await {
383
+ // this is just to make sure the log line is sent to the other side of the channel.
384
+ // it is up to the forwarder to decide when to flush the log.
385
+ match self . tx . send ( LogMessage :: Flush { synced : false } ) . await {
355
386
Ok ( ( ) ) => Ok ( ( ) ) ,
356
387
Err ( e) => {
357
388
tracing:: error!( "log sender {} error sending flush message: {}" , self . pid, e) ;
@@ -570,7 +601,9 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
570
601
Named ,
571
602
Handler ,
572
603
HandleClient ,
573
- RefClient
604
+ RefClient ,
605
+ Bind ,
606
+ Unbind
574
607
) ]
575
608
pub enum LogForwardMessage {
576
609
/// Receive the log from the parent process and forward ti to the client.
@@ -580,18 +613,6 @@ pub enum LogForwardMessage {
580
613
SetMode { stream_to_client : bool } ,
581
614
}
582
615
583
- impl Bind for LogForwardMessage {
584
- fn bind ( & mut self , _bindings : & mut Bindings ) -> anyhow:: Result < ( ) > {
585
- Ok ( ( ) )
586
- }
587
- }
588
-
589
- impl Unbind for LogForwardMessage {
590
- fn unbind ( & self , _bindings : & mut Bindings ) -> anyhow:: Result < ( ) > {
591
- Ok ( ( ) )
592
- }
593
- }
594
-
595
616
/// A log forwarder that receives the log from its parent process and forward it back to the client
596
617
#[ derive( Debug ) ]
597
618
#[ hyperactor:: export(
@@ -659,17 +680,28 @@ impl Actor for LogForwardActor {
659
680
#[ hyperactor:: forward( LogForwardMessage ) ]
660
681
impl LogForwardMessageHandler for LogForwardActor {
661
682
async fn forward ( & mut self , ctx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
662
- if let Ok ( LogMessage :: Log {
663
- hostname,
664
- pid,
665
- output_target,
666
- payload,
667
- } ) = self . rx . recv ( ) . await
668
- {
669
- if self . stream_to_client {
670
- self . logging_client_ref
671
- . log ( ctx, hostname, pid, output_target, payload)
672
- . await ?;
683
+ match self . rx . recv ( ) . await {
684
+ Ok ( LogMessage :: Flush { synced } ) => {
685
+ if synced {
686
+ self . logging_client_ref . flush ( ctx, true ) . await ?;
687
+ } else {
688
+ // no need to do anything. The previous messages have already been sent to the client.
689
+ }
690
+ }
691
+ Ok ( LogMessage :: Log {
692
+ hostname,
693
+ pid,
694
+ output_target,
695
+ payload,
696
+ } ) => {
697
+ if self . stream_to_client {
698
+ self . logging_client_ref
699
+ . log ( ctx, hostname, pid, output_target, payload)
700
+ . await ?;
701
+ }
702
+ }
703
+ Err ( e) => {
704
+ return Err ( e. into ( ) ) ;
673
705
}
674
706
}
675
707
@@ -708,6 +740,54 @@ fn deserialize_message_lines(
708
740
anyhow:: bail!( "Failed to deserialize message as either String or Vec<u8>" )
709
741
}
710
742
743
+ /// An actor that send flush message to the log forwarder actor.
744
+ /// The reason we need an extra actor instead of reusing the log forwarder actor
745
+ /// is because the log forwarder can be blocked on the rx.recv() that listens on the new log lines.
746
+ /// Thus, we need to create anew channel as a tx to send the flush message to the log forwarder
747
+ /// So we do not get into a deadlock.
748
+ #[ derive( Debug ) ]
749
+ #[ hyperactor:: export(
750
+ spawn = true ,
751
+ handlers = [ LogFlushMessage { cast = true } ] ,
752
+ ) ]
753
+ pub struct LogFlushActor {
754
+ tx : ChannelTx < LogMessage > ,
755
+ }
756
+
757
+ #[ async_trait]
758
+ impl Actor for LogFlushActor {
759
+ type Params = ( ) ;
760
+
761
+ async fn new ( _: ( ) ) -> Result < Self , anyhow:: Error > {
762
+ let log_channel: ChannelAddr = match std:: env:: var ( BOOTSTRAP_LOG_CHANNEL ) {
763
+ Ok ( channel) => channel. parse ( ) ?,
764
+ Err ( err) => {
765
+ tracing:: debug!(
766
+ "log forwarder actor failed to read env var {}: {}" ,
767
+ BOOTSTRAP_LOG_CHANNEL ,
768
+ err
769
+ ) ;
770
+ // TODO: this should error out; it can only happen with local proc; we need to fix it.
771
+ ChannelAddr :: any ( ChannelTransport :: Unix )
772
+ }
773
+ } ;
774
+ let tx = channel:: dial :: < LogMessage > ( log_channel) ?;
775
+
776
+ Ok ( Self { tx } )
777
+ }
778
+ }
779
+
780
+ #[ async_trait]
781
+ #[ hyperactor:: forward( LogFlushMessage ) ]
782
+ impl LogFlushMessageHandler for LogFlushActor {
783
+ async fn force_sync_flush ( & mut self , _cx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
784
+ self . tx
785
+ . send ( LogMessage :: Flush { synced : true } )
786
+ . await
787
+ . map_err ( anyhow:: Error :: from)
788
+ }
789
+ }
790
+
711
791
/// A client to receive logs from remote processes
712
792
#[ derive( Debug ) ]
713
793
#[ hyperactor:: export(
@@ -719,6 +799,8 @@ pub struct LogClientActor {
719
799
aggregators : HashMap < OutputTarget , Aggregator > ,
720
800
last_flush_time : SystemTime ,
721
801
next_flush_deadline : Option < SystemTime > ,
802
+ ongoing_flush_port : Option < OncePortRef < ( ) > > ,
803
+ unflushed_procs : usize ,
722
804
}
723
805
724
806
impl LogClientActor {
@@ -748,6 +830,12 @@ impl LogClientActor {
748
830
OutputTarget :: Stderr => eprintln ! ( "{}" , message) ,
749
831
}
750
832
}
833
+
834
+ fn flush_internal ( & mut self ) {
835
+ self . print_aggregators ( ) ;
836
+ self . last_flush_time = RealClock . system_time_now ( ) ;
837
+ self . next_flush_deadline = None ;
838
+ }
751
839
}
752
840
753
841
#[ async_trait]
@@ -766,6 +854,8 @@ impl Actor for LogClientActor {
766
854
aggregators,
767
855
last_flush_time : RealClock . system_time_now ( ) ,
768
856
next_flush_deadline : None ,
857
+ ongoing_flush_port : None ,
858
+ unflushed_procs : 0 ,
769
859
} )
770
860
}
771
861
}
@@ -817,20 +907,23 @@ impl LogMessageHandler for LogClientActor {
817
907
let new_deadline = self . last_flush_time + Duration :: from_secs ( window) ;
818
908
let now = RealClock . system_time_now ( ) ;
819
909
if new_deadline <= now {
820
- self . flush ( cx ) . await ? ;
910
+ self . flush_internal ( ) ;
821
911
} else {
822
912
let delay = new_deadline. duration_since ( now) ?;
823
913
match self . next_flush_deadline {
824
914
None => {
825
915
self . next_flush_deadline = Some ( new_deadline) ;
826
- cx. self_message_with_delay ( LogMessage :: Flush { } , delay) ?;
916
+ cx. self_message_with_delay ( LogMessage :: Flush { synced : false } , delay) ?;
827
917
}
828
918
Some ( deadline) => {
829
919
// Some early log lines have alrady triggered the flush.
830
920
if new_deadline < deadline {
831
921
// This can happen if the user has adjusted the aggregation window.
832
922
self . next_flush_deadline = Some ( new_deadline) ;
833
- cx. self_message_with_delay ( LogMessage :: Flush { } , delay) ?;
923
+ cx. self_message_with_delay (
924
+ LogMessage :: Flush { synced : false } ,
925
+ delay,
926
+ ) ?;
834
927
}
835
928
}
836
929
}
@@ -841,10 +934,21 @@ impl LogMessageHandler for LogClientActor {
841
934
Ok ( ( ) )
842
935
}
843
936
844
- async fn flush ( & mut self , _cx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
845
- self . print_aggregators ( ) ;
846
- self . last_flush_time = RealClock . system_time_now ( ) ;
847
- self . next_flush_deadline = None ;
937
+ async fn flush ( & mut self , cx : & Context < Self > , synced : bool ) -> Result < ( ) , anyhow:: Error > {
938
+ if synced {
939
+ if self . unflushed_procs == 0 || self . ongoing_flush_port . is_none ( ) {
940
+ anyhow:: bail!( "found no ongoing flush request" ) ;
941
+ }
942
+ self . unflushed_procs -= 1 ;
943
+ if self . unflushed_procs == 0 {
944
+ self . flush_internal ( ) ;
945
+ let reply = self . ongoing_flush_port . take ( ) . unwrap ( ) ;
946
+ self . ongoing_flush_port = None ;
947
+ reply. send ( cx, ( ) ) . map_err ( anyhow:: Error :: from) ?;
948
+ }
949
+ } else {
950
+ self . flush_internal ( ) ;
951
+ }
848
952
849
953
Ok ( ( ) )
850
954
}
@@ -865,6 +969,21 @@ impl LogClientMessageHandler for LogClientActor {
865
969
self . aggregate_window_sec = aggregate_window_sec;
866
970
Ok ( ( ) )
867
971
}
972
+
973
+ async fn start_sync_flush (
974
+ & mut self ,
975
+ _cx : & Context < Self > ,
976
+ expected_procs_flushed : usize ,
977
+ reply : OncePortRef < ( ) > ,
978
+ ) -> Result < ( ) , anyhow:: Error > {
979
+ if self . unflushed_procs > 0 || self . ongoing_flush_port . is_some ( ) {
980
+ anyhow:: bail!( "forcing a flush while the ongoing flush has not finished yet" ) ;
981
+ }
982
+
983
+ self . ongoing_flush_port = Some ( reply. clone ( ) ) ;
984
+ self . unflushed_procs = expected_procs_flushed;
985
+ Ok ( ( ) )
986
+ }
868
987
}
869
988
870
989
#[ cfg( test) ]
0 commit comments