diff --git a/hyperactor_mesh/Cargo.toml b/hyperactor_mesh/Cargo.toml index 775fe7636..e63c9a64f 100644 --- a/hyperactor_mesh/Cargo.toml +++ b/hyperactor_mesh/Cargo.toml @@ -66,6 +66,7 @@ libc = "0.2.139" mockall = "0.13.1" ndslice = { version = "0.0.0", path = "../ndslice" } nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] } +notify = "5" opentelemetry = "0.29" pin-project = "1.1.10" preempt_rwlock = { version = "0.0.0", path = "../preempt_rwlock" } @@ -87,6 +88,7 @@ bytes = { version = "1.10", features = ["serde"] } itertools = "0.14.0" maplit = "1.0" proptest = "1.5" +tempfile = "3.22" timed_test = { version = "0.0.0", path = "../timed_test" } tracing-test = { version = "0.2.3", features = ["no-env-filter"] } diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index b93de5510..39a368a9a 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -7,6 +7,7 @@ */ use std::collections::HashMap; +use std::collections::VecDeque; use std::fmt; use std::path::Path; use std::path::PathBuf; @@ -21,6 +22,7 @@ use anyhow::Result; use async_trait::async_trait; use chrono::DateTime; use chrono::Local; +use hostname; use hyperactor::Actor; use hyperactor::ActorRef; use hyperactor::Bind; @@ -45,18 +47,36 @@ use hyperactor::clock::RealClock; use hyperactor::data::Serialized; use hyperactor_telemetry::env; use hyperactor_telemetry::log_file_path; +use notify::Event; +use notify::EventKind; +use notify::RecommendedWatcher; +use notify::Watcher; use serde::Deserialize; use serde::Serialize; +use tokio::fs; +use tokio::fs::File; use tokio::io; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; +use tokio::io::BufReader; +use tokio::io::SeekFrom; use tokio::sync::Mutex; +use tokio::sync::Notify; +use tokio::sync::RwLock; +use tokio::sync::mpsc; +use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::watch::Receiver; +use tokio::task::JoinHandle; use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL; +use crate::shortuuid::ShortUuid; mod line_prefixing_writer; use line_prefixing_writer::LinePrefixingWriter; pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5; +const MAX_LINE_SIZE: usize = 256 * 1024; /// Calculate the Levenshtein distance between two strings fn levenshtein_distance(left: &str, right: &str) -> usize { @@ -317,6 +337,14 @@ pub enum OutputTarget { Stderr, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)] +pub enum Stream { + /// Standard output stream + ChildStdout, + /// Standard error stream + ChildStderr, +} + /// Write the log to a local unix channel so some actors can listen to it and stream the log back. pub struct LocalLogSender { hostname: String, @@ -381,6 +409,7 @@ impl LogSender for LocalLogSender { /// A custom writer that tees to both stdout/stderr. /// It captures output lines and sends them to the child process. +// TODO delete once FileLogMonitor is validated pub struct LogWriter { output_target: OutputTarget, std_writer: S, @@ -396,7 +425,7 @@ fn create_file_writer( OutputTarget::Stderr => "stderr", OutputTarget::Stdout => "stdout", }; - let (path, filename) = log_file_path(env)?; + let (path, filename) = log_file_path(env, None)?; let path = Path::new(&path); let mut full_path = PathBuf::from(path); @@ -565,6 +594,256 @@ impl } } +struct FileWatcher { + rx: UnboundedReceiver, + watcher: RecommendedWatcher, + path: PathBuf, +} +impl FileWatcher { + fn new(path: PathBuf) -> Result { + let (tx, rx) = mpsc::unbounded_channel(); + let mut watcher = notify::recommended_watcher({ + let tx = tx.clone(); + move |res| match res { + Ok(event) => { + if let Err(e) = tx.send(event) { + tracing::warn!("stream watcher dropped: {:?}", e); + } + } + Err(e) => tracing::warn!("stream watcher error: {:?}", e), + } + })?; + watcher.watch(&path.clone(), notify::RecursiveMode::NonRecursive)?; + + Ok(Self { rx, watcher, path }) + } +} + +/// Given a stream forwards data to the provided channel. +pub struct StreamFwder { + fwder: JoinHandle<()>, + // Shared buffer for peek functionality + recent_lines: Arc>>, + max_buffer_size: usize, + // Shutdown signal to stop the monitoring loop + stop: Arc, +} + +impl StreamFwder { + /// Create a new StreamFwder instance, and start monitoring the provided path. + /// Once started Monitor will + /// - foward logs to the provided address + /// - pipe reader to target + /// - And capture last `max_buffer_size` which can be used to inspect file contents via `peek`. + pub async fn start( + reader: impl AsyncRead + Unpin + Send + 'static, + target: OutputTarget, + max_buffer_size: usize, + log_channel: ChannelAddr, + pid: u32, + ) -> Result { + let stop = Arc::new(Notify::new()); + + let path = create_temp_log().await?; + let path_clone = path.clone(); + let file_watcher = FileWatcher::new(path.clone())?; + + // Keep recent lines to allow peeks + let recent_lines = Arc::new(RwLock::new(VecDeque::with_capacity(max_buffer_size))); + let recent_lines_clone = recent_lines.clone(); + + let log_sender = Box::new(LocalLogSender::new(log_channel, pid)?); + let fwd_stop = stop.clone(); + let fwder = tokio::spawn(async move { + if let Err(e) = fwd_on_notify( + file_watcher, + &fwd_stop, + log_sender, + target, + recent_lines_clone, + max_buffer_size, + ) + .await + { + tracing::error!( + "file {} fwder failed: {}", + path.to_string_lossy().into_owned(), + e + ); + } + }); + + Ok(StreamFwder { + fwder, + recent_lines, + max_buffer_size, + stop, + }) + } + + pub async fn abort(self) -> (Vec, Result<(), anyhow::Error>) { + // Send shutdown signal to stop the monitoring loop + self.stop.notify_waiters(); + + let lines = self.peek().await; + let result = match self.fwder.await { + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + }; + + (lines, result) + } + + /// Inspect the latest `max_buffer` lines read from the file being monitored + /// Returns lines in chronological order (oldest first) + pub async fn peek(&self) -> Vec { + let lines = self.recent_lines.read().await; + let start_idx = if lines.len() > self.max_buffer_size { + lines.len() - self.max_buffer_size + } else { + 0 + }; + + lines.range(start_idx..).cloned().collect() + } +} + +/// Start monitoring the log file and forwarding content to the logging client +async fn fwd_on_notify( + mut watcher: FileWatcher, + stop: &Arc, + mut log_sender: Box, + target: OutputTarget, + recent_lines: Arc>>, + max_buffer_size: usize, +) -> Result<()> { + let _watcher_guard = watcher.watcher; + let path = watcher.path; + let file = fs::OpenOptions::new().read(true).open(&path).await?; + let mut reader = BufReader::new(file); + let mut position = reader.seek(SeekFrom::End(0)).await?; + + tracing::debug!("Monitoring {:?} for new lines...", path); + + loop { + tokio::select! { + event = watcher.rx.recv() => { + match event { + Some(event) => { + if let EventKind::Modify(_) = &event.kind { + // Get the current file size to detect truncation + let file_metadata = fs::metadata(&path).await?; + let file_size = file_metadata.len(); + + // Handle potential file truncation/rotation + if position > file_size { + tracing::warn!("Log file {:?} appears to have been truncated (position {} > file size {}), resetting to start", path, position, file_size); + reader.seek(SeekFrom::Start(0)).await?; + position = 0; + } else if position < file_size { + reader.seek(SeekFrom::Start(position)).await?; + } + + let mut buf = vec![0u8; 128 * 1024]; + let mut line_buffer = Vec::with_capacity(2048); + let mut lines = Vec::with_capacity(100); + + loop { + let bytes_read = reader.read(&mut buf).await?; + if bytes_read == 0 { + break; + } + + let chunk = &buf[..bytes_read]; + + let mut start = 0; + while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') { + let absolute_pos = start + newline_pos; + + line_buffer.extend_from_slice(&chunk[start..absolute_pos]); + + if !line_buffer.is_empty() { + if line_buffer.len() > MAX_LINE_SIZE { + line_buffer.truncate(MAX_LINE_SIZE); + line_buffer.extend_from_slice(b"... [TRUNCATED]"); + } + + let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048)); + lines.push(line_data); + } + + start = absolute_pos + 1; + } + + if start < chunk.len() { + line_buffer.extend_from_slice(&chunk[start..]); + } + } + + if !line_buffer.is_empty() { + if line_buffer.len() > MAX_LINE_SIZE { + line_buffer.truncate(MAX_LINE_SIZE); + line_buffer.extend_from_slice(b"... [TRUNCATED]"); + } + let line_data = line_buffer.clone(); + lines.push(line_data); + } + + // Add all complete lines to the recent_lines buffer + if !lines.is_empty() { + let mut recent_lines_guard = recent_lines.write().await; + for line_data in &lines { + let line_str = String::from_utf8_lossy(line_data); + recent_lines_guard.push_back(line_str.trim_end().to_string()); + if recent_lines_guard.len() > max_buffer_size { + recent_lines_guard.pop_front(); + } + } + drop(recent_lines_guard); + } + + // TODO(pablorfb) change to batch update + for line in lines { + if let Err(e) = log_sender.send(target, line) { + tracing::error!("Failed to send log lines: {}", e); + } + } + + position = reader.stream_position().await?; + } + } + None => { + tracing::debug!("File event channel closed, stopping monitoring"); + break; + } + } + } + _ = stop.notified() => { + tracing::debug!("Shutdown signal received, stopping monitoring"); + if let Err(e) = log_sender.flush() { + tracing::error!("Failed to send log lines: {}", e); + } + break; + } + } + } + Ok(()) +} + +pub async fn create_temp_log() -> Result { + let key = &ShortUuid::generate().to_string(); + let (parent, filename) = log_file_path(env::Env::current(), Some(key))?; + let path = PathBuf::from(parent).join(filename); + + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + + let _ = File::create(&path).await?; + + Ok(path) +} + /// Messages that can be sent to the LogWriterActor #[derive( Debug, @@ -1226,7 +1505,8 @@ mod tests { self.log_sender .send((output_target, line)) - .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e)) + .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?; + Ok(()) } fn flush(&mut self) -> anyhow::Result<()> { @@ -1448,4 +1728,140 @@ mod tests { // Check that the count is 3 assert_eq!(aggregator.lines[0].count, 3); } + + #[test] + fn test_aggregator_custom_threshold() { + // Test with very strict threshold (0.05) + let mut strict_aggregator = Aggregator::new_with_threshold(0.05); + strict_aggregator.add_line("ERROR 404").unwrap(); + strict_aggregator.add_line("ERROR 500").unwrap(); // Should not merge due to strict threshold + assert_eq!(strict_aggregator.lines.len(), 2); + + // Test with very lenient threshold (0.8) + let mut lenient_aggregator = Aggregator::new_with_threshold(0.8); + lenient_aggregator.add_line("ERROR 404").unwrap(); + lenient_aggregator.add_line("WARNING 200").unwrap(); // Should merge due to lenient threshold + assert_eq!(lenient_aggregator.lines.len(), 1); + assert_eq!(lenient_aggregator.lines[0].count, 2); + } + + #[test] + fn test_format_system_time() { + let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1609459200); // 2021-01-01 00:00:00 UTC + let formatted = format_system_time(test_time); + + // Just verify it's a reasonable format (contains date and time components) + assert!(formatted.contains("-")); + assert!(formatted.contains(":")); + assert!(formatted.len() > 10); // Should be reasonable length + } + + #[test] + fn test_aggregator_display_formatting() { + let mut aggregator = Aggregator::new(); + aggregator.add_line("Test error message").unwrap(); + aggregator.add_line("Test error message").unwrap(); // Should merge + + let display_string = format!("{}", aggregator); + + // Verify the output contains expected elements + assert!(display_string.contains("Aggregated Logs")); + assert!(display_string.contains("[2 similar log lines]")); + assert!(display_string.contains("Test error message")); + assert!(display_string.contains(">>>") && display_string.contains("<<<")); + } + + #[tokio::test] + async fn test_local_log_sender_inactive_status() { + let (log_channel, _) = + channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + let mut sender = LocalLogSender::new(log_channel, 12345).unwrap(); + + // This test verifies that the sender handles inactive status gracefully + // In a real scenario, the channel would be closed, but for testing we just + // verify the send/flush methods don't panic + let result = sender.send(OutputTarget::Stdout, b"test".to_vec()); + assert!(result.is_ok()); + + let result = sender.flush(); + assert!(result.is_ok()); + } + + #[test] + fn test_levenshtein_distance_edge_cases() { + // Test with empty strings + assert_eq!(levenshtein_distance("", ""), 0); + assert_eq!(levenshtein_distance("", "hello"), 5); + assert_eq!(levenshtein_distance("hello", ""), 5); + + // Test with identical strings + assert_eq!(levenshtein_distance("hello", "hello"), 0); + + // Test with single character differences + assert_eq!(levenshtein_distance("hello", "helo"), 1); // deletion + assert_eq!(levenshtein_distance("helo", "hello"), 1); // insertion + assert_eq!(levenshtein_distance("hello", "hallo"), 1); // substitution + + // Test with unicode characters + assert_eq!(levenshtein_distance("café", "cafe"), 1); + } + + #[test] + fn test_normalized_edit_distance_edge_cases() { + // Test with empty strings + assert_eq!(normalized_edit_distance("", ""), 0.0); + + // Test normalization + assert_eq!(normalized_edit_distance("hello", ""), 1.0); + assert_eq!(normalized_edit_distance("", "hello"), 1.0); + + // Test that result is always between 0.0 and 1.0 + let distance = normalized_edit_distance("completely", "different"); + assert!(distance >= 0.0 && distance <= 1.0); + } + + #[tokio::test] + async fn test_deserialize_message_lines_edge_cases() { + // Test with empty string + let empty_message = "".to_string(); + let serialized = Serialized::serialize(&empty_message).unwrap(); + let result = deserialize_message_lines(&serialized).unwrap(); + assert_eq!(result, Vec::::new()); + + // Test with trailing newline + let trailing_newline = "line1\nline2\n".to_string(); + let serialized = Serialized::serialize(&trailing_newline).unwrap(); + let result = deserialize_message_lines(&serialized).unwrap(); + assert_eq!(result, vec!["line1", "line2"]); + } + + #[test] + fn test_output_target_serialization() { + // Test that OutputTarget can be serialized and deserialized + let stdout_serialized = serde_json::to_string(&OutputTarget::Stdout).unwrap(); + let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap(); + + let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap(); + let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap(); + + assert_eq!(stdout_deserialized, OutputTarget::Stdout); + assert_eq!(stderr_deserialized, OutputTarget::Stderr); + } + + #[test] + fn test_log_line_display_formatting() { + let log_line = LogLine::new("Test message".to_string()); + let display_string = format!("{}", log_line); + + assert!(display_string.contains("[1 similar log lines]")); + assert!(display_string.contains("Test message")); + + // Test with higher count + let mut log_line_multi = LogLine::new("Test message".to_string()); + log_line_multi.count = 5; + let display_string_multi = format!("{}", log_line_multi); + + assert!(display_string_multi.contains("[5 similar log lines]")); + assert!(display_string_multi.contains("Test message")); + } } diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 8d2379660..4331c03df 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -125,7 +125,12 @@ pub fn username() -> String { } // Given an environment, determine the log file path to write to. -pub fn log_file_path(env: env::Env) -> Result<(String, String), anyhow::Error> { +// If a suffix is provided, it will be prepended with "_" and then appended to file name +pub fn log_file_path( + env: env::Env, + suffix: Option<&str>, +) -> Result<(String, String), anyhow::Error> { + let suffix = suffix.map(|s| format!("_{}", s)).unwrap_or_default(); match env { env::Env::Local | env::Env::MastEmulator => { let username = if whoami::username().is_empty() { @@ -133,9 +138,15 @@ pub fn log_file_path(env: env::Env) -> Result<(String, String), anyhow::Error> { } else { whoami::username() }; - Ok((format!("/tmp/{}", username), "monarch_log".to_string())) + Ok(( + format!("/tmp/{}", username), + format!("monarch_log{}", suffix), + )) } - env::Env::Mast => Ok(("/logs/".to_string(), "dedicated_log_monarch".to_string())), + env::Env::Mast => Ok(( + "/logs/".to_string(), + format!("dedicated_log_monarch{}", suffix), + )), _ => Err(anyhow::anyhow!( "file writer unsupported for environment {}", env @@ -161,7 +172,7 @@ fn writer() -> Box { match env::Env::current() { env::Env::Test => Box::new(std::io::stderr()), env::Env::Local | env::Env::MastEmulator | env::Env::Mast => { - let (path, filename) = log_file_path(env::Env::current()).unwrap(); + let (path, filename) = log_file_path(env::Env::current(), None).unwrap(); match try_create_appender(&path, &filename, true) { Ok(file_appender) => Box::new(file_appender), Err(e) => {