Skip to content

Commit 29071ff

Browse files
James Sunfacebook-github-bot
authored andcommitted
sync flush of proc mesh (#823)
Summary: Provide sync flush so it is guaranteed all the flushed logs on the remote procs will be streamed back and flushed on client's stdout/stderr. Differential Revision: D80051803
1 parent 6754b42 commit 29071ff

File tree

5 files changed

+296
-45
lines changed

5 files changed

+296
-45
lines changed

hyperactor_mesh/src/logging.rs

Lines changed: 199 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@ use chrono::DateTime;
2222
use chrono::Local;
2323
use hyperactor::Actor;
2424
use hyperactor::ActorRef;
25+
use hyperactor::Bind;
2526
use hyperactor::Context;
2627
use hyperactor::HandleClient;
2728
use hyperactor::Handler;
2829
use hyperactor::Instance;
2930
use hyperactor::Named;
31+
use hyperactor::OncePortRef;
3032
use hyperactor::RefClient;
33+
use hyperactor::Unbind;
3134
use hyperactor::channel;
3235
use hyperactor::channel::ChannelAddr;
3336
use hyperactor::channel::ChannelRx;
@@ -39,9 +42,6 @@ use hyperactor::channel::TxStatus;
3942
use hyperactor::clock::Clock;
4043
use hyperactor::clock::RealClock;
4144
use hyperactor::data::Serialized;
42-
use hyperactor::message::Bind;
43-
use hyperactor::message::Bindings;
44-
use hyperactor::message::Unbind;
4545
use hyperactor_telemetry::env;
4646
use hyperactor_telemetry::log_file_path;
4747
use serde::Deserialize;
@@ -235,6 +235,24 @@ impl fmt::Display for Aggregator {
235235
}
236236
}
237237

238+
/// Messages that can be sent to the LogClientActor remotely.
239+
#[derive(
240+
Debug,
241+
Clone,
242+
Serialize,
243+
Deserialize,
244+
Named,
245+
Handler,
246+
HandleClient,
247+
RefClient,
248+
Bind,
249+
Unbind
250+
)]
251+
pub enum LogFlushMessage {
252+
/// Flush the log
253+
ForceSyncFlush { version: u64 },
254+
}
255+
238256
/// Messages that can be sent to the LogClientActor remotely.
239257
#[derive(
240258
Debug,
@@ -260,7 +278,11 @@ pub enum LogMessage {
260278
},
261279

262280
/// Flush the log
263-
Flush {},
281+
Flush {
282+
/// Indicate if the current flush is synced or non-synced.
283+
/// If synced, a version number is available. Otherwise, none.
284+
sync_version: Option<u64>,
285+
},
264286
}
265287

266288
/// Messages that can be sent to the LogClient locally.
@@ -279,6 +301,16 @@ pub enum LogClientMessage {
279301
/// The time window in seconds to aggregate logs. If None, aggregation is disabled.
280302
aggregate_window_sec: Option<u64>,
281303
},
304+
305+
/// Synchronously flush all the logs from all the procs. This is for client to call.
306+
StartSyncFlush {
307+
/// Expect these many procs to ack the flush message.
308+
expected_procs: usize,
309+
/// Return once we have received the acks from all the procs
310+
reply: OncePortRef<()>,
311+
/// Return to the caller the current flush version
312+
version: OncePortRef<u64>,
313+
},
282314
}
283315

284316
/// Trait for sending logs
@@ -352,7 +384,7 @@ impl LogSender for LocalLogSender {
352384
// send will make sure message is delivered
353385
if TxStatus::Active == *self.status.borrow() {
354386
// Do not use tx.send, it will block the allocator as the child process state is unknown.
355-
self.tx.post(LogMessage::Flush {});
387+
self.tx.post(LogMessage::Flush { sync_version: None });
356388
} else {
357389
tracing::debug!(
358390
"log sender {} is not active, skip sending flush message",
@@ -547,7 +579,9 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
547579
Named,
548580
Handler,
549581
HandleClient,
550-
RefClient
582+
RefClient,
583+
Bind,
584+
Unbind
551585
)]
552586
pub enum LogForwardMessage {
553587
/// Receive the log from the parent process and forward ti to the client.
@@ -557,18 +591,6 @@ pub enum LogForwardMessage {
557591
SetMode { stream_to_client: bool },
558592
}
559593

560-
impl Bind for LogForwardMessage {
561-
fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
562-
Ok(())
563-
}
564-
}
565-
566-
impl Unbind for LogForwardMessage {
567-
fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
568-
Ok(())
569-
}
570-
}
571-
572594
/// A log forwarder that receives the log from its parent process and forward it back to the client
573595
#[derive(Debug)]
574596
#[hyperactor::export(
@@ -636,17 +658,32 @@ impl Actor for LogForwardActor {
636658
#[hyperactor::forward(LogForwardMessage)]
637659
impl LogForwardMessageHandler for LogForwardActor {
638660
async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
639-
if let Ok(LogMessage::Log {
640-
hostname,
641-
pid,
642-
output_target,
643-
payload,
644-
}) = self.rx.recv().await
645-
{
646-
if self.stream_to_client {
647-
self.logging_client_ref
648-
.log(ctx, hostname, pid, output_target, payload)
649-
.await?;
661+
match self.rx.recv().await {
662+
Ok(LogMessage::Flush { sync_version }) => {
663+
match sync_version {
664+
None => {
665+
// no need to do anything. The previous messages have already been sent to the client.
666+
// Client will flush based on its own frequency.
667+
}
668+
version => {
669+
self.logging_client_ref.flush(ctx, version).await?;
670+
}
671+
}
672+
}
673+
Ok(LogMessage::Log {
674+
hostname,
675+
pid,
676+
output_target,
677+
payload,
678+
}) => {
679+
if self.stream_to_client {
680+
self.logging_client_ref
681+
.log(ctx, hostname, pid, output_target, payload)
682+
.await?;
683+
}
684+
}
685+
Err(e) => {
686+
return Err(e.into());
650687
}
651688
}
652689

@@ -685,6 +722,60 @@ fn deserialize_message_lines(
685722
anyhow::bail!("Failed to deserialize message as either String or Vec<u8>")
686723
}
687724

725+
/// An actor that send flush message to the log forwarder actor.
726+
/// The reason we need an extra actor instead of reusing the log forwarder actor
727+
/// is because the log forwarder can be blocked on the rx.recv() that listens on the new log lines.
728+
/// Thus, we need to create anew channel as a tx to send the flush message to the log forwarder
729+
/// So we do not get into a deadlock.
730+
#[derive(Debug)]
731+
#[hyperactor::export(
732+
spawn = true,
733+
handlers = [LogFlushMessage {cast = true}],
734+
)]
735+
pub struct LogFlushActor {
736+
tx: ChannelTx<LogMessage>,
737+
}
738+
739+
#[async_trait]
740+
impl Actor for LogFlushActor {
741+
type Params = ();
742+
743+
async fn new(_: ()) -> Result<Self, anyhow::Error> {
744+
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
745+
Ok(channel) => channel.parse()?,
746+
Err(err) => {
747+
tracing::debug!(
748+
"log forwarder actor failed to read env var {}: {}",
749+
BOOTSTRAP_LOG_CHANNEL,
750+
err
751+
);
752+
// TODO: this should error out; it can only happen with local proc; we need to fix it.
753+
ChannelAddr::any(ChannelTransport::Unix)
754+
}
755+
};
756+
let tx = channel::dial::<LogMessage>(log_channel)?;
757+
758+
Ok(Self { tx })
759+
}
760+
}
761+
762+
#[async_trait]
763+
#[hyperactor::forward(LogFlushMessage)]
764+
impl LogFlushMessageHandler for LogFlushActor {
765+
async fn force_sync_flush(
766+
&mut self,
767+
_cx: &Context<Self>,
768+
version: u64,
769+
) -> Result<(), anyhow::Error> {
770+
self.tx
771+
.send(LogMessage::Flush {
772+
sync_version: Some(version),
773+
})
774+
.await
775+
.map_err(anyhow::Error::from)
776+
}
777+
}
778+
688779
/// A client to receive logs from remote processes
689780
#[derive(Debug)]
690781
#[hyperactor::export(
@@ -696,6 +787,11 @@ pub struct LogClientActor {
696787
aggregators: HashMap<OutputTarget, Aggregator>,
697788
last_flush_time: SystemTime,
698789
next_flush_deadline: Option<SystemTime>,
790+
791+
// For flush sync barrier
792+
current_flush_version: u64,
793+
current_flush_port: Option<OncePortRef<()>>,
794+
current_unflushed_procs: usize,
699795
}
700796

701797
impl LogClientActor {
@@ -725,6 +821,12 @@ impl LogClientActor {
725821
OutputTarget::Stderr => eprintln!("{}", message),
726822
}
727823
}
824+
825+
fn flush_internal(&mut self) {
826+
self.print_aggregators();
827+
self.last_flush_time = RealClock.system_time_now();
828+
self.next_flush_deadline = None;
829+
}
728830
}
729831

730832
#[async_trait]
@@ -743,6 +845,9 @@ impl Actor for LogClientActor {
743845
aggregators,
744846
last_flush_time: RealClock.system_time_now(),
745847
next_flush_deadline: None,
848+
current_flush_version: 0,
849+
current_flush_port: None,
850+
current_unflushed_procs: 0,
746851
})
747852
}
748853
}
@@ -794,20 +899,26 @@ impl LogMessageHandler for LogClientActor {
794899
let new_deadline = self.last_flush_time + Duration::from_secs(window);
795900
let now = RealClock.system_time_now();
796901
if new_deadline <= now {
797-
self.flush(cx).await?;
902+
self.flush_internal();
798903
} else {
799904
let delay = new_deadline.duration_since(now)?;
800905
match self.next_flush_deadline {
801906
None => {
802907
self.next_flush_deadline = Some(new_deadline);
803-
cx.self_message_with_delay(LogMessage::Flush {}, delay)?;
908+
cx.self_message_with_delay(
909+
LogMessage::Flush { sync_version: None },
910+
delay,
911+
)?;
804912
}
805913
Some(deadline) => {
806914
// Some early log lines have alrady triggered the flush.
807915
if new_deadline < deadline {
808916
// This can happen if the user has adjusted the aggregation window.
809917
self.next_flush_deadline = Some(new_deadline);
810-
cx.self_message_with_delay(LogMessage::Flush {}, delay)?;
918+
cx.self_message_with_delay(
919+
LogMessage::Flush { sync_version: None },
920+
delay,
921+
)?;
811922
}
812923
}
813924
}
@@ -818,10 +929,38 @@ impl LogMessageHandler for LogClientActor {
818929
Ok(())
819930
}
820931

821-
async fn flush(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
822-
self.print_aggregators();
823-
self.last_flush_time = RealClock.system_time_now();
824-
self.next_flush_deadline = None;
932+
async fn flush(
933+
&mut self,
934+
cx: &Context<Self>,
935+
sync_version: Option<u64>,
936+
) -> Result<(), anyhow::Error> {
937+
match sync_version {
938+
None => {
939+
self.flush_internal();
940+
}
941+
Some(version) => {
942+
if version != self.current_flush_version {
943+
tracing::error!(
944+
"found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
945+
version,
946+
self.current_flush_version
947+
);
948+
return Ok(());
949+
}
950+
951+
if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
952+
// This is a serious issue; it's better to error out.
953+
anyhow::bail!("found no ongoing flush request");
954+
}
955+
self.current_unflushed_procs -= 1;
956+
if self.current_unflushed_procs == 0 {
957+
self.flush_internal();
958+
let reply = self.current_flush_port.take().unwrap();
959+
self.current_flush_port = None;
960+
reply.send(cx, ()).map_err(anyhow::Error::from)?;
961+
}
962+
}
963+
}
825964

826965
Ok(())
827966
}
@@ -842,6 +981,30 @@ impl LogClientMessageHandler for LogClientActor {
842981
self.aggregate_window_sec = aggregate_window_sec;
843982
Ok(())
844983
}
984+
985+
async fn start_sync_flush(
986+
&mut self,
987+
cx: &Context<Self>,
988+
expected_procs_flushed: usize,
989+
reply: OncePortRef<()>,
990+
version: OncePortRef<u64>,
991+
) -> Result<(), anyhow::Error> {
992+
if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
993+
tracing::warn!(
994+
"found unfinished ongoing flush: version {}; {} unflushed procs",
995+
self.current_flush_version,
996+
self.current_unflushed_procs,
997+
);
998+
}
999+
1000+
self.current_flush_version += 1;
1001+
self.current_flush_port = Some(reply.clone());
1002+
self.current_unflushed_procs = expected_procs_flushed;
1003+
version
1004+
.send(cx, self.current_flush_version)
1005+
.map_err(anyhow::Error::from)?;
1006+
Ok(())
1007+
}
8451008
}
8461009

8471010
#[cfg(test)]

0 commit comments

Comments
 (0)