Skip to content

sync flush of proc mesh #823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 210 additions & 36 deletions hyperactor_mesh/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ use chrono::DateTime;
use chrono::Local;
use hyperactor::Actor;
use hyperactor::ActorRef;
use hyperactor::Bind;
use hyperactor::Context;
use hyperactor::HandleClient;
use hyperactor::Handler;
use hyperactor::Instance;
use hyperactor::Named;
use hyperactor::OncePortRef;
use hyperactor::RefClient;
use hyperactor::Unbind;
use hyperactor::channel;
use hyperactor::channel::ChannelAddr;
use hyperactor::channel::ChannelRx;
Expand All @@ -39,9 +42,6 @@ use hyperactor::channel::TxStatus;
use hyperactor::clock::Clock;
use hyperactor::clock::RealClock;
use hyperactor::data::Serialized;
use hyperactor::message::Bind;
use hyperactor::message::Bindings;
use hyperactor::message::Unbind;
use hyperactor_telemetry::env;
use hyperactor_telemetry::log_file_path;
use serde::Deserialize;
Expand Down Expand Up @@ -235,6 +235,24 @@ impl fmt::Display for Aggregator {
}
}

/// Messages that can be sent to the LogClientActor remotely.
#[derive(
Debug,
Clone,
Serialize,
Deserialize,
Named,
Handler,
HandleClient,
RefClient,
Bind,
Unbind
)]
pub enum LogFlushMessage {
/// Flush the log
ForceSyncFlush { version: u64 },
}

/// Messages that can be sent to the LogClientActor remotely.
#[derive(
Debug,
Expand All @@ -260,7 +278,11 @@ pub enum LogMessage {
},

/// Flush the log
Flush {},
Flush {
/// Indicate if the current flush is synced or non-synced.
/// If synced, a version number is available. Otherwise, none.
sync_version: Option<u64>,
},
}

/// Messages that can be sent to the LogClient locally.
Expand All @@ -279,6 +301,16 @@ pub enum LogClientMessage {
/// The time window in seconds to aggregate logs. If None, aggregation is disabled.
aggregate_window_sec: Option<u64>,
},

/// Synchronously flush all the logs from all the procs. This is for client to call.
StartSyncFlush {
/// Expect these many procs to ack the flush message.
expected_procs: usize,
/// Return once we have received the acks from all the procs
reply: OncePortRef<()>,
/// Return to the caller the current flush version
version: OncePortRef<u64>,
},
}

/// Trait for sending logs
Expand Down Expand Up @@ -352,7 +384,7 @@ impl LogSender for LocalLogSender {
// send will make sure message is delivered
if TxStatus::Active == *self.status.borrow() {
// Do not use tx.send, it will block the allocator as the child process state is unknown.
self.tx.post(LogMessage::Flush {});
self.tx.post(LogMessage::Flush { sync_version: None });
} else {
tracing::debug!(
"log sender {} is not active, skip sending flush message",
Expand Down Expand Up @@ -547,7 +579,9 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
Named,
Handler,
HandleClient,
RefClient
RefClient,
Bind,
Unbind
)]
pub enum LogForwardMessage {
/// Receive the log from the parent process and forward ti to the client.
Expand All @@ -557,18 +591,6 @@ pub enum LogForwardMessage {
SetMode { stream_to_client: bool },
}

impl Bind for LogForwardMessage {
fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
Ok(())
}
}

impl Unbind for LogForwardMessage {
fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
Ok(())
}
}

/// A log forwarder that receives the log from its parent process and forward it back to the client
#[derive(Debug)]
#[hyperactor::export(
Expand Down Expand Up @@ -636,17 +658,32 @@ impl Actor for LogForwardActor {
#[hyperactor::forward(LogForwardMessage)]
impl LogForwardMessageHandler for LogForwardActor {
async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
if let Ok(LogMessage::Log {
hostname,
pid,
output_target,
payload,
}) = self.rx.recv().await
{
if self.stream_to_client {
self.logging_client_ref
.log(ctx, hostname, pid, output_target, payload)
.await?;
match self.rx.recv().await {
Ok(LogMessage::Flush { sync_version }) => {
match sync_version {
None => {
// no need to do anything. The previous messages have already been sent to the client.
// Client will flush based on its own frequency.
}
version => {
self.logging_client_ref.flush(ctx, version).await?;
}
}
}
Ok(LogMessage::Log {
hostname,
pid,
output_target,
payload,
}) => {
if self.stream_to_client {
self.logging_client_ref
.log(ctx, hostname, pid, output_target, payload)
.await?;
}
}
Err(e) => {
return Err(e.into());
}
}

Expand Down Expand Up @@ -685,6 +722,60 @@ fn deserialize_message_lines(
anyhow::bail!("Failed to deserialize message as either String or Vec<u8>")
}

/// An actor that send flush message to the log forwarder actor.
/// The reason we need an extra actor instead of reusing the log forwarder actor
/// is because the log forwarder can be blocked on the rx.recv() that listens on the new log lines.
/// Thus, we need to create anew channel as a tx to send the flush message to the log forwarder
/// So we do not get into a deadlock.
#[derive(Debug)]
#[hyperactor::export(
spawn = true,
handlers = [LogFlushMessage {cast = true}],
)]
pub struct LogFlushActor {
tx: ChannelTx<LogMessage>,
}

#[async_trait]
impl Actor for LogFlushActor {
type Params = ();

async fn new(_: ()) -> Result<Self, anyhow::Error> {
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
Ok(channel) => channel.parse()?,
Err(err) => {
tracing::debug!(
"log forwarder actor failed to read env var {}: {}",
BOOTSTRAP_LOG_CHANNEL,
err
);
// TODO: this should error out; it can only happen with local proc; we need to fix it.
ChannelAddr::any(ChannelTransport::Unix)
}
};
let tx = channel::dial::<LogMessage>(log_channel)?;

Ok(Self { tx })
}
}

#[async_trait]
#[hyperactor::forward(LogFlushMessage)]
impl LogFlushMessageHandler for LogFlushActor {
async fn force_sync_flush(
&mut self,
_cx: &Context<Self>,
version: u64,
) -> Result<(), anyhow::Error> {
self.tx
.send(LogMessage::Flush {
sync_version: Some(version),
})
.await
.map_err(anyhow::Error::from)
}
}

/// A client to receive logs from remote processes
#[derive(Debug)]
#[hyperactor::export(
Expand All @@ -696,6 +787,11 @@ pub struct LogClientActor {
aggregators: HashMap<OutputTarget, Aggregator>,
last_flush_time: SystemTime,
next_flush_deadline: Option<SystemTime>,

// For flush sync barrier
current_flush_version: u64,
current_flush_port: Option<OncePortRef<()>>,
current_unflushed_procs: usize,
}

impl LogClientActor {
Expand Down Expand Up @@ -725,6 +821,12 @@ impl LogClientActor {
OutputTarget::Stderr => eprintln!("{}", message),
}
}

fn flush_internal(&mut self) {
self.print_aggregators();
self.last_flush_time = RealClock.system_time_now();
self.next_flush_deadline = None;
}
}

#[async_trait]
Expand All @@ -743,6 +845,9 @@ impl Actor for LogClientActor {
aggregators,
last_flush_time: RealClock.system_time_now(),
next_flush_deadline: None,
current_flush_version: 0,
current_flush_port: None,
current_unflushed_procs: 0,
})
}
}
Expand Down Expand Up @@ -794,20 +899,26 @@ impl LogMessageHandler for LogClientActor {
let new_deadline = self.last_flush_time + Duration::from_secs(window);
let now = RealClock.system_time_now();
if new_deadline <= now {
self.flush(cx).await?;
self.flush_internal();
} else {
let delay = new_deadline.duration_since(now)?;
match self.next_flush_deadline {
None => {
self.next_flush_deadline = Some(new_deadline);
cx.self_message_with_delay(LogMessage::Flush {}, delay)?;
cx.self_message_with_delay(
LogMessage::Flush { sync_version: None },
delay,
)?;
}
Some(deadline) => {
// Some early log lines have alrady triggered the flush.
if new_deadline < deadline {
// This can happen if the user has adjusted the aggregation window.
self.next_flush_deadline = Some(new_deadline);
cx.self_message_with_delay(LogMessage::Flush {}, delay)?;
cx.self_message_with_delay(
LogMessage::Flush { sync_version: None },
delay,
)?;
}
}
}
Expand All @@ -818,10 +929,45 @@ impl LogMessageHandler for LogClientActor {
Ok(())
}

async fn flush(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
self.print_aggregators();
self.last_flush_time = RealClock.system_time_now();
self.next_flush_deadline = None;
async fn flush(
&mut self,
cx: &Context<Self>,
sync_version: Option<u64>,
) -> Result<(), anyhow::Error> {
match sync_version {
None => {
self.flush_internal();
}
Some(version) => {
if version != self.current_flush_version {
tracing::error!(
"found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
version,
self.current_flush_version
);
return Ok(());
}

if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
// This is a serious issue; it's better to error out.
anyhow::bail!("found no ongoing flush request");
}
self.current_unflushed_procs -= 1;

tracing::debug!(
"ack sync flush: version {}; remaining procs: {}",
self.current_flush_version,
self.current_unflushed_procs
);

if self.current_unflushed_procs == 0 {
self.flush_internal();
let reply = self.current_flush_port.take().unwrap();
self.current_flush_port = None;
reply.send(cx, ()).map_err(anyhow::Error::from)?;
}
}
}

Ok(())
}
Expand All @@ -842,6 +988,34 @@ impl LogClientMessageHandler for LogClientActor {
self.aggregate_window_sec = aggregate_window_sec;
Ok(())
}

async fn start_sync_flush(
&mut self,
cx: &Context<Self>,
expected_procs_flushed: usize,
reply: OncePortRef<()>,
version: OncePortRef<u64>,
) -> Result<(), anyhow::Error> {
if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
tracing::warn!(
"found unfinished ongoing flush: version {}; {} unflushed procs",
self.current_flush_version,
self.current_unflushed_procs,
);
}

self.current_flush_version += 1;
tracing::debug!(
"start sync flush with version {}",
self.current_flush_version
);
self.current_flush_port = Some(reply.clone());
self.current_unflushed_procs = expected_procs_flushed;
version
.send(cx, self.current_flush_version)
.map_err(anyhow::Error::from)?;
Ok(())
}
}

#[cfg(test)]
Expand Down
Loading