diff --git a/hyperactor_mesh/Cargo.toml b/hyperactor_mesh/Cargo.toml
index 775fe7636..e63c9a64f 100644
--- a/hyperactor_mesh/Cargo.toml
+++ b/hyperactor_mesh/Cargo.toml
@@ -66,6 +66,7 @@ libc = "0.2.139"
 mockall = "0.13.1"
 ndslice = { version = "0.0.0", path = "../ndslice" }
 nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }
+notify = "5"
 opentelemetry = "0.29"
 pin-project = "1.1.10"
 preempt_rwlock = { version = "0.0.0", path = "../preempt_rwlock" }
@@ -87,6 +88,7 @@ bytes = { version = "1.10", features = ["serde"] }
 itertools = "0.14.0"
 maplit = "1.0"
 proptest = "1.5"
+tempfile = "3.22"
 timed_test = { version = "0.0.0", path = "../timed_test" }
 tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs
index c6231d1a9..25b3dcc37 100644
--- a/hyperactor_mesh/src/bootstrap.rs
+++ b/hyperactor_mesh/src/bootstrap.rs
@@ -58,7 +58,10 @@ use tokio::sync::oneshot;
 use tokio::sync::watch;
 
 use crate::alloc::logtailer::LogTailer;
+use crate::logging::LogFileMonitor;
+use crate::logging::OutputTarget;
 use crate::logging::create_log_writers;
+use crate::logging::create_temp_log;
 use crate::proc_mesh::mesh_agent::ProcMeshAgent;
 use crate::v1;
 use crate::v1::host_mesh::mesh_agent::HostAgentMode;
@@ -75,7 +78,7 @@ declare_attrs! {
     pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;
 
     /// Maximum number of log lines retained in a proc's stderr/stdout
-    /// tail buffer. Used by [`LogTailer::tee`] when wiring child
+    /// tail buffer. Used by [`LogFileMonitor`] when wiring child
     /// pipes. Default: 100
     @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TAIL_LOG_LINES".to_string())
     pub attr MESH_TAIL_LOG_LINES: usize = 100;
@@ -92,6 +95,10 @@ declare_attrs! {
     /// the child to exit before escalating to SIGKILL.
     @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string())
     pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
+
+    /// Selects how child output is captured: `LogFileMonitor` (true) or the legacy `LogTailer` (false).
+    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_ENABLE_LOG_MONITOR".to_string())
+    pub attr MESH_ENABLE_LOG_MONITOR: bool = true;
 }
 
 pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
@@ -610,6 +617,14 @@ pub struct BootstrapProcHandle {
     /// Stderr tailer for this proc. Same behavior as `stdout_tailer`
     /// but for stderr (used for exit-reason enrichment).
     stderr_tailer: Arc<std::sync::Mutex<Option<LogTailer>>>,
+    /// Stdout monitor for this proc. Created with `LogFileMonitor::start`, it
+    /// forwards output to a log channel and keeps a bounded ring buffer.
+    /// Transferred to the exit monitor, which calls `abort()` after `wait()`
+    /// to recover buffered lines.
+    stdout_monitor: Arc<std::sync::Mutex<Option<LogFileMonitor>>>,
+    /// Stderr monitor for this proc. Same behavior as `stdout_monitor`
+    /// but for stderr (used for exit-reason enrichment).
+    stderr_monitor: Arc<std::sync::Mutex<Option<LogFileMonitor>>>,
     /// Watch sender for status transitions. Every `mark_*` goes
     /// through [`BootstrapProcHandle::transition`], which updates the
     /// snapshot under the lock and then `send`s the new
@@ -662,6 +677,8 @@ impl BootstrapProcHandle {
             child: Arc::new(std::sync::Mutex::new(Some(child))),
             stdout_tailer: Arc::new(std::sync::Mutex::new(None)),
             stderr_tailer: Arc::new(std::sync::Mutex::new(None)),
+            stdout_monitor: Arc::new(std::sync::Mutex::new(None)),
+            stderr_monitor: Arc::new(std::sync::Mutex::new(None)),
             tx,
             rx,
         }
@@ -1059,6 +1076,7 @@ impl BootstrapProcHandle {
         }
     }
 
+    // TODO: Delete once migrated to LogFileMonitor.
     pub fn set_tailers(&self, out: Option<LogTailer>, err: Option<LogTailer>) {
         *self
             .stdout_tailer
@@ -1083,6 +1101,31 @@ impl BootstrapProcHandle {
             .take();
         (out, err)
     }
+
+    pub fn set_stream_monitors(&self, out: Option<LogFileMonitor>, err: Option<LogFileMonitor>) {
+        *self
+            .stdout_monitor
+            .lock()
+            .expect("stdout_monitor mutex poisoned") = out;
+        *self
+            .stderr_monitor
+            .lock()
+            .expect("stderr_monitor mutex poisoned") = err;
+    }
+
+    fn take_stream_monitors(&self) -> (Option<LogFileMonitor>, Option<LogFileMonitor>) {
+        let out = self
+            .stdout_monitor
+            .lock()
+            .expect("stdout_monitor mutex poisoned")
+            .take();
+        let err = self
+            .stderr_monitor
+            .lock()
+            .expect("stderr_monitor mutex poisoned")
+            .take();
+        (out, err)
+    }
 }
 
 #[async_trait]
@@ -1481,8 +1524,6 @@ impl BootstrapProcManager {
     fn spawn_exit_monitor(&self, proc_id: ProcId, handle: BootstrapProcHandle) {
         let pid_table = Arc::clone(&self.pid_table);
 
-        let (stdout_tailer, stderr_tailer) = handle.take_tailers();
-
         let maybe_child = {
             let mut guard = handle.child.lock().expect("child mutex");
             let taken = guard.take();
@@ -1499,12 +1540,26 @@ impl BootstrapProcManager {
             let wait_res = child.wait().await;
 
             let mut stderr_tail: Vec<String> = Vec::new();
-            if let Some(t) = stderr_tailer {
-                let (lines, _bytes) = t.join().await;
-                stderr_tail = lines;
-            }
-            if let Some(t) = stdout_tailer {
-                let (_lines, _bytes) = t.join().await;
+            let enable_log_monitor = hyperactor::config::global::get(MESH_ENABLE_LOG_MONITOR);
+            if enable_log_monitor {
+                let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
+
+                if let Some(t) = stderr_mon {
+                    let (lines, _bytes) = t.abort().await;
+                    stderr_tail = lines;
+                }
+                if let Some(t) = stdout_mon {
+                    let (_lines, _bytes) = t.abort().await;
+                }
+            } else {
+                let (stdout_tailer, stderr_tailer) = handle.take_tailers();
+                if let Some(t) = stderr_tailer {
+                    let (lines, _bytes) = t.join().await;
+                    stderr_tail = lines;
+                }
+                if let Some(t) = stdout_tailer {
+                    let (_lines, _bytes) = t.join().await;
+                }
             }
 
             match wait_res {
@@ -1574,6 +1629,50 @@ impl BootstrapProcManager {
     }
 }
 
+/// Create a shell command that wraps the original command with `tee` for logging.
+/// Returns the wrapping shell command.
+pub fn tee_cmd(cmd: &Command, out_path: Option<PathBuf>, err_path: Option<PathBuf>) -> Command {
+    let script = match (out_path.clone(), err_path.clone()) {
+        (Some(_), Some(_)) => {
+            r#"
+            "$0" "$@" > >(tee -a "$MONARCH_LOG_FILE") 2> >(tee -a "$MONARCH_ERR_FILE" >&2)
+            "#
+        }
+        (Some(_), None) => {
+            r#"
+            "$0" "$@" > >(tee -a "$MONARCH_LOG_FILE")
+            "#
+        }
+        (None, Some(_)) => {
+            r#"
+            "$0" "$@" 2> >(tee -a "$MONARCH_ERR_FILE" >&2)
+            "#
+        }
+        (None, None) => r#" "$0" "$@" "#,
+    };
+
+    let mut sh = Command::new("/bin/sh");
+    let std_cmd = cmd.as_std();
+    let program = std_cmd.get_program().to_os_string();
+    let args: Vec<_> = std_cmd.get_args().map(|a| a.to_os_string()).collect();
+
+    // Copy all environment variables from the original command
+    for (key, value) in std_cmd.get_envs() {
+        if let (Some(key), Some(value)) = (key.to_str(), value.and_then(|v| v.to_str()))
{ + sh.env(key, value); + } + } + + sh.arg("-c") + .arg(script) + .arg(program) + .args(args) + .env("MONARCH_LOG_FILE", out_path.unwrap_or_default()) + .env("MONARCH_ERR_FILE", err_path.unwrap_or_default()); + + sh +} + #[async_trait] impl ProcManager for BootstrapProcManager { type Handle = BootstrapProcHandle; @@ -1644,16 +1743,23 @@ impl ProcManager for BootstrapProcManager { "HYPERACTOR_MESH_BOOTSTRAP_MODE", mode.to_env_safe_string() .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?, - ) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); + ); let log_channel = ChannelAddr::any(ChannelTransport::Unix); cmd.env(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string()); - let child = cmd - .spawn() - .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?; + let enable_log_monitor = hyperactor::config::global::get(MESH_ENABLE_LOG_MONITOR); + let stdout_path = create_temp_log().await.ok(); + let stderr_path = create_temp_log().await.ok(); + let child = if enable_log_monitor { + tee_cmd(&cmd, stdout_path.clone(), stderr_path.clone()) + .spawn() + .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))? + } else { + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); + cmd.spawn() + .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))? + }; let pid = child.id().unwrap_or_default(); let handle = BootstrapProcHandle::new(proc_id.clone(), child); @@ -1665,31 +1771,71 @@ impl ProcManager for BootstrapProcManager { } } - // Writers: tee to local (stdout/stderr or file) + send over - // channel - let (out_writer, err_writer) = create_log_writers(0, log_channel.clone(), pid) - .unwrap_or_else(|_| (Box::new(tokio::io::stdout()), Box::new(tokio::io::stderr()))); + if enable_log_monitor { + let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES); - let mut stdout_tailer: Option = None; - let mut stderr_tailer: Option = None; + let stdout_monitor = stdout_path.and_then(|path| { + match LogFileMonitor::start( + path, + OutputTarget::Stdout, + tail_size, + log_channel.clone(), + pid, + ) { + Ok(monitor) => Some(monitor), + Err(err) => { + tracing::warn!( + "Could not create log monitor for {:?} due to {}", + OutputTarget::Stdout, + err + ); + None + } + } + }); - // Take the pipes from the child. - { - let mut guard = handle.child.lock().expect("child mutex poisoned"); - if let Some(child) = guard.as_mut() { - // LogTailer::tee forwards to our writers and keeps a - // tail buffer. - let max_tail_lines = hyperactor::config::global::get(MESH_TAIL_LOG_LINES); - if let Some(out) = child.stdout.take() { - stdout_tailer = Some(LogTailer::tee(max_tail_lines, out, out_writer)); + let stderr_monitor = stderr_path.and_then(|path| { + match LogFileMonitor::start(path, OutputTarget::Stderr, tail_size, log_channel, pid) + { + Ok(monitor) => Some(monitor), + Err(err) => { + tracing::warn!( + "Could not create log monitor for {:?} due to {}", + OutputTarget::Stderr, + err + ); + None + } } - if let Some(err) = child.stderr.take() { - stderr_tailer = Some(LogTailer::tee(max_tail_lines, err, err_writer)); + }); + handle.set_stream_monitors(stdout_monitor, stderr_monitor); + } else { + // Writers: tee to local (stdout/stderr or file) + send over + // channel + let (out_writer, err_writer) = create_log_writers(0, log_channel.clone(), pid) + .unwrap_or_else(|_| (Box::new(tokio::io::stdout()), Box::new(tokio::io::stderr()))); + + let mut stdout_tailer: Option = None; + let mut stderr_tailer: Option = None; + + // Take the pipes from the child. 
+ { + let mut guard = handle.child.lock().expect("child mutex poisoned"); + if let Some(child) = guard.as_mut() { + // LogTailer::tee forwards to our writers and keeps a + // tail buffer. + let max_tail_lines = hyperactor::config::global::get(MESH_TAIL_LOG_LINES); + if let Some(out) = child.stdout.take() { + stdout_tailer = Some(LogTailer::tee(max_tail_lines, out, out_writer)); + } + if let Some(err) = child.stderr.take() { + stderr_tailer = Some(LogTailer::tee(max_tail_lines, err, err_writer)); + } + // Make the tailers visible to the exit monitor. + handle.set_tailers(stdout_tailer.take(), stderr_tailer.take()); + } else { + tracing::debug!("proc {proc_id}: child already taken before wiring IO"); } - // Make the tailers visible to the exit monitor. - handle.set_tailers(stdout_tailer.take(), stderr_tailer.take()); - } else { - tracing::debug!("proc {proc_id}: child already taken before wiring IO"); } } @@ -2037,7 +2183,6 @@ mod tests { use ndslice::Extent; use ndslice::ViewExt; use ndslice::extent; - use tokio::io; use tokio::process::Command; use super::*; @@ -3223,6 +3368,7 @@ mod tests { unsafe { std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false"); } + hyperactor_telemetry::initialize_logging_for_test(); // Create a "root" direct addressed proc. let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string()) .await @@ -3336,54 +3482,26 @@ mod tests { #[tokio::test] async fn exit_tail_is_attached_and_logged() { + hyperactor_telemetry::initialize_logging_for_test(); // Spawn a child that writes to stderr then exits 7. - let mut cmd = Command::new("sh"); - cmd.arg("-c") - .arg("printf 'boom-1\\nboom-2\\n' 1>&2; exit 7") - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - let child = cmd.spawn().expect("spawn"); + let boot_cmd = BootstrapCommand { + program: PathBuf::from("sh"), + arg0: None, + args: vec![ + "-c".to_string(), + // Add sleep to ensure LogFileMonitor has time to process logs + "printf 'boom-1\n' 1>&2; printf 'boom-2\n' 1>&2; sleep 1; exit 7".to_string(), + ], + env: HashMap::new(), + }; // Build a BootstrapProcHandle around this child (like // manager.spawn() does). let proc_id = hyperactor::id!(testproc[0]); - let handle = BootstrapProcHandle::new(proc_id.clone(), child); - // Wire tailers + dummy writers (stdout/stderr -> sinks), then - // stash them on the handle. - { - // Lock the child to get its pipes. - let mut guard = handle.child.lock().expect("child mutex poisoned"); - if let Some(child) = guard.as_mut() { - let out = child.stdout.take().expect("child stdout must be piped"); - let err = child.stderr.take().expect("child stderr must be piped"); - - // Use sinks as our "writers" (we don't care about - // forwarding in this test) - let out_writer: Box = Box::new(io::sink()); - let err_writer: Box = Box::new(io::sink()); - - // Create the tailers (they spawn background drainers). - let max_tail_lines = 3; - let out_tailer = LogTailer::tee(max_tail_lines, out, out_writer); - let err_tailer = LogTailer::tee(max_tail_lines, err, err_writer); - - // Make them visible to the exit monitor (so it can - // join on exit). - handle.set_tailers(Some(out_tailer), Some(err_tailer)); - } else { - panic!("child already taken before wiring tailers"); - } - } - - // Start an exit monitor (consumes the Child and tailers from - // the handle). 
-        let manager = BootstrapProcManager::new(BootstrapCommand {
-            program: std::path::PathBuf::from("/bin/true"), // unused in this test
-            ..Default::default()
-        });
-        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
+        let addr = ChannelAddr::any(ChannelTransport::Unix);
+        let manager = BootstrapProcManager::new(boot_cmd);
+        let handle = manager.spawn(proc_id.clone(), addr).await.unwrap();
 
         // Await terminal status and assert on exit code and stderr
         // tail.
diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs
index b93de5510..ee9237ffa 100644
--- a/hyperactor_mesh/src/logging.rs
+++ b/hyperactor_mesh/src/logging.rs
@@ -7,6 +7,7 @@
  */
 
 use std::collections::HashMap;
+use std::collections::VecDeque;
 use std::fmt;
 use std::path::Path;
 use std::path::PathBuf;
@@ -21,6 +22,7 @@ use anyhow::Result;
 use async_trait::async_trait;
 use chrono::DateTime;
 use chrono::Local;
+use hostname;
 use hyperactor::Actor;
 use hyperactor::ActorRef;
 use hyperactor::Bind;
@@ -45,18 +47,33 @@ use hyperactor::clock::RealClock;
 use hyperactor::data::Serialized;
 use hyperactor_telemetry::env;
 use hyperactor_telemetry::log_file_path;
+use notify::Event;
+use notify::EventKind;
+use notify::RecommendedWatcher;
+use notify::Watcher;
 use serde::Deserialize;
 use serde::Serialize;
+use tokio::fs;
+use tokio::fs::File;
 use tokio::io;
+use tokio::io::AsyncBufReadExt;
+use tokio::io::AsyncSeekExt;
+use tokio::io::BufReader;
+use tokio::io::SeekFrom;
 use tokio::sync::Mutex;
+use tokio::sync::RwLock;
+use tokio::sync::mpsc;
 use tokio::sync::watch::Receiver;
+use tokio::task::JoinHandle;
 
 use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
+use crate::shortuuid::ShortUuid;
 
 mod line_prefixing_writer;
 use line_prefixing_writer::LinePrefixingWriter;
 
 pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
+const MAX_LINE_SIZE: usize = 256 * 1024;
 
 /// Calculate the Levenshtein distance between two strings
 fn levenshtein_distance(left: &str, right: &str) -> usize {
@@ -396,7 +413,7 @@ fn create_file_writer(
         OutputTarget::Stderr => "stderr",
         OutputTarget::Stdout => "stdout",
     };
-    let (path, filename) = log_file_path(env)?;
+    let (path, filename) = log_file_path(env, None)?;
     let path = Path::new(&path);
 
     let mut full_path = PathBuf::from(path);
@@ -565,6 +582,197 @@ impl
     }
 }
 
+/// File monitor which watches a log file for changes.
+pub struct LogFileMonitor {
+    monitor: JoinHandle<()>,
+    // Shared buffer for peek functionality
+    recent_lines: Arc<RwLock<VecDeque<String>>>,
+    max_buffer_size: usize,
+    // Shutdown signal to stop the monitoring loop
+    shutdown_tx: Option<mpsc::UnboundedSender<()>>,
+}
+
+impl LogFileMonitor {
+    /// Create a new `LogFileMonitor` and start monitoring the provided path.
+    /// Once started, the monitor:
+    /// - forwards logs to the provided log channel address, and
+    /// - captures the last `max_buffer_size` lines, which can be inspected via `peek`.
+    pub fn start(
+        path: PathBuf,
+        target: OutputTarget,
+        max_buffer_size: usize,
+        log_channel: ChannelAddr,
+        pid: u32,
+    ) -> Result<Self> {
+        let log_sender = Box::new(LocalLogSender::new(log_channel, pid)?);
+
+        // Create shared buffer for recent lines
+        let recent_lines = Arc::new(RwLock::new(VecDeque::with_capacity(max_buffer_size)));
+        let recent_lines_clone = recent_lines.clone();
+
+        let (tx, rx) = mpsc::unbounded_channel();
+        let (shutdown_tx, shutdown_rx) = mpsc::unbounded_channel();
+
+        let event_handler = {
+            let tx = tx.clone();
+            move |res| match res {
+                Ok(event) => {
+                    tracing::info!("got notification");
+                    if let Err(e) = tx.send(event) {
+                        tracing::warn!("notify log file change dropped: {:?}", e);
+                    }
+                }
+                Err(e) => tracing::warn!("log file watcher error: {:?}", e),
+            }
+        };
+        let mut watcher = notify::recommended_watcher(event_handler)?;
+
+        // Watch the path
+        watcher.watch(&path, notify::RecursiveMode::NonRecursive)?;
+
+        // Clone path for the async task since it moves ownership
+        let path_clone = path.clone();
+        let monitor = tokio::spawn(async move {
+            if let Err(e) = start_monitoring(
+                watcher,
+                rx,
+                shutdown_rx,
+                path_clone.clone(),
+                log_sender,
+                target,
+                Some(recent_lines_clone),
+            )
+            .await
+            {
+                tracing::error!(
+                    "file {} monitor failed: {}",
+                    path_clone.to_string_lossy().into_owned(),
+                    e
+                );
+            }
+        });
+
+        Ok(LogFileMonitor {
+            monitor,
+            recent_lines,
+            max_buffer_size,
+            shutdown_tx: Some(shutdown_tx),
+        })
+    }
+
+    pub async fn abort(mut self) -> (Vec<String>, Result<(), anyhow::Error>) {
+        // Send shutdown signal to stop the monitoring loop
+        if let Some(shutdown_tx) = self.shutdown_tx.take() {
+            let _ = shutdown_tx.send(()); // Ignore error if receiver is already dropped
+        }
+
+        let lines = self.peek().await;
+        let result = match self.monitor.await {
+            Ok(_) => Ok(()),
+            Err(e) => Err(e.into()),
+        };
+
+        (lines, result)
+    }
+
+    /// Inspect the latest `max_buffer_size` lines read from the file being monitored.
+    /// Returns lines in chronological order (oldest first).
+    pub async fn peek(&self) -> Vec<String> {
+        let lines = self.recent_lines.read().await;
+        let start_idx = if lines.len() > self.max_buffer_size {
+            lines.len() - self.max_buffer_size
+        } else {
+            0
+        };
+
+        lines.range(start_idx..).cloned().collect()
+    }
+}
+
+/// Start monitoring the log file and forwarding content to the logging client
+async fn start_monitoring(
+    watcher: RecommendedWatcher,
+    mut rx: mpsc::UnboundedReceiver<Event>,
+    mut shutdown_rx: mpsc::UnboundedReceiver<()>,
+    path: PathBuf,
+    mut log_sender: Box<dyn LogSender>,
+    target: OutputTarget,
+    recent_lines: Option<Arc<RwLock<VecDeque<String>>>>,
+) -> Result<()> {
+    let file = fs::OpenOptions::new().read(true).open(&path).await?;
+
+    let mut reader = BufReader::new(file);
+    let mut position = reader.seek(SeekFrom::End(0)).await?;
+
+    tracing::info!("Monitoring {:?} for new lines...", path);
+
+    let _watcher_guard = watcher;
+
+    loop {
+        tokio::select! {
+            event = rx.recv() => {
+                match event {
+                    Some(event) => {
+                        if let EventKind::Modify(_) = &event.kind {
+                            // seek to last known position
+                            reader.seek(SeekFrom::Start(position)).await?;
+                            let mut buf = String::new();
+
+                            // read all new lines
+                            while reader.read_line(&mut buf).await? > 0 {
+                                // Truncate the line if it's too long
+                                if buf.len() > MAX_LINE_SIZE {
+                                    buf.truncate(MAX_LINE_SIZE);
+                                    buf.push_str("... 
[TRUNCATED]"); + } + + let line = buf.trim_end().to_string(); + + // Store line in buffer if provided + if let Some(ref recent_lines_ref) = recent_lines { + let mut recent_lines = recent_lines_ref.write().await; + recent_lines.push_back(line.clone()); + } + + if let Err(e) = log_sender.send(target, line.into_bytes()) { + tracing::error!("Failed to send log line: {}", e); + } + + buf.clear(); + } + + position = reader.seek(SeekFrom::Current(0)).await?; + } + } + None => { + tracing::debug!("File event channel closed, stopping monitoring"); + break; + } + } + } + _ = shutdown_rx.recv() => { + tracing::debug!("Shutdown signal received, stopping monitoring"); + break; + } + } + } + Ok(()) +} + +pub async fn create_temp_log() -> Result { + let key = &ShortUuid::generate().to_string(); + let (parent, filename) = log_file_path(env::Env::current(), Some(key))?; + let path = PathBuf::from(parent).join(filename); + + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + + let _ = File::create(&path).await?; + + Ok(path) +} + /// Messages that can be sent to the LogWriterActor #[derive( Debug, @@ -1448,4 +1656,201 @@ mod tests { // Check that the count is 3 assert_eq!(aggregator.lines[0].count, 3); } + + #[tokio::test] + async fn test_log_file_monitor_creation_and_forwarding() { + hyperactor_telemetry::initialize_logging_for_test(); + use std::io::Write as StdWrite; + // Create a temporary directory and file + let temp_dir = tempfile::TempDir::new().unwrap(); + let log_file_path = temp_dir.path().join("test.log"); + + // Create the file and write initial content + let mut file = std::fs::File::create(&log_file_path).unwrap(); + writeln!(file, "Initial log line").unwrap(); + file.sync_all().unwrap(); + drop(file); + + // Set up log channel for monitoring + let (log_channel, mut rx) = + channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + + // Create LogFileMonitor + let monitor = LogFileMonitor::start( + log_file_path.clone(), + OutputTarget::Stdout, + 3, // max_buffer_size + log_channel, + 12345, // pid + ) + .unwrap(); + + RealClock + .sleep(std::time::Duration::from_millis(2000)) + .await; + + // Append new content to the file + let mut file = std::fs::OpenOptions::new() + .append(true) + .open(&log_file_path) + .unwrap(); + for i in 1..=50 { + writeln!(file, "New log line {}", i).unwrap(); + } + file.sync_all().unwrap(); + drop(file); + + // Wait until log sender gets message + let timeout = Duration::from_secs(5); + let _ = RealClock + .timeout(timeout, rx.recv()) + .await + .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout)); + + let recent_lines = monitor.peek().await; + + assert!( + recent_lines.len() == 3, + "Expected buffer with size 3, got {}", + recent_lines.len() + ); + + let (_lines, _result) = monitor.abort().await; + } + + #[test] + fn test_aggregator_custom_threshold() { + // Test with very strict threshold (0.05) + let mut strict_aggregator = Aggregator::new_with_threshold(0.05); + strict_aggregator.add_line("ERROR 404").unwrap(); + strict_aggregator.add_line("ERROR 500").unwrap(); // Should not merge due to strict threshold + assert_eq!(strict_aggregator.lines.len(), 2); + + // Test with very lenient threshold (0.8) + let mut lenient_aggregator = Aggregator::new_with_threshold(0.8); + lenient_aggregator.add_line("ERROR 404").unwrap(); + lenient_aggregator.add_line("WARNING 200").unwrap(); // Should merge due to lenient threshold + assert_eq!(lenient_aggregator.lines.len(), 1); + 
assert_eq!(lenient_aggregator.lines[0].count, 2); + } + + #[test] + fn test_format_system_time() { + let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1609459200); // 2021-01-01 00:00:00 UTC + let formatted = format_system_time(test_time); + + // Just verify it's a reasonable format (contains date and time components) + assert!(formatted.contains("-")); + assert!(formatted.contains(":")); + assert!(formatted.len() > 10); // Should be reasonable length + } + + #[test] + fn test_aggregator_display_formatting() { + let mut aggregator = Aggregator::new(); + aggregator.add_line("Test error message").unwrap(); + aggregator.add_line("Test error message").unwrap(); // Should merge + + let display_string = format!("{}", aggregator); + + // Verify the output contains expected elements + assert!(display_string.contains("Aggregated Logs")); + assert!(display_string.contains("[2 similar log lines]")); + assert!(display_string.contains("Test error message")); + assert!(display_string.contains(">>>") && display_string.contains("<<<")); + } + + #[tokio::test] + async fn test_local_log_sender_inactive_status() { + let (log_channel, _) = + channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + let mut sender = LocalLogSender::new(log_channel, 12345).unwrap(); + + // This test verifies that the sender handles inactive status gracefully + // In a real scenario, the channel would be closed, but for testing we just + // verify the send/flush methods don't panic + let result = sender.send(OutputTarget::Stdout, b"test".to_vec()); + assert!(result.is_ok()); + + let result = sender.flush(); + assert!(result.is_ok()); + } + + #[test] + fn test_levenshtein_distance_edge_cases() { + // Test with empty strings + assert_eq!(levenshtein_distance("", ""), 0); + assert_eq!(levenshtein_distance("", "hello"), 5); + assert_eq!(levenshtein_distance("hello", ""), 5); + + // Test with identical strings + assert_eq!(levenshtein_distance("hello", "hello"), 0); + + // Test with single character differences + assert_eq!(levenshtein_distance("hello", "helo"), 1); // deletion + assert_eq!(levenshtein_distance("helo", "hello"), 1); // insertion + assert_eq!(levenshtein_distance("hello", "hallo"), 1); // substitution + + // Test with unicode characters + assert_eq!(levenshtein_distance("café", "cafe"), 1); + } + + #[test] + fn test_normalized_edit_distance_edge_cases() { + // Test with empty strings + assert_eq!(normalized_edit_distance("", ""), 0.0); + + // Test normalization + assert_eq!(normalized_edit_distance("hello", ""), 1.0); + assert_eq!(normalized_edit_distance("", "hello"), 1.0); + + // Test that result is always between 0.0 and 1.0 + let distance = normalized_edit_distance("completely", "different"); + assert!(distance >= 0.0 && distance <= 1.0); + } + + #[tokio::test] + async fn test_deserialize_message_lines_edge_cases() { + // Test with empty string + let empty_message = "".to_string(); + let serialized = Serialized::serialize(&empty_message).unwrap(); + let result = deserialize_message_lines(&serialized).unwrap(); + assert_eq!(result, Vec::::new()); + + // Test with trailing newline + let trailing_newline = "line1\nline2\n".to_string(); + let serialized = Serialized::serialize(&trailing_newline).unwrap(); + let result = deserialize_message_lines(&serialized).unwrap(); + assert_eq!(result, vec!["line1", "line2"]); + } + + #[test] + fn test_output_target_serialization() { + // Test that OutputTarget can be serialized and deserialized + let stdout_serialized = 
serde_json::to_string(&OutputTarget::Stdout).unwrap(); + let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap(); + + let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap(); + let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap(); + + assert_eq!(stdout_deserialized, OutputTarget::Stdout); + assert_eq!(stderr_deserialized, OutputTarget::Stderr); + } + + #[test] + fn test_log_line_display_formatting() { + let log_line = LogLine::new("Test message".to_string()); + let display_string = format!("{}", log_line); + + assert!(display_string.contains("[1 similar log lines]")); + assert!(display_string.contains("Test message")); + + // Test with higher count + let mut log_line_multi = LogLine::new("Test message".to_string()); + log_line_multi.count = 5; + let display_string_multi = format!("{}", log_line_multi); + + assert!(display_string_multi.contains("[5 similar log lines]")); + assert!(display_string_multi.contains("Test message")); + } } diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 8d2379660..4331c03df 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -125,7 +125,12 @@ pub fn username() -> String { } // Given an environment, determine the log file path to write to. -pub fn log_file_path(env: env::Env) -> Result<(String, String), anyhow::Error> { +// If a suffix is provided, it will be prepended with "_" and then appended to file name +pub fn log_file_path( + env: env::Env, + suffix: Option<&str>, +) -> Result<(String, String), anyhow::Error> { + let suffix = suffix.map(|s| format!("_{}", s)).unwrap_or_default(); match env { env::Env::Local | env::Env::MastEmulator => { let username = if whoami::username().is_empty() { @@ -133,9 +138,15 @@ pub fn log_file_path(env: env::Env) -> Result<(String, String), anyhow::Error> { } else { whoami::username() }; - Ok((format!("/tmp/{}", username), "monarch_log".to_string())) + Ok(( + format!("/tmp/{}", username), + format!("monarch_log{}", suffix), + )) } - env::Env::Mast => Ok(("/logs/".to_string(), "dedicated_log_monarch".to_string())), + env::Env::Mast => Ok(( + "/logs/".to_string(), + format!("dedicated_log_monarch{}", suffix), + )), _ => Err(anyhow::anyhow!( "file writer unsupported for environment {}", env @@ -161,7 +172,7 @@ fn writer() -> Box { match env::Env::current() { env::Env::Test => Box::new(std::io::stderr()), env::Env::Local | env::Env::MastEmulator | env::Env::Mast => { - let (path, filename) = log_file_path(env::Env::current()).unwrap(); + let (path, filename) = log_file_path(env::Env::current(), None).unwrap(); match try_create_appender(&path, &filename, true) { Ok(file_appender) => Box::new(file_appender), Err(e) => {
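
A minimal usage sketch of the new `log_file_path` suffix parameter; the suffix value and the username lookup shown here are illustrative assumptions, not taken from the patch:

    // Hypothetical example: with a suffix, the returned filename gains a "_<suffix>" tail.
    let (dir, file) = log_file_path(env::Env::Local, Some("a1b2c3"))?;
    assert_eq!(dir, format!("/tmp/{}", whoami::username())); // assumes a non-empty username
    assert_eq!(file, "monarch_log_a1b2c3");

    // Without a suffix, the legacy filename is preserved, so existing callers are unaffected.
    let (_dir, file) = log_file_path(env::Env::Local, None)?;
    assert_eq!(file, "monarch_log");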