Skip to content

check if we are in ipython env #816

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 189 additions & 35 deletions hyperactor_mesh/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::fmt;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context as TaskContext;
use std::task::Poll;
use std::time::Duration;
Expand All @@ -22,12 +23,15 @@ use chrono::DateTime;
use chrono::Local;
use hyperactor::Actor;
use hyperactor::ActorRef;
use hyperactor::Bind;
use hyperactor::Context;
use hyperactor::HandleClient;
use hyperactor::Handler;
use hyperactor::Instance;
use hyperactor::Named;
use hyperactor::OncePortRef;
use hyperactor::RefClient;
use hyperactor::Unbind;
use hyperactor::channel;
use hyperactor::channel::ChannelAddr;
use hyperactor::channel::ChannelRx;
Expand All @@ -39,14 +43,12 @@ use hyperactor::channel::TxStatus;
use hyperactor::clock::Clock;
use hyperactor::clock::RealClock;
use hyperactor::data::Serialized;
use hyperactor::message::Bind;
use hyperactor::message::Bindings;
use hyperactor::message::Unbind;
use hyperactor_telemetry::env;
use hyperactor_telemetry::log_file_path;
use serde::Deserialize;
use serde::Serialize;
use tokio::io;
use tokio::sync::Mutex;
use tokio::sync::watch::Receiver;

use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
Expand Down Expand Up @@ -260,7 +262,11 @@ pub enum LogMessage {
},

/// Flush the log
Flush {},
Flush {
/// Indicate if the current flush is synced or non-synced.
/// If synced, a version number is available. Otherwise, none.
sync_version: Option<u64>,
},
}

/// Messages that can be sent to the LogClient locally.
Expand All @@ -279,6 +285,16 @@ pub enum LogClientMessage {
/// The time window in seconds to aggregate logs. If None, aggregation is disabled.
aggregate_window_sec: Option<u64>,
},

/// Synchronously flush all the logs from all the procs. This is for client to call.
StartSyncFlush {
/// Expect these many procs to ack the flush message.
expected_procs: usize,
/// Return once we have received the acks from all the procs
reply: OncePortRef<()>,
/// Return to the caller the current flush version
version: OncePortRef<u64>,
},
}

/// Trait for sending logs
Expand Down Expand Up @@ -352,7 +368,7 @@ impl LogSender for LocalLogSender {
// send will make sure message is delivered
if TxStatus::Active == *self.status.borrow() {
// Do not use tx.send, it will block the allocator as the child process state is unknown.
self.tx.post(LogMessage::Flush {});
self.tx.post(LogMessage::Flush { sync_version: None });
} else {
tracing::debug!(
"log sender {} is not active, skip sending flush message",
Expand Down Expand Up @@ -558,26 +574,19 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
Named,
Handler,
HandleClient,
RefClient
RefClient,
Bind,
Unbind
)]
pub enum LogForwardMessage {
/// Receive the log from the parent process and forward ti to the client.
Forward {},

/// If to stream the log back to the client.
SetMode { stream_to_client: bool },
}

impl Bind for LogForwardMessage {
fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
Ok(())
}
}

impl Unbind for LogForwardMessage {
fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
Ok(())
}
/// Flush the log with a version number.
ForceSyncFlush { version: u64 },
}

/// A log forwarder that receives the log from its parent process and forward it back to the client
Expand All @@ -588,6 +597,8 @@ impl Unbind for LogForwardMessage {
)]
pub struct LogForwardActor {
    /// Receiving end of the log channel written by the parent process.
    rx: ChannelRx<LogMessage>,
    /// Sender dialed on the same log channel; used to enqueue `Flush`
    /// messages so the channel's queue is periodically drained.
    flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
    /// Earliest time at which the next periodic flush should be scheduled.
    next_flush_deadline: SystemTime,
    /// The client actor that forwarded log lines are delivered to.
    logging_client_ref: ActorRef<LogClientActor>,
    /// Whether received log lines are streamed back to the client
    /// (toggled via `SetMode`).
    stream_to_client: bool,
}
Expand Down Expand Up @@ -630,15 +641,29 @@ impl Actor for LogForwardActor {
.1
}
};

// Dial the same channel to send flush message to drain the log queue.
let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
let now = RealClock.system_time_now();

Ok(Self {
rx,
flush_tx,
next_flush_deadline: now,
logging_client_ref,
stream_to_client: true,
})
}

/// Start the forwarding loop and prime the periodic flush cycle.
async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
    // Begin pulling messages off the log channel right away.
    this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;

    // Seed the flush loop so the log channel will not deadlock waiting
    // for a first flush message.
    let mut tx = self.flush_tx.lock().await;
    tx.send(LogMessage::Flush { sync_version: None }).await?;
    Ok(())
}
}
Expand All @@ -647,17 +672,48 @@ impl Actor for LogForwardActor {
#[hyperactor::forward(LogForwardMessage)]
impl LogForwardMessageHandler for LogForwardActor {
async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
if let Ok(LogMessage::Log {
hostname,
pid,
output_target,
payload,
}) = self.rx.recv().await
{
if self.stream_to_client {
self.logging_client_ref
.log(ctx, hostname, pid, output_target, payload)
.await?;
match self.rx.recv().await {
Ok(LogMessage::Flush { sync_version }) => {
let now = RealClock.system_time_now();
match sync_version {
None => {
// Schedule another flush to keep the log channel from deadlocking.
let delay = Duration::from_secs(1);
if now >= self.next_flush_deadline {
self.next_flush_deadline = now + delay;
let flush_tx = self.flush_tx.clone();
tokio::spawn(async move {
RealClock.sleep(delay).await;
if let Err(e) = flush_tx
.lock()
.await
.send(LogMessage::Flush { sync_version: None })
.await
{
tracing::error!("failed to send flush message: {}", e);
}
});
}
}
version => {
self.logging_client_ref.flush(ctx, version).await?;
}
}
}
Ok(LogMessage::Log {
hostname,
pid,
output_target,
payload,
}) => {
if self.stream_to_client {
self.logging_client_ref
.log(ctx, hostname, pid, output_target, payload)
.await?;
}
}
Err(e) => {
return Err(e.into());
}
}

Expand All @@ -675,6 +731,21 @@ impl LogForwardMessageHandler for LogForwardActor {
self.stream_to_client = stream_to_client;
Ok(())
}

/// Enqueue a versioned `Flush` message on the log channel so the queued
/// log messages ahead of it get drained before the flush is acked.
async fn force_sync_flush(
    &mut self,
    _cx: &Context<Self>,
    version: u64,
) -> Result<(), anyhow::Error> {
    let flush = LogMessage::Flush {
        sync_version: Some(version),
    };
    let mut tx = self.flush_tx.lock().await;
    tx.send(flush).await?;
    Ok(())
}
}

/// Deserialize a serialized message and split it into UTF-8 lines
Expand Down Expand Up @@ -707,6 +778,11 @@ pub struct LogClientActor {
aggregators: HashMap<OutputTarget, Aggregator>,
last_flush_time: SystemTime,
next_flush_deadline: Option<SystemTime>,

// For flush sync barrier
current_flush_version: u64,
current_flush_port: Option<OncePortRef<()>>,
current_unflushed_procs: usize,
}

impl LogClientActor {
Expand Down Expand Up @@ -736,6 +812,12 @@ impl LogClientActor {
OutputTarget::Stderr => eprintln!("{}", message),
}
}

/// Print whatever the aggregators currently hold, then reset the
/// flush-scheduling state.
fn flush_internal(&mut self) {
    // Emit the aggregated output first, then record when it happened.
    self.print_aggregators();
    self.next_flush_deadline = None;
    self.last_flush_time = RealClock.system_time_now();
}
}

#[async_trait]
Expand All @@ -754,6 +836,9 @@ impl Actor for LogClientActor {
aggregators,
last_flush_time: RealClock.system_time_now(),
next_flush_deadline: None,
current_flush_version: 0,
current_flush_port: None,
current_unflushed_procs: 0,
})
}
}
Expand Down Expand Up @@ -805,20 +890,26 @@ impl LogMessageHandler for LogClientActor {
let new_deadline = self.last_flush_time + Duration::from_secs(window);
let now = RealClock.system_time_now();
if new_deadline <= now {
self.flush(cx).await?;
self.flush_internal();
} else {
let delay = new_deadline.duration_since(now)?;
match self.next_flush_deadline {
None => {
self.next_flush_deadline = Some(new_deadline);
cx.self_message_with_delay(LogMessage::Flush {}, delay)?;
cx.self_message_with_delay(
LogMessage::Flush { sync_version: None },
delay,
)?;
}
Some(deadline) => {
// Some early log lines have alrady triggered the flush.
if new_deadline < deadline {
// This can happen if the user has adjusted the aggregation window.
self.next_flush_deadline = Some(new_deadline);
cx.self_message_with_delay(LogMessage::Flush {}, delay)?;
cx.self_message_with_delay(
LogMessage::Flush { sync_version: None },
delay,
)?;
}
}
}
Expand All @@ -829,10 +920,45 @@ impl LogMessageHandler for LogClientActor {
Ok(())
}

async fn flush(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
self.print_aggregators();
self.last_flush_time = RealClock.system_time_now();
self.next_flush_deadline = None;
/// Handle a flush request.
///
/// With `sync_version == None` this is a plain periodic flush. With
/// `Some(version)` it is one proc's ack in a sync-flush barrier: the ack is
/// counted, and once every expected proc has acked, the aggregators are
/// flushed and the caller waiting on the reply port is released.
///
/// Fix over the previous version: `self.current_flush_port = None;` after
/// `.take()` was dead code (`take` already leaves `None` behind), and the
/// bare `unwrap()` now states its invariant via `expect`.
async fn flush(
    &mut self,
    cx: &Context<Self>,
    sync_version: Option<u64>,
) -> Result<(), anyhow::Error> {
    let version = match sync_version {
        // Unsynced flush: just drain the aggregators and reschedule.
        None => {
            self.flush_internal();
            return Ok(());
        }
        Some(version) => version,
    };

    if version != self.current_flush_version {
        // A stale ack from a barrier that never fully completed; drop it
        // rather than corrupting the current barrier's accounting.
        tracing::error!(
            "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
            version,
            self.current_flush_version
        );
        return Ok(());
    }

    if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
        // This is a serious issue; it's better to error out.
        anyhow::bail!("found no ongoing flush request");
    }
    self.current_unflushed_procs -= 1;

    tracing::debug!(
        "ack sync flush: version {}; remaining procs: {}",
        self.current_flush_version,
        self.current_unflushed_procs
    );

    if self.current_unflushed_procs == 0 {
        // Last proc acked: perform the actual flush and release the barrier.
        // `take()` both returns the port and leaves `None` in its place.
        self.flush_internal();
        let reply = self
            .current_flush_port
            .take()
            .expect("current_flush_port checked to be Some above");
        reply.send(cx, ()).map_err(anyhow::Error::from)?;
    }

    Ok(())
}
Expand All @@ -853,6 +979,34 @@ impl LogClientMessageHandler for LogClientActor {
self.aggregate_window_sec = aggregate_window_sec;
Ok(())
}

async fn start_sync_flush(
&mut self,
cx: &Context<Self>,
expected_procs_flushed: usize,
reply: OncePortRef<()>,
version: OncePortRef<u64>,
) -> Result<(), anyhow::Error> {
if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
tracing::warn!(
"found unfinished ongoing flush: version {}; {} unflushed procs",
self.current_flush_version,
self.current_unflushed_procs,
);
}

self.current_flush_version += 1;
tracing::debug!(
"start sync flush with version {}",
self.current_flush_version
);
self.current_flush_port = Some(reply.clone());
self.current_unflushed_procs = expected_procs_flushed;
version
.send(cx, self.current_flush_version)
.map_err(anyhow::Error::from)?;
Ok(())
}
}

#[cfg(test)]
Expand Down
Loading