Skip to content

Commit 11a217e

Browse files
highkerfacebook-github-bot
authored andcommitted
sync flush of proc mesh
Summary: Provide sync flush so it is guaranteed all the flushed logs on the remote procs will be streamed back and flushed on client's stdout/stderr. Differential Revision: D80051803
1 parent 5bfacfe commit 11a217e

File tree

5 files changed

+314
-45
lines changed

5 files changed

+314
-45
lines changed

hyperactor_mesh/src/logging.rs

Lines changed: 210 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@ use chrono::DateTime;
2222
use chrono::Local;
2323
use hyperactor::Actor;
2424
use hyperactor::ActorRef;
25+
use hyperactor::Bind;
2526
use hyperactor::Context;
2627
use hyperactor::HandleClient;
2728
use hyperactor::Handler;
2829
use hyperactor::Instance;
2930
use hyperactor::Named;
31+
use hyperactor::OncePortRef;
3032
use hyperactor::RefClient;
33+
use hyperactor::Unbind;
3134
use hyperactor::channel;
3235
use hyperactor::channel::ChannelAddr;
3336
use hyperactor::channel::ChannelRx;
@@ -39,9 +42,6 @@ use hyperactor::channel::TxStatus;
3942
use hyperactor::clock::Clock;
4043
use hyperactor::clock::RealClock;
4144
use hyperactor::data::Serialized;
42-
use hyperactor::message::Bind;
43-
use hyperactor::message::Bindings;
44-
use hyperactor::message::Unbind;
4545
use hyperactor_telemetry::env;
4646
use hyperactor_telemetry::log_file_path;
4747
use serde::Deserialize;
@@ -235,6 +235,24 @@ impl fmt::Display for Aggregator {
235235
}
236236
}
237237

238+
/// Messages that can be sent to the LogClientActor remotely.
239+
#[derive(
240+
Debug,
241+
Clone,
242+
Serialize,
243+
Deserialize,
244+
Named,
245+
Handler,
246+
HandleClient,
247+
RefClient,
248+
Bind,
249+
Unbind
250+
)]
251+
pub enum LogFlushMessage {
252+
/// Flush the log
253+
ForceSyncFlush { version: u64 },
254+
}
255+
238256
/// Messages that can be sent to the LogClientActor remotely.
239257
#[derive(
240258
Debug,
@@ -260,7 +278,11 @@ pub enum LogMessage {
260278
},
261279

262280
/// Flush the log
263-
Flush {},
281+
Flush {
282+
/// Indicate if the current flush is synced or non-synced.
283+
/// If synced, a version number is available. Otherwise, none.
284+
sync_version: Option<u64>,
285+
},
264286
}
265287

266288
/// Messages that can be sent to the LogClient locally.
@@ -279,6 +301,16 @@ pub enum LogClientMessage {
279301
/// The time window in seconds to aggregate logs. If None, aggregation is disabled.
280302
aggregate_window_sec: Option<u64>,
281303
},
304+
305+
/// Synchronously flush all the logs from all the procs. This is for client to call.
306+
StartSyncFlush {
307+
/// Expect these many procs to ack the flush message.
308+
expected_procs: usize,
309+
/// Return once we have received the acks from all the procs
310+
reply: OncePortRef<()>,
311+
/// Return to the caller the current flush version
312+
version: OncePortRef<u64>,
313+
},
282314
}
283315

284316
/// Trait for sending logs
@@ -352,7 +384,7 @@ impl LogSender for LocalLogSender {
352384
// send will make sure message is delivered
353385
if TxStatus::Active == *self.status.borrow() {
354386
// Do not use tx.send, it will block the allocator as the child process state is unknown.
355-
self.tx.post(LogMessage::Flush {});
387+
self.tx.post(LogMessage::Flush { sync_version: None });
356388
} else {
357389
tracing::debug!(
358390
"log sender {} is not active, skip sending flush message",
@@ -558,7 +590,9 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
558590
Named,
559591
Handler,
560592
HandleClient,
561-
RefClient
593+
RefClient,
594+
Bind,
595+
Unbind
562596
)]
563597
pub enum LogForwardMessage {
564598
/// Receive the log from the parent process and forward ti to the client.
@@ -568,18 +602,6 @@ pub enum LogForwardMessage {
568602
SetMode { stream_to_client: bool },
569603
}
570604

571-
impl Bind for LogForwardMessage {
572-
fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
573-
Ok(())
574-
}
575-
}
576-
577-
impl Unbind for LogForwardMessage {
578-
fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
579-
Ok(())
580-
}
581-
}
582-
583605
/// A log forwarder that receives the log from its parent process and forward it back to the client
584606
#[derive(Debug)]
585607
#[hyperactor::export(
@@ -647,17 +669,32 @@ impl Actor for LogForwardActor {
647669
#[hyperactor::forward(LogForwardMessage)]
648670
impl LogForwardMessageHandler for LogForwardActor {
649671
async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
650-
if let Ok(LogMessage::Log {
651-
hostname,
652-
pid,
653-
output_target,
654-
payload,
655-
}) = self.rx.recv().await
656-
{
657-
if self.stream_to_client {
658-
self.logging_client_ref
659-
.log(ctx, hostname, pid, output_target, payload)
660-
.await?;
672+
match self.rx.recv().await {
673+
Ok(LogMessage::Flush { sync_version }) => {
674+
match sync_version {
675+
None => {
676+
// no need to do anything. The previous messages have already been sent to the client.
677+
// Client will flush based on its own frequency.
678+
}
679+
version => {
680+
self.logging_client_ref.flush(ctx, version).await?;
681+
}
682+
}
683+
}
684+
Ok(LogMessage::Log {
685+
hostname,
686+
pid,
687+
output_target,
688+
payload,
689+
}) => {
690+
if self.stream_to_client {
691+
self.logging_client_ref
692+
.log(ctx, hostname, pid, output_target, payload)
693+
.await?;
694+
}
695+
}
696+
Err(e) => {
697+
return Err(e.into());
661698
}
662699
}
663700

@@ -696,6 +733,60 @@ fn deserialize_message_lines(
696733
anyhow::bail!("Failed to deserialize message as either String or Vec<u8>")
697734
}
698735

736+
/// An actor that send flush message to the log forwarder actor.
737+
/// The reason we need an extra actor instead of reusing the log forwarder actor
738+
/// is because the log forwarder can be blocked on the rx.recv() that listens on the new log lines.
739+
/// Thus, we need to create anew channel as a tx to send the flush message to the log forwarder
740+
/// So we do not get into a deadlock.
741+
#[derive(Debug)]
742+
#[hyperactor::export(
743+
spawn = true,
744+
handlers = [LogFlushMessage {cast = true}],
745+
)]
746+
pub struct LogFlushActor {
747+
tx: ChannelTx<LogMessage>,
748+
}
749+
750+
#[async_trait]
751+
impl Actor for LogFlushActor {
752+
type Params = ();
753+
754+
async fn new(_: ()) -> Result<Self, anyhow::Error> {
755+
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
756+
Ok(channel) => channel.parse()?,
757+
Err(err) => {
758+
tracing::debug!(
759+
"log forwarder actor failed to read env var {}: {}",
760+
BOOTSTRAP_LOG_CHANNEL,
761+
err
762+
);
763+
// TODO: this should error out; it can only happen with local proc; we need to fix it.
764+
ChannelAddr::any(ChannelTransport::Unix)
765+
}
766+
};
767+
let tx = channel::dial::<LogMessage>(log_channel)?;
768+
769+
Ok(Self { tx })
770+
}
771+
}
772+
773+
#[async_trait]
774+
#[hyperactor::forward(LogFlushMessage)]
775+
impl LogFlushMessageHandler for LogFlushActor {
776+
async fn force_sync_flush(
777+
&mut self,
778+
_cx: &Context<Self>,
779+
version: u64,
780+
) -> Result<(), anyhow::Error> {
781+
self.tx
782+
.send(LogMessage::Flush {
783+
sync_version: Some(version),
784+
})
785+
.await
786+
.map_err(anyhow::Error::from)
787+
}
788+
}
789+
699790
/// A client to receive logs from remote processes
700791
#[derive(Debug)]
701792
#[hyperactor::export(
@@ -707,6 +798,11 @@ pub struct LogClientActor {
707798
aggregators: HashMap<OutputTarget, Aggregator>,
708799
last_flush_time: SystemTime,
709800
next_flush_deadline: Option<SystemTime>,
801+
802+
// For flush sync barrier
803+
current_flush_version: u64,
804+
current_flush_port: Option<OncePortRef<()>>,
805+
current_unflushed_procs: usize,
710806
}
711807

712808
impl LogClientActor {
@@ -736,6 +832,12 @@ impl LogClientActor {
736832
OutputTarget::Stderr => eprintln!("{}", message),
737833
}
738834
}
835+
836+
fn flush_internal(&mut self) {
837+
self.print_aggregators();
838+
self.last_flush_time = RealClock.system_time_now();
839+
self.next_flush_deadline = None;
840+
}
739841
}
740842

741843
#[async_trait]
@@ -754,6 +856,9 @@ impl Actor for LogClientActor {
754856
aggregators,
755857
last_flush_time: RealClock.system_time_now(),
756858
next_flush_deadline: None,
859+
current_flush_version: 0,
860+
current_flush_port: None,
861+
current_unflushed_procs: 0,
757862
})
758863
}
759864
}
@@ -805,20 +910,26 @@ impl LogMessageHandler for LogClientActor {
805910
let new_deadline = self.last_flush_time + Duration::from_secs(window);
806911
let now = RealClock.system_time_now();
807912
if new_deadline <= now {
808-
self.flush(cx).await?;
913+
self.flush_internal();
809914
} else {
810915
let delay = new_deadline.duration_since(now)?;
811916
match self.next_flush_deadline {
812917
None => {
813918
self.next_flush_deadline = Some(new_deadline);
814-
cx.self_message_with_delay(LogMessage::Flush {}, delay)?;
919+
cx.self_message_with_delay(
920+
LogMessage::Flush { sync_version: None },
921+
delay,
922+
)?;
815923
}
816924
Some(deadline) => {
817925
// Some early log lines have alrady triggered the flush.
818926
if new_deadline < deadline {
819927
// This can happen if the user has adjusted the aggregation window.
820928
self.next_flush_deadline = Some(new_deadline);
821-
cx.self_message_with_delay(LogMessage::Flush {}, delay)?;
929+
cx.self_message_with_delay(
930+
LogMessage::Flush { sync_version: None },
931+
delay,
932+
)?;
822933
}
823934
}
824935
}
@@ -829,10 +940,45 @@ impl LogMessageHandler for LogClientActor {
829940
Ok(())
830941
}
831942

832-
async fn flush(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
833-
self.print_aggregators();
834-
self.last_flush_time = RealClock.system_time_now();
835-
self.next_flush_deadline = None;
943+
async fn flush(
944+
&mut self,
945+
cx: &Context<Self>,
946+
sync_version: Option<u64>,
947+
) -> Result<(), anyhow::Error> {
948+
match sync_version {
949+
None => {
950+
self.flush_internal();
951+
}
952+
Some(version) => {
953+
if version != self.current_flush_version {
954+
tracing::error!(
955+
"found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
956+
version,
957+
self.current_flush_version
958+
);
959+
return Ok(());
960+
}
961+
962+
if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
963+
// This is a serious issue; it's better to error out.
964+
anyhow::bail!("found no ongoing flush request");
965+
}
966+
self.current_unflushed_procs -= 1;
967+
968+
tracing::debug!(
969+
"ack sync flush: version {}; remaining procs: {}",
970+
self.current_flush_version,
971+
self.current_unflushed_procs
972+
);
973+
974+
if self.current_unflushed_procs == 0 {
975+
self.flush_internal();
976+
let reply = self.current_flush_port.take().unwrap();
977+
self.current_flush_port = None;
978+
reply.send(cx, ()).map_err(anyhow::Error::from)?;
979+
}
980+
}
981+
}
836982

837983
Ok(())
838984
}
@@ -853,6 +999,34 @@ impl LogClientMessageHandler for LogClientActor {
853999
self.aggregate_window_sec = aggregate_window_sec;
8541000
Ok(())
8551001
}
1002+
1003+
async fn start_sync_flush(
1004+
&mut self,
1005+
cx: &Context<Self>,
1006+
expected_procs_flushed: usize,
1007+
reply: OncePortRef<()>,
1008+
version: OncePortRef<u64>,
1009+
) -> Result<(), anyhow::Error> {
1010+
if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
1011+
tracing::warn!(
1012+
"found unfinished ongoing flush: version {}; {} unflushed procs",
1013+
self.current_flush_version,
1014+
self.current_unflushed_procs,
1015+
);
1016+
}
1017+
1018+
self.current_flush_version += 1;
1019+
tracing::debug!(
1020+
"start sync flush with version {}",
1021+
self.current_flush_version
1022+
);
1023+
self.current_flush_port = Some(reply.clone());
1024+
self.current_unflushed_procs = expected_procs_flushed;
1025+
version
1026+
.send(cx, self.current_flush_version)
1027+
.map_err(anyhow::Error::from)?;
1028+
Ok(())
1029+
}
8561030
}
8571031

8581032
#[cfg(test)]

0 commit comments

Comments
 (0)