diff --git a/hyperactor_telemetry/Cargo.toml b/hyperactor_telemetry/Cargo.toml index d01e23b0f..82cac324f 100644 --- a/hyperactor_telemetry/Cargo.toml +++ b/hyperactor_telemetry/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry +# @generated by autocargo from //monarch/hyperactor_telemetry:[correctness_test,hyperactor_telemetry,telemetry_benchmark] [package] name = "hyperactor_telemetry" @@ -10,13 +10,23 @@ license = "BSD-3-Clause" [lib] edition = "2024" +[[bench]] +name = "correctness_test" +edition = "2024" + +[[bench]] +name = "telemetry_benchmark" +edition = "2024" + [dependencies] anyhow = "1.0.98" chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false } dashmap = { version = "5.5.3", features = ["rayon", "serde"] } fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } hdrhistogram = "7.5" +indexmap = { version = "2.9.0", features = ["arbitrary", "rayon", "serde"] } lazy_static = "1.5" +libc = "0.2.139" opentelemetry = "0.29" opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] } rand = { version = "0.8", features = ["small_rng"] } diff --git a/hyperactor_telemetry/benches/correctness_test.rs b/hyperactor_telemetry/benches/correctness_test.rs new file mode 100644 index 000000000..b41babdbc --- /dev/null +++ b/hyperactor_telemetry/benches/correctness_test.rs @@ -0,0 +1,621 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Correctness test harness comparing old vs unified telemetry implementations. +//! +//! This test harness runs identical workloads through both implementations and +//! verifies that the outputs are equivalent across all exporters: +//! - Glog: Read log files and compare lines +//! - SQLite: Query database and compare rows +//! +//! Usage: +//! 
buck2 run //monarch/hyperactor_telemetry:correctness_test + +#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock` + +use std::path::PathBuf; + +use anyhow::Result; +use hyperactor_telemetry::*; + +struct TestResults { + glog_path: Option<PathBuf>, + sqlite_path: Option<PathBuf>, + #[allow(dead_code)] + _sqlite_tracing: Option<sqlite::SqliteTracing>, +} + +struct CorrectnessTestHarness {} + +impl CorrectnessTestHarness { + fn run<F>(&self, workload: F, unified: bool) -> Result<TestResults> + where + F: Fn(), + { + initialize_logging_with_log_prefix( + DefaultTelemetryClock {}, + Some("TEST_LOG_PREFIX".to_string()), + ); + + let sqlite_tracing = if unified { + None + } else { + let sqlite_tracing = hyperactor_telemetry::sqlite::SqliteTracing::new() + .expect("Failed to create SqliteTracing"); + let db_path = sqlite_tracing.db_path().expect("No db_path"); + println!("SqliteTracing created successfully, db_path: {:?}", db_path); + println!("Database exists: {}", db_path.exists()); + Some(sqlite_tracing) + }; + + workload(); + + std::thread::sleep(std::time::Duration::from_millis(300)); + + let username = whoami::username(); + let possible_paths = vec![ + format!( + "/tmp/{}/hyperactor_trace_{}.db", + username, + std::process::id() + ), + format!("/tmp/hyperactor_trace_{}.db", std::process::id()), + format!("/tmp/traces/hyperactor_trace_{}.db", std::process::id()), + format!("./hyperactor_trace_{}.db", std::process::id()), + ]; + + let mut sqlite_path = None; + for path in possible_paths { + if std::path::Path::new(&path).exists() { + sqlite_path = Some(PathBuf::from(path)); + break; + } + } + + Ok(TestResults { + sqlite_path, + glog_path: Self::find_glog_path(), + _sqlite_tracing: sqlite_tracing, + }) + } + + fn find_glog_path() -> Option<PathBuf> { + let username = whoami::username(); + let suffix = std::env::var(hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV) + .map(|s| format!("_{}", s)) + .unwrap_or_default(); + let possible_paths = vec![ + format!("/tmp/{}/monarch_log{}.log", username, suffix), + format!("/tmp/monarch_log{}.log", suffix), + format!("/logs/dedicated_log_monarch{}.log", suffix), + ]; + + for path in possible_paths { + if std::path::Path::new(&path).exists() { + return Some(PathBuf::from(path)); + } + } + None + } + + /// Normalize a glog line by removing timestamp, thread ID, file:line, and prefix for comparison.
+ /// Both old and unified implementations should now use the same format: + /// "[prefix]Lmmdd HH:MM:SS.ffffff thread_id file:line] message, fields" + /// + /// Normalized to: "L] message, fields" (prefix removed) + fn normalize_glog_line(line: &str) -> String { + // Find the level character position + if let Some(level_pos) = line + .chars() + .position(|c| matches!(c, 'I' | 'D' | 'E' | 'W' | 'T')) + { + // Find the closing bracket that comes AFTER the level character (not the one in the prefix) + if let Some(close_bracket) = line[level_pos..].find(']') { + let actual_bracket_pos = level_pos + close_bracket; + let level = &line[level_pos..=level_pos]; // e.g., "I" + let rest = &line[actual_bracket_pos + 1..].trim_start(); // Everything after the real "]" + // Don't include prefix - just level + content + return format!("{}] {}", level, rest); + } + } + + line.to_string() + } + + fn compare_glog_files(&self, old_file: &PathBuf, unified_file: &PathBuf) -> Result<()> { + println!("\n[Comparing Glog Files]"); + println!(" Old: {}", old_file.display()); + println!(" Unified: {}", unified_file.display()); + + let old_content = std::fs::read_to_string(old_file)?; + let unified_content = std::fs::read_to_string(unified_file)?; + + println!(" Old lines: {}", old_content.lines().count()); + println!(" Unified lines: {}", unified_content.lines().count()); + + let old_lines: Vec<String> = old_content.lines().map(Self::normalize_glog_line).collect(); + + let unified_lines: Vec<String> = unified_content + .lines() + .map(Self::normalize_glog_line) + .collect(); + + if old_lines.len() != unified_lines.len() { + return Err(anyhow::anyhow!( + "Line count mismatch: old={} unified={}", + old_lines.len(), + unified_lines.len() + )); + } + + let skip_lines = 1; + + for (i, (old_line, unified_line)) in old_lines + .iter() + .zip(unified_lines.iter()) + .enumerate() + .skip(skip_lines) + { + if old_line != unified_line { + return Err(anyhow::anyhow!( + "Line #{} mismatch:\n old: {}\n unified: {}", + i, + old_line, + unified_line + )); + } + } + + println!( + " ✓ All {} lines match (skipped {} initialization lines)!", + old_lines.len() - skip_lines, + skip_lines + ); + Ok(()) + } + + fn compare_sqlite_databases(&self, old_db: &PathBuf, unified_db: &PathBuf) -> Result<()> { + println!("\n[Comparing SQLite Databases]"); + println!(" Old: {}", old_db.display()); + println!(" Unified: {}", unified_db.display()); + + let old_conn = rusqlite::Connection::open(old_db)?; + + old_conn.execute(&format!("ATTACH '{}' AS unified", unified_db.display()), [])?; + + let tables = vec!["log_events", "messages", "actor_lifecycle"]; + + for table in tables { + println!("\n Comparing table: {}", table); + + let old_count: i64 = + old_conn.query_row(&format!("SELECT COUNT(*) FROM main.{}", table), [], |row| { + row.get(0) + })?; + let unified_count: i64 = old_conn.query_row( + &format!("SELECT COUNT(*) FROM unified.{}", table), + [], + |row| row.get(0), + )?; + + println!(" Old rows: {}", old_count); + println!(" Unified rows: {}", unified_count); + + if old_count != unified_count { + return Err(anyhow::anyhow!( + "Table {} row count mismatch: old={} unified={}", + table, + old_count, + unified_count + )); + } + + let mut stmt = old_conn.prepare(&format!("PRAGMA table_info({})", table))?; + let columns: Vec<String> = stmt + .query_map([], |row| { + let name: String = row.get(1)?; + Ok(name) + })? + .collect::<Result<Vec<_>, _>>()?
.into_iter() + .filter(|col| col != "time_us") // Ignore time_us column + .collect(); + + if columns.is_empty() { + continue; + } + + let col_list = columns.join(", "); + + let diff_query = format!( + "SELECT '-' as diff_type, {cols} FROM main.{table} + EXCEPT + SELECT '-' as diff_type, {cols} FROM unified.{table} + UNION ALL + SELECT '+' as diff_type, {cols} FROM unified.{table} + EXCEPT + SELECT '+' as diff_type, {cols} FROM main.{table}", + table = table, + cols = col_list + ); + + let mut stmt = old_conn.prepare(&diff_query)?; + let mut rows = stmt.query([])?; + + let mut diffs = Vec::new(); + while let Some(row) = rows.next()? { + let diff_type: String = row.get(0)?; + let color = if diff_type == "-" { + "\x1b[31m" // red + } else { + "\x1b[32m" // green + }; + let mut row_str = format!("{}{} ", color, diff_type); + for i in 1..row.as_ref().column_count() { + let col_name = row.as_ref().column_name(i)?; + let val: Option<String> = row.get(i).ok(); + row_str.push_str(&format!("{}={:?}, ", col_name, val)); + } + row_str.push_str("\x1b[0m"); // reset color + diffs.push(row_str); + } + + if !diffs.is_empty() { + return Err(anyhow::anyhow!( + "Table {} has differences:\n{}", + table, + diffs.join("\n") + )); + } + + println!(" ✓ {} rows match", old_count); + } + + println!("\n ✓ All tables match!"); + Ok(()) + } +} + +// ============================================================================ +// Test Workloads +// ============================================================================ + +fn workload_simple_info_events() { + for i in 0..100 { + tracing::info!(iteration = i, "simple info event"); + } +} + +fn workload_spans_with_fields() { + for i in 0..50 { + let _span = tracing::info_span!( + "test_span", + iteration = i, + foo = 42, + message_type = "Request" + ) + .entered(); + } +} + +fn workload_nested_spans() { + for i in 0..20 { + let _outer = tracing::info_span!("outer", iteration = i).entered(); + { + let _middle = tracing::info_span!("middle", level = 2).entered(); + { + let _inner = tracing::info_span!("inner", level = 3).entered(); + tracing::info!("inside nested span"); + } + } + } +} + +fn workload_events_with_fields() { + for i in 0..100 { + tracing::info!( + iteration = i, + foo = 42, + message_type = "Request", + status = "ok", + count = 100, + "event with many fields" + ); + } +} + +fn workload_mixed_log_levels() { + for _ in 0..25 { + tracing::trace!("trace event"); + tracing::debug!(value = 1, "debug event"); + tracing::info!(value = 2, "info event"); + tracing::warn!(value = 3, "warn event"); + tracing::error!(value = 4, "error event"); + } +} + +fn workload_events_in_spans() { + for i in 0..30 { + let _span = tracing::info_span!("outer_span", iteration = i).entered(); + tracing::info!(step = "start", "starting work"); + tracing::debug!(step = "middle", "doing work"); + tracing::info!(step = "end", "finished work"); + } +} + +fn main() -> Result<()> { + let args: Vec<String> = std::env::args().collect(); + + // The harness re-invokes itself with (test_name, impl_type) arguments; child processes take this branch + if args.len() > 2 { + let test_name = &args[1]; + let impl_type = &args[2]; + return run_single_test(test_name, impl_type); + } + + println!("\n\nHyperactor Telemetry Correctness Test Suite"); + println!("Comparing OLD vs UNIFIED implementations\n"); + + let tests = vec![ + "simple_info_events", + "spans_with_fields", + "nested_spans", + "events_with_fields", + "mixed_log_levels", + "events_in_spans", + ]; + + let mut all_passed = true; + + for test_name in tests { + println!("\n{}", "=".repeat(80)); + 
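+ // Each test runs twice in freshly spawned child processes (one per implementation) so that global tracing/subscriber state never leaks between runs.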
println!("Running test: {}", test_name_to_display(test_name)); + println!("{}", "=".repeat(80)); + + let mut test_passed = true; + + println!("\n[Running OLD implementation...]"); + let old_log_suffix = format!("{}_old", test_name); + let old_status = std::process::Command::new(&args[0]) + .arg(test_name) + .arg("--old") + .env("TEST_LOG_PREFIX", "test") + .env( + hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV, + &old_log_suffix, + ) + .env("ENABLE_SQLITE_TRACING", "1") + .status()?; + + if !old_status.success() { + println!("\n✗ OLD implementation FAILED"); + all_passed = false; + test_passed = false; + } + + println!("\n[Running UNIFIED implementation...]"); + let unified_log_suffix = format!("{}_unified", test_name); + let unified_status = std::process::Command::new(&args[0]) + .arg(test_name) + .arg("--unified") + .env("TEST_LOG_PREFIX", "test") + .env( + hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV, + &unified_log_suffix, + ) + .env("ENABLE_SQLITE_TRACING", "1") + .status()?; + + if !unified_status.success() { + println!("\n✗ UNIFIED implementation FAILED"); + all_passed = false; + test_passed = false; + } + + let username = whoami::username(); + let harness = CorrectnessTestHarness {}; + + // Compare glog files + let old_log = PathBuf::from(format!("/tmp/{}/test_{}_old.log", username, test_name)); + let unified_log = + PathBuf::from(format!("/tmp/{}/test_{}_unified.log", username, test_name)); + + if !old_log.exists() || !unified_log.exists() { + println!("\n⚠ Glog files not found, skipping comparison"); + if !old_log.exists() { + println!(" Missing: {}", old_log.display()); + } + if !unified_log.exists() { + println!(" Missing: {}", unified_log.display()); + } + all_passed = false; + test_passed = false; + } else { + match harness.compare_glog_files(&old_log, &unified_log) { + Ok(()) => { + println!("\n✓ Glog files match"); + } + Err(e) => { + println!("\n✗ Glog comparison FAILED: {}", e); + all_passed = false; + test_passed = false; + } + } + } + + // Compare SQLite databases + let old_db = PathBuf::from(format!("/tmp/{}/test_{}_old.db", username, test_name)); + let unified_db = PathBuf::from(format!("/tmp/{}/test_{}_unified.db", username, test_name)); + + // SQLite databases are now required - both implementations should create them + if !old_db.exists() { + println!("\n✗ OLD database not found: {}", old_db.display()); + all_passed = false; + test_passed = false; + } else if !unified_db.exists() { + println!("\n✗ UNIFIED database not found: {}", unified_db.display()); + all_passed = false; + test_passed = false; + } else { + match harness.compare_sqlite_databases(&old_db, &unified_db) { + Ok(()) => { + println!("\n✓ SQLite databases match"); + } + Err(e) => { + println!("\n✗ SQLite comparison FAILED: {}", e); + all_passed = false; + test_passed = false; + } + } + } + + if test_passed { + println!("\n✓ Test PASSED: {}", test_name_to_display(test_name)); + } else { + println!("\n✗ Test FAILED: {}", test_name_to_display(test_name)); + } + + // Clean up test files + let _ = std::fs::remove_file(&old_db); + let _ = std::fs::remove_file(&unified_db); + let _ = std::fs::remove_file(&old_log); + let _ = std::fs::remove_file(&unified_log); + } + + println!("\n\n{}", "=".repeat(80)); + if all_passed { + println!("All tests completed successfully!"); + } else { + println!("Some tests FAILED!"); + return Err(anyhow::anyhow!("Test failures detected")); + } + println!("{}", "=".repeat(80)); + + Ok(()) +} + +/// Called in child process +fn run_single_test(test_name: &str, impl_type: &str) -> 
Result<()> { + let impl_suffix = if impl_type == "--old" { + "old" + } else { + "unified" + }; + let log_suffix = format!("{}_{}", test_name, impl_suffix); + let username = whoami::username(); + let possible_log_paths = vec![ + format!("/tmp/{}/monarch_log_{}.log", username, log_suffix), + format!("/tmp/monarch_log_{}.log", log_suffix), + format!("/logs/dedicated_log_monarch_{}.log", log_suffix), + ]; + + for path in &possible_log_paths { + if std::path::Path::new(path).exists() { + let _ = std::fs::remove_file(path); + println!("Cleaned up existing log file: {}", path); + } + } + + let target_log_copy = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix); + if std::path::Path::new(&target_log_copy).exists() { + let _ = std::fs::remove_file(&target_log_copy); + println!("Cleaned up existing copy file: {}", target_log_copy); + } + + let harness = CorrectnessTestHarness {}; + + let workload: fn() = match test_name { + "simple_info_events" => workload_simple_info_events, + "spans_with_fields" => workload_spans_with_fields, + "nested_spans" => workload_nested_spans, + "events_with_fields" => workload_events_with_fields, + "mixed_log_levels" => workload_mixed_log_levels, + "events_in_spans" => workload_events_in_spans, + _ => { + return Err(anyhow::anyhow!("Unknown test: {}", test_name)); + } + }; + + let results = match impl_type { + "--old" => { + println!("Running with OLD implementation..."); + harness.run(workload, false)? + } + "--unified" => { + println!("Running with UNIFIED implementation..."); + // Set USE_UNIFIED_LAYER to use unified implementation + // SAFETY: Setting before any telemetry initialization + unsafe { + std::env::set_var("USE_UNIFIED_LAYER", "1"); + } + harness.run(workload, true)? + } + _ => { + return Err(anyhow::anyhow!( + "Unknown implementation type: {}", + impl_type + )); + } + }; + + if let Some(glog_path) = results.glog_path { + let target_path = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix); + + std::fs::copy(&glog_path, &target_path)?; + println!("Glog file copied to: {}", target_path); + } + + if let Some(db_path) = results.sqlite_path { + let target_path = format!("/tmp/{}/test_{}_{}.db", username, test_name, impl_suffix); + + println!( + "Attempting to copy database from {} to {}", + db_path.display(), + target_path + ); + std::fs::copy(&db_path, &target_path).map_err(|e| { + anyhow::anyhow!( + "Failed to copy database from {} to {}: {}", + db_path.display(), + target_path, + e + ) + })?; + + // Also copy WAL files if they exist (SQLite WAL mode) + let wal_path = format!("{}-wal", db_path.display()); + let shm_path = format!("{}-shm", db_path.display()); + let target_wal = format!("{}-wal", target_path); + let target_shm = format!("{}-shm", target_path); + + if std::path::Path::new(&wal_path).exists() { + let _ = std::fs::copy(&wal_path, &target_wal); + } + if std::path::Path::new(&shm_path).exists() { + let _ = std::fs::copy(&shm_path, &target_shm); + } + + println!("Database copied to: {}", target_path); + } else { + println!("Warning: No SQLite database path found"); + } + + Ok(()) +} + +fn test_name_to_display(test_name: &str) -> &str { + match test_name { + "simple_info_events" => "Simple info events", + "spans_with_fields" => "Spans with fields", + "nested_spans" => "Nested spans", + "events_with_fields" => "Events with many fields", + "mixed_log_levels" => "Mixed log levels", + "events_in_spans" => "Events in spans", + _ => test_name, + } +} diff --git a/hyperactor_telemetry/benches/telemetry_benchmark.rs 
b/hyperactor_telemetry/benches/telemetry_benchmark.rs new file mode 100644 index 000000000..7a392ca37 --- /dev/null +++ b/hyperactor_telemetry/benches/telemetry_benchmark.rs @@ -0,0 +1,258 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Benchmark comparing old vs unified telemetry implementations. +//! +//! This benchmark simulates a realistic workload with: +//! - Nested spans (simulating actor message processing) +//! - Events at different log levels +//! - Field recording +//! - Multiple iterations +//! +//! Usage: +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --old-no-sqlite +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --old-with-sqlite +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --unified +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --compare + +#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock` + +use std::time::Instant; + +use hyperactor_telemetry::*; + +fn stage_debug_events_only(iterations: usize) { + for i in 0..iterations { + tracing::debug!(iteration = i, "debug event"); + } +} + +fn stage_info_events_only(iterations: usize) { + for i in 0..iterations { + tracing::info!(iteration = i, "info event"); + } +} + +fn stage_trace_events_only(iterations: usize) { + for i in 0..iterations { + tracing::trace!(iteration = i, "trace event"); + } +} + +fn stage_error_events_only(iterations: usize) { + for i in 0..iterations { + tracing::error!(iteration = i, "error event"); + } +} + +fn stage_simple_spans_only(iterations: usize) { + for _ in 0..iterations { + let _span = tracing::info_span!("simple_span").entered(); + } +} + +fn stage_spans_with_fields(iterations: usize) { + for i in 0..iterations { + let _span = tracing::info_span!( + "span_with_fields", + iteration = i, + foo = 42, + message_type = "Request" + ) + .entered(); + } +} + +fn stage_nested_spans(iterations: usize) { + for _ in 0..iterations { + let _outer = tracing::info_span!("outer", level = 1).entered(); + { + let _middle = tracing::info_span!("middle", level = 2).entered(); + { + let _inner = tracing::info_span!("inner", level = 3).entered(); + } + } + } +} + +fn stage_events_with_fields(iterations: usize) { + for i in 0..iterations { + tracing::info!( + iteration = i, + foo = 42, + message_type = "Request", + status = "ok", + count = 100, + "event with fields" + ); + } +} + +fn run_benchmark_stages(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + let stages: Vec<(&'static str, fn(usize))> = vec![ + ("Debug events only", stage_debug_events_only), + ("Info events only", stage_info_events_only), + ("Trace events only", stage_trace_events_only), + ("Error events only", stage_error_events_only), + ("Simple spans only", stage_simple_spans_only), + ("Spans with fields", stage_spans_with_fields), + ("Nested spans (3 levels)", stage_nested_spans), + ("Events with fields", stage_events_with_fields), + ]; + + let mut results = Vec::new(); + + for (name, stage_fn) in stages { + // Warm up + stage_fn(10); + + // Benchmark + let start = Instant::now(); + stage_fn(iterations); + let elapsed = start.elapsed(); + + println!( + " {:30} {} iterations in {:>12?} ({:>10?}/iter)", + format!("{}:", name), + iterations, + elapsed, + elapsed / iterations as u32 + ); + + 
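+ // Note: this elapsed time covers only the cost paid on the calling thread; in the unified implementation, sink I/O happens on the background worker and is not captured here.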
results.push((name, elapsed)); + } + + results +} + +fn benchmark_no_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None); + + let results = run_benchmark_stages(iterations); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + results +} + +fn benchmark_with_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None); + + let _sqlite_tracing = + hyperactor_telemetry::sqlite::SqliteTracing::new().expect("Failed to create SqliteTracing"); + + let results = run_benchmark_stages(iterations); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + results +} + +fn main() { + let args: Vec<String> = std::env::args().collect(); + + let iterations = 1000; + + if args.len() < 2 { + println!("Usage: {} [OPTIONS]", args[0]); + println!(" --old-no-sqlite: Benchmark old implementation without SQLite"); + println!(" --old-with-sqlite: Benchmark old implementation with SQLite"); + println!( + " --unified: Benchmark unified implementation (use ENABLE_SQLITE_TRACING)" + ); + println!(" --compare: Run all four benchmarks and compare"); + return; + } + + match args[1].as_str() { + "--old-no-sqlite" => { + println!("\n{}", "=".repeat(100)); + println!("Benchmarking OLD implementation (SQLite DISABLED)..."); + // Don't set USE_UNIFIED_LAYER - uses old implementation + let _results = benchmark_no_sqlite(iterations); + } + "--old-with-sqlite" => { + println!("\n{}", "=".repeat(100)); + println!("Benchmarking OLD implementation (SQLite ENABLED)..."); + let _results = benchmark_with_sqlite(iterations); + } + "--unified" => { + println!("Benchmarking UNIFIED implementation..."); + // Set USE_UNIFIED_LAYER to use unified implementation + // SAFETY: Setting before any telemetry initialization + unsafe { + std::env::set_var("USE_UNIFIED_LAYER", "1"); + } + println!("\n{}", "=".repeat(100)); + println!( + "Benchmarking UNIFIED implementation (SQLite {})...", + if std::env::var("ENABLE_SQLITE_TRACING").unwrap_or_default() == "1" { + "ENABLED" + } else { + "DISABLED" + } + ); + let _results = benchmark_no_sqlite(iterations); + } + "--compare" => { + println!( + "Running comparison benchmark with {} iterations...\n", + iterations + ); + + let old_no_sqlite_status = std::process::Command::new(&args[0]) + .arg("--old-no-sqlite") + .status() + .expect("Failed to spawn old implementation without SQLite"); + + if !old_no_sqlite_status.success() { + eprintln!("\n✗ OLD implementation (no SQLite) benchmark FAILED"); + return; + } + + let old_with_sqlite_status = std::process::Command::new(&args[0]) + .arg("--old-with-sqlite") + .env("ENABLE_SQLITE_TRACING", "1") + .status() + .expect("Failed to spawn old implementation with SQLite"); + + if !old_with_sqlite_status.success() { + eprintln!("\n✗ OLD implementation (with SQLite) benchmark FAILED"); + return; + } + + let unified_no_sqlite_status = std::process::Command::new(&args[0]) + .arg("--unified") + .status() + .expect("Failed to spawn unified implementation without SQLite"); + + if !unified_no_sqlite_status.success() { + eprintln!("\n✗ UNIFIED implementation (no SQLite) benchmark FAILED"); + return; + } + + let unified_with_sqlite_status = std::process::Command::new(&args[0]) + .arg("--unified") + .env("ENABLE_SQLITE_TRACING", "1") + .status() + .expect("Failed to spawn unified implementation with SQLite"); + + if !unified_with_sqlite_status.success() { + eprintln!("\n✗ UNIFIED implementation (with 
SQLite) benchmark FAILED"); + return; + } + + println!("All benchmarks completed successfully!"); + } + _ => { + println!("Unknown option: {}", args[1]); + println!( + "Use --old-no-sqlite, --old-with-sqlite, --unified-no-sqlite, --unified-with-sqlite, or --compare" + ); + } + } +} diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 0210e0a5e..4bf43f555 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -7,6 +7,7 @@ */ #![allow(internal_features)] +#![allow(clippy::disallowed_methods)] // hyperactor_telemetry can't use hyperactor::clock::Clock (circular dependency) #![feature(assert_matches)] #![feature(sync_unsafe_cell)] #![feature(mpmc_channel)] @@ -33,9 +34,15 @@ pub const ENABLE_SQLITE_TRACING: &str = "ENABLE_SQLITE_TRACING"; /// Environment variable constants // Log level (debug, info, warn, error, critical) to capture for Monarch traces on dedicated log file (changes based on environment, see `log_file_path`). const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG"; +// Suffix to append to log filenames for test isolation +pub const MONARCH_LOG_SUFFIX_ENV: &str = "MONARCH_LOG_SUFFIX"; pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME"; +/// Environment variable to enable the unified layer. +/// Set to "1" to enable. +pub const USE_UNIFIED_LAYER: &str = "USE_UNIFIED_LAYER"; + // Log level constants const LOG_LEVEL_INFO: &str = "info"; const LOG_LEVEL_DEBUG: &str = "debug"; @@ -59,10 +66,12 @@ mod meta; mod otel; mod pool; pub mod recorder; +pub mod sinks; mod spool; pub mod sqlite; pub mod task; pub mod trace; +pub mod trace_dispatcher; use std::io::Write; use std::str::FromStr; use std::sync::Arc; @@ -170,7 +179,8 @@ fn writer() -> Box { match env::Env::current() { env::Env::Test => Box::new(std::io::stderr()), env::Env::Local | env::Env::MastEmulator | env::Env::Mast => { - let (path, filename) = log_file_path(env::Env::current(), None).unwrap(); + let suffix = std::env::var(MONARCH_LOG_SUFFIX_ENV).ok(); + let (path, filename) = log_file_path(env::Env::current(), suffix.as_deref()).unwrap(); match try_create_appender(&path, &filename, true) { Ok(file_appender) => Box::new(file_appender), Err(e) => { @@ -549,6 +559,13 @@ pub fn initialize_logging_for_test() { initialize_logging(DefaultTelemetryClock {}); } +fn is_layer_enabled(env_var: &str) -> bool { + std::env::var(env_var).unwrap_or_default() == "1" +} +fn is_layer_disabled(env_var: &str) -> bool { + std::env::var(env_var).unwrap_or_default() == "1" +} + /// Set up logging based on the given execution environment. We specialize logging based on how the /// logs are consumed. The destination scuba table is specialized based on the execution environment. 
/// mast -> monarch_tracing/prod @@ -567,6 +584,8 @@ pub fn initialize_logging_with_log_prefix( clock: impl TelemetryClock + Send + 'static, prefix_env_var: Option<String>, ) { + let use_unified = std::env::var(USE_UNIFIED_LAYER).unwrap_or_default() == "1"; + swap_telemetry_clock(clock); let file_log_level = match env::Env::current() { env::Env::Local => LOG_LEVEL_INFO, @@ -574,27 +593,6 @@ env::Env::Mast => LOG_LEVEL_INFO, env::Env::Test => LOG_LEVEL_DEBUG, }; - let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default() - .lossy(false) - .finish(writer()); - let writer_guard = Arc::new((non_blocking, guard)); - let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); - - let file_layer = fmt::Layer::default() - .with_writer(writer_guard.0.clone()) - .event_format(PrefixedFormatter::new(prefix_env_var.clone())) - .fmt_fields(GlogFields::default().compact()) - .with_ansi(false) - .with_filter( - Targets::new() - .with_default(LevelFilter::from_level( - tracing::Level::from_str( - &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()), - ) - .expect("Invalid log level"), - )) - .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about - ); use tracing_subscriber::Registry; use tracing_subscriber::layer::SubscriberExt; @@ -603,50 +601,111 @@ #[cfg(fbcode_build)] { use crate::env::Env; - fn is_layer_enabled(env_var: &str) -> bool { - std::env::var(env_var).unwrap_or_default() == "1" - } - fn is_layer_disabled(env_var: &str) -> bool { - std::env::var(env_var).unwrap_or_default() == "1" - } - if let Err(err) = Registry::default() - .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { - // TODO: get_reloadable_sqlite_layer currently still returns None, - // and some additional work is required to make it work. 
- Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) - } else { - None - }) - .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { - Some(otel::tracing_layer()) - } else { - None - }) - .with(file_layer) - .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { - Some(recorder().layer()) - } else { - None - }) - .try_init() - { - tracing::debug!("logging already initialized for this process: {}", err); + if use_unified { + let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new(); + sinks.push(Box::new(sinks::glog::GlogSink::new( + writer(), + prefix_env_var.clone(), + file_log_level, + ))); + + let mut max_level = None; + + let sqlite_enabled = std::env::var(ENABLE_SQLITE_TRACING).unwrap_or_default() == "1"; + + if sqlite_enabled { + match create_sqlite_sink() { + Ok(sink) => { + max_level = Some(tracing::level_filters::LevelFilter::TRACE); + sinks.push(Box::new(sink)); + } + Err(e) => { + tracing::warn!("failed to create SqliteSink: {}", e); + } + } + } + + if let Err(err) = Registry::default() + .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { + Some(otel::tracing_layer()) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }) + .with(trace_dispatcher::TraceEventDispatcher::new( + sinks, max_level, + )) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else { + // For file_layer, use NonBlocking + let (non_blocking, guard) = + tracing_appender::non_blocking::NonBlockingBuilder::default() + .lossy(false) + .finish(writer()); + let writer_guard = Arc::new((non_blocking, guard)); + let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); + + let file_layer = fmt::Layer::default() + .with_writer(writer_guard.0.clone()) + .event_format(PrefixedFormatter::new(prefix_env_var.clone())) + .fmt_fields(GlogFields::default().compact()) + .with_ansi(false) + .with_filter( + Targets::new() + .with_default(LevelFilter::from_level( + tracing::Level::from_str( + &std::env::var(MONARCH_FILE_LOG_ENV) + .unwrap_or(file_log_level.to_string()), + ) + .expect("Invalid log level"), + )) + .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about + ); + + if let Err(err) = Registry::default() + .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { + // TODO: get_reloadable_sqlite_layer currently still returns None, + // and some additional work is required to make it work. 
+ Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { + Some(otel::tracing_layer()) + } else { + None + }) + .with(file_layer) + .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } } let exec_id = env::execution_id(); - tracing::info!( - target: "execution", - execution_id = exec_id, - environment = %Env::current(), - args = ?std::env::args(), - build_mode = build_info::BuildInfo::get_build_mode(), - compiler = build_info::BuildInfo::get_compiler(), - compiler_version = build_info::BuildInfo::get_compiler_version(), - buck_rule = build_info::BuildInfo::get_rule(), - package_name = build_info::BuildInfo::get_package_name(), - package_release = build_info::BuildInfo::get_package_release(), - upstream_revision = build_info::BuildInfo::get_upstream_revision(), - revision = build_info::BuildInfo::get_revision(), - "logging_initialized" + meta::log_execution_event( + &exec_id, + &Env::current().to_string(), + std::env::args().collect(), + build_info::BuildInfo::get_build_mode(), + build_info::BuildInfo::get_compiler(), + build_info::BuildInfo::get_compiler_version(), + build_info::BuildInfo::get_rule(), + build_info::BuildInfo::get_package_name(), + build_info::BuildInfo::get_package_release(), + build_info::BuildInfo::get_upstream_revision(), + build_info::BuildInfo::get_revision(), ); if !is_layer_disabled(DISABLE_OTEL_METRICS) { @@ -655,22 +714,66 @@ } #[cfg(not(fbcode_build))] { - if let Err(err) = Registry::default() - .with(file_layer) - .with( - if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" { - Some(recorder().layer()) - } else { - None - }, - ) - .try_init() - { - tracing::debug!("logging already initialized for this process: {}", err); + let registry = Registry::default().with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }); + + if use_unified { + let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new(); + sinks.push(Box::new(sinks::glog::GlogSink::new( + writer(), + prefix_env_var.clone(), + file_log_level, + ))); + + if let Err(err) = registry + .with(trace_dispatcher::TraceEventDispatcher::new(sinks, None)) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else { + let (non_blocking, guard) = + tracing_appender::non_blocking::NonBlockingBuilder::default() + .lossy(false) + .finish(writer()); + let writer_guard = Arc::new((non_blocking, guard)); + let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); + + let file_layer = fmt::Layer::default() + .with_writer(writer_guard.0.clone()) + .event_format(PrefixedFormatter::new(prefix_env_var.clone())) + .fmt_fields(GlogFields::default().compact()) + .with_ansi(false) + .with_filter( + Targets::new() + .with_default(LevelFilter::from_level( + tracing::Level::from_str( + &std::env::var(MONARCH_FILE_LOG_ENV) + .unwrap_or(file_log_level.to_string()), + ) + .expect("Invalid log level"), + )) + .with_target("opentelemetry", LevelFilter::OFF), + ); + + if let Err(err) = registry.with(file_layer).try_init() { + tracing::debug!("logging already initialized for this process: {}", err); + } } } } +fn create_sqlite_sink() -> anyhow::Result<sinks::sqlite::SqliteSink> { + let (db_path, _) = log_file_path(env::Env::current(), Some("traces")) + .expect("failed to determine trace db path"); + 
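+    // One trace database per process: the pid in the filename keeps concurrent runs from colliding, and matches the paths the correctness harness probes.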
let db_file = format!("{}/hyperactor_trace_{}.db", db_path, std::process::id()); + + Ok(sinks::sqlite::SqliteSink::new_with_file(&db_file, 100)?) +} + pub mod env { use rand::RngCore; diff --git a/hyperactor_telemetry/src/sinks/glog.rs b/hyperactor_telemetry/src/sinks/glog.rs new file mode 100644 index 000000000..c65803df9 --- /dev/null +++ b/hyperactor_telemetry/src/sinks/glog.rs @@ -0,0 +1,257 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Glog-formatted text sink for trace events. +//! Replicates the behavior of the fmt::Layer with glog formatting. + +use std::collections::HashMap; +use std::io::BufWriter; +use std::io::Write; +use std::str::FromStr; + +use anyhow::Result; +use indexmap::IndexMap; +use tracing_core::LevelFilter; +use tracing_subscriber::filter::Targets; + +use crate::MONARCH_FILE_LOG_ENV; +use crate::trace_dispatcher::FieldValue; +use crate::trace_dispatcher::TraceEvent; +use crate::trace_dispatcher::TraceEventSink; + +/// Glog sink that writes events in glog format to a file. +/// This replaces the fmt::Layer that was previously used for text logging. +/// +/// This only logs Events, not Spans (matching old fmt::Layer behavior). +pub struct GlogSink { + writer: BufWriter<Box<dyn Write + Send>>, + prefix: Option<String>, + /// Track active spans by ID with (name, fields, parent_id) to show span context in event logs + active_spans: HashMap<u64, (String, IndexMap<String, FieldValue>, Option<u64>)>, + targets: Targets, +} + +impl GlogSink { + /// Create a new glog sink with the given writer. + /// + /// # Arguments + /// * `writer` - Writer to write log events to (will be buffered) + /// * `prefix_env_var` - Optional environment variable name to read prefix from (matching old impl) + /// * `file_log_level` - Minimum log level to capture, as a level name (e.g., "info", "debug") + pub fn new( + writer: Box<dyn Write + Send>, + prefix_env_var: Option<String>, + file_log_level: &str, + ) -> Self { + let prefix = if let Some(ref env_var_name) = prefix_env_var { + std::env::var(env_var_name).ok() + } else { + None + }; + + Self { + writer: BufWriter::new(writer), + prefix, + active_spans: HashMap::new(), + targets: Targets::new() + .with_default(LevelFilter::from_level( + tracing::Level::from_str( + &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()), + ) + .expect("Invalid log level"), + )) + .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about + } + } + + fn write_event(&mut self, event: &TraceEvent) -> Result<()> { + let timestamp_str = match event { + TraceEvent::Event { timestamp, .. } => { + let datetime: chrono::DateTime<chrono::Local> = (*timestamp).into(); + datetime.format("%m%d %H:%M:%S%.6f").to_string() + } + // write_event is only called for Events, but keep this for safety + _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(), + }; + + let prefix_str = if let Some(ref p) = self.prefix { + format!("[{}]", p) + } else { + "[-]".to_string() + }; + + match event { + TraceEvent::Event { + level, + fields, + parent_span, + thread_id, + file, + line, + .. 
+ } => { + let level_char = match *level { + tracing::Level::ERROR => 'E', + tracing::Level::WARN => 'W', + tracing::Level::INFO => 'I', + tracing::Level::DEBUG => 'D', + tracing::Level::TRACE => 'T', + }; + + // [prefix]LMMDD HH:MM:SS.ffffff thread_id file:line] message, key:value, key:value + write!( + self.writer, + "{}{}{} {} ", + prefix_str, level_char, timestamp_str, thread_id + )?; + + if let (Some(f), Some(l)) = (file, line) { + write!(self.writer, "{}:{}] ", f, l)?; + } else { + write!(self.writer, "unknown:0] ")?; + } + + if let Some(parent_id) = parent_span { + self.write_span_context(*parent_id)?; + } + + if let Some(v) = fields.get("message") { + match v { + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + _ => write!(self.writer, "event")?, + } + } else { + write!(self.writer, "event")?; + } + + for (k, v) in fields.iter() { + if k != "message" { + write!(self.writer, ", {}:", k)?; + match v { + FieldValue::Bool(b) => write!(self.writer, "{}", b)?, + FieldValue::I64(i) => write!(self.writer, "{}", i)?, + FieldValue::U64(u) => write!(self.writer, "{}", u)?, + FieldValue::F64(f) => write!(self.writer, "{}", f)?, + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + } + } + } + + writeln!(self.writer)?; + } + + // write_event should only be called for Events, but handle gracefully + _ => { + writeln!( + self.writer, + "{}I{} - unknown:0] unexpected event type", + prefix_str, timestamp_str + )?; + } + } + + Ok(()) + } + + /// Writes span context: "[outer{field:value}, inner{field:value}] " + fn write_span_context(&mut self, span_id: u64) -> Result<()> { + let mut span_ids = Vec::new(); + let mut current_id = Some(span_id); + + while let Some(id) = current_id { + if let Some((_, _, parent_id)) = self.active_spans.get(&id) { + span_ids.push(id); + current_id = *parent_id; + } else { + break; + } + } + if span_ids.is_empty() { + return Ok(()); + } + + write!(self.writer, "[")?; + + for (i, id) in span_ids.iter().rev().enumerate() { + if i > 0 { + write!(self.writer, ", ")?; + } + + if let Some((name, fields, _)) = self.active_spans.get(id) { + write!(self.writer, "{}", name)?; + if !fields.is_empty() { + write!(self.writer, "{{")?; + + let mut first = true; + for (k, v) in fields.iter() { + if !first { + write!(self.writer, ", ")?; + } + first = false; + write!(self.writer, "{}:", k)?; + + match v { + FieldValue::Bool(b) => write!(self.writer, "{}", b)?, + FieldValue::I64(i) => write!(self.writer, "{}", i)?, + FieldValue::U64(u) => write!(self.writer, "{}", u)?, + FieldValue::F64(f) => write!(self.writer, "{}", f)?, + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + } + } + + write!(self.writer, "}}")?; + } + } + } + + write!(self.writer, "] ")?; + Ok(()) + } +} + +impl TraceEventSink for GlogSink { + fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> { + match event { + // Track span lifecycle for context display (must happen even if we don't export spans) + TraceEvent::NewSpan { + id, + name, + fields, + parent_id, + .. + } => { + self.active_spans + .insert(*id, (name.to_string(), fields.clone(), *parent_id)); + } + TraceEvent::SpanClose { id, .. } => { + self.active_spans.remove(id); + } + TraceEvent::Event { .. 
} => { + self.write_event(event)?; + } + _ => {} + } + Ok(()) + } + + fn flush(&mut self) -> Result<(), anyhow::Error> { + self.writer.flush()?; + Ok(()) + } + + fn name(&self) -> &str { + "GlogSink" + } + + fn target_filter(&self) -> Option<&Targets> { + Some(&self.targets) + } +} diff --git a/hyperactor_telemetry/src/sinks/mod.rs b/hyperactor_telemetry/src/sinks/mod.rs new file mode 100644 index 000000000..8e5c11b46 --- /dev/null +++ b/hyperactor_telemetry/src/sinks/mod.rs @@ -0,0 +1,14 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Sinks for the unified telemetry layer. +//! Each sink implements the TraceEventSink trait and handles +//! writing events to a specific backend (SQLite, Scuba, glog, etc). + +pub mod glog; +pub mod sqlite; diff --git a/hyperactor_telemetry/src/sinks/sqlite.rs b/hyperactor_telemetry/src/sinks/sqlite.rs new file mode 100644 index 000000000..fbb183366 --- /dev/null +++ b/hyperactor_telemetry/src/sinks/sqlite.rs @@ -0,0 +1,203 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! SQLite sink with batched writes and transactions. +//! Runs on background thread to avoid blocking application threads. +//! +//! Reuses table definitions and insertion logic from the old SqliteLayer +//! to ensure 100% identical behavior. + +use std::path::Path; + +use anyhow::Result; +use anyhow::anyhow; +use rusqlite::Connection; +use rusqlite::functions::FunctionFlags; +use serde_json::Value as JValue; +use tracing_core::LevelFilter; +use tracing_subscriber::filter::Targets; + +use crate::sqlite; +use crate::trace_dispatcher::FieldValue; +use crate::trace_dispatcher::TraceEvent; +use crate::trace_dispatcher::TraceEventSink; + +/// SQLite sink that batches events and writes them in transactions. +/// Reuses the exact same table schema and insertion logic from SqliteLayer. +pub struct SqliteSink { + conn: Connection, + batch: Vec<TraceEvent>, + batch_size: usize, + target_filter: Targets, +} + +impl SqliteSink { + /// Create a new SQLite sink with an in-memory database. + /// Matches the API of SqliteLayer::new() + /// + /// # Arguments + /// * `batch_size` - Number of events to batch before flushing to disk + pub fn new(batch_size: usize) -> Result<Self> { + let conn = Connection::open_in_memory()?; + Self::setup_connection(conn, batch_size) + } + + /// Create a new SQLite sink with a file-based database. 
+ /// Matches the API of SqliteLayer::new_with_file() + /// + /// # Arguments + /// * `db_path` - Path to SQLite database file + /// * `batch_size` - Number of events to batch before flushing to disk + pub fn new_with_file(db_path: impl AsRef<Path>, batch_size: usize) -> Result<Self> { + let conn = Connection::open(db_path)?; + Self::setup_connection(conn, batch_size) + } + + fn setup_connection(conn: Connection, batch_size: usize) -> Result<Self> { + for table in sqlite::ALL_TABLES.iter() { + conn.execute(&table.create_table_stmt, [])?; + } + + conn.create_scalar_function( + "assert", + 2, + FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC, + move |ctx| { + let condition: bool = ctx.get(0)?; + let message: String = ctx.get(1)?; + + if !condition { + return Err(rusqlite::Error::UserFunctionError( + anyhow!("assertion failed: {condition} {message}").into(), + )); + } + + Ok(condition) + }, + )?; + + Ok(Self { + conn, + batch: Vec::with_capacity(batch_size), + batch_size, + target_filter: Targets::new() + .with_target("execution", LevelFilter::OFF) + .with_target("opentelemetry", LevelFilter::OFF) + .with_target("hyperactor_telemetry", LevelFilter::OFF) + .with_default(LevelFilter::TRACE), + }) + } + + fn flush_batch(&mut self) -> Result<()> { + if self.batch.is_empty() { + return Ok(()); + } + + let tx = self.conn.transaction()?; + + for event in &self.batch { + // We only batch Event variants in consume(), so this match is guaranteed to succeed + let TraceEvent::Event { + target, + fields, + timestamp, + module_path, + file, + line, + .. + } = event + else { + unreachable!("Only Event variants should be in batch") + }; + + let timestamp_us = timestamp + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_micros() + .to_string(); + + let mut visitor = sqlite::SqlVisitor::default(); + + visitor + .0 + .insert("time_us".to_string(), JValue::String(timestamp_us)); + + if let Some(mp) = module_path { + visitor + .0 + .insert("module_path".to_string(), JValue::String(mp.to_string())); + } + if let Some(l) = line { + visitor + .0 + .insert("line".to_string(), JValue::String(l.to_string())); + } + if let Some(f) = file { + visitor + .0 + .insert("file".to_string(), JValue::String(f.to_string())); + } + + for (key, value) in fields { + let json_value = match value { + FieldValue::Bool(b) => JValue::Bool(*b), + FieldValue::I64(i) => JValue::Number((*i).into()), + FieldValue::U64(u) => JValue::Number((*u).into()), + FieldValue::F64(f) => serde_json::Number::from_f64(*f) + .map(JValue::Number) + .unwrap_or(JValue::Null), + FieldValue::Str(s) => JValue::String(s.clone()), + FieldValue::Debug(d) => JValue::String(d.clone()), + }; + visitor.0.insert(key.clone(), json_value); + } + + let table = if &**target == sqlite::TableName::ACTOR_LIFECYCLE_STR { + sqlite::TableName::ActorLifecycle.get_table() + } else if &**target == sqlite::TableName::MESSAGES_STR { + sqlite::TableName::Messages.get_table() + } else { + sqlite::TableName::LogEvents.get_table() + }; + + sqlite::insert_event_fields(&tx, table, visitor)?; + } + + tx.commit()?; + self.batch.clear(); + + Ok(()) + } +} + +impl TraceEventSink for SqliteSink { + fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> { + // Only batch Event variants - we ignore spans + if matches!(event, TraceEvent::Event { .. 
}) { + self.batch.push(event.clone()); + + if self.batch.len() >= self.batch_size { + self.flush_batch()?; + } + } + + Ok(()) + } + + fn flush(&mut self) -> Result<(), anyhow::Error> { + self.flush_batch() + } + + fn name(&self) -> &str { + "SqliteSink" + } + + fn target_filter(&self) -> Option<&Targets> { + Some(&self.target_filter) + } +} diff --git a/hyperactor_telemetry/src/sqlite.rs b/hyperactor_telemetry/src/sqlite.rs index c5ddbed8e..95da36eff 100644 --- a/hyperactor_telemetry/src/sqlite.rs +++ b/hyperactor_telemetry/src/sqlite.rs @@ -165,7 +165,7 @@ lazy_static! { .as_slice() ) .into(); - static ref ALL_TABLES: Vec<Table> = vec![ + pub static ref ALL_TABLES: Vec<Table>
= vec![ ACTOR_LIFECYCLE.clone(), MESSAGES.clone(), LOG_EVENTS.clone() @@ -178,7 +178,7 @@ pub struct SqliteLayer { use tracing::field::Visit; #[derive(Debug, Clone, Default, Serialize)] -struct SqlVisitor(HashMap<String, JValue>); +pub struct SqlVisitor(pub HashMap<String, JValue>); impl Visit for SqlVisitor { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { @@ -232,6 +232,17 @@ }; } +/// Public helper to insert event fields into the database using the same logic as the old implementation. +/// This is used by the unified SqliteSink to ensure identical behavior. +pub fn insert_event_fields(conn: &Connection, table: &Table, fields: SqlVisitor) -> Result<()> { + conn.prepare_cached(&table.insert_stmt)?.execute( + serde_rusqlite::to_params_named_with_fields(fields, table.columns)? + .to_slice() + .as_slice(), + )?; + Ok(()) +} + impl SqliteLayer { pub fn new() -> Result<Self> { let conn = Connection::open_in_memory()?; diff --git a/hyperactor_telemetry/src/trace_dispatcher.rs b/hyperactor_telemetry/src/trace_dispatcher.rs new file mode 100644 index 000000000..5d4726a7c --- /dev/null +++ b/hyperactor_telemetry/src/trace_dispatcher.rs @@ -0,0 +1,439 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Unified telemetry layer that captures trace events once and fans out to multiple exporters +//! on a background thread, eliminating redundant capture and moving work off the application +//! thread. + +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::mpsc; +use std::thread::JoinHandle; +use std::time::Duration; +use std::time::SystemTime; + +use indexmap::IndexMap; +use tracing::Id; +use tracing::Subscriber; +use tracing_subscriber::filter::Targets; +use tracing_subscriber::layer::Context; +use tracing_subscriber::layer::Layer; +use tracing_subscriber::registry::LookupSpan; + +const QUEUE_CAPACITY: usize = 100_000; + +/// Unified representation of a trace event captured from the tracing layer. +/// This is captured once on the application thread, then sent to the background +/// worker for fan-out to multiple exporters. +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) enum TraceEvent { + /// A new span was created (on_new_span) + NewSpan { + id: u64, + name: &'static str, + target: &'static str, + level: tracing::Level, + fields: IndexMap<String, FieldValue>, + timestamp: SystemTime, + parent_id: Option<u64>, + thread_name: String, + file: Option<&'static str>, + line: Option<u32>, + }, + /// A span was entered (on_enter) + SpanEnter { id: u64, timestamp: SystemTime }, + /// A span was exited (on_exit) + SpanExit { id: u64, timestamp: SystemTime }, + /// A span was closed (dropped) + SpanClose { id: u64, timestamp: SystemTime }, + /// A tracing event occurred (e.g., tracing::info!()) + Event { + name: &'static str, + target: &'static str, + level: tracing::Level, + fields: IndexMap<String, FieldValue>, + timestamp: SystemTime, + parent_span: Option<u64>, + thread_id: String, + thread_name: String, + module_path: Option<&'static str>, + file: Option<&'static str>, + line: Option<u32>, + }, +} + +/// Simplified field value representation for trace events +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) enum FieldValue { + Bool(bool), + I64(i64), + U64(u64), + F64(f64), + Str(String), + Debug(String), +} + +/// Trait for sinks that receive trace events from the dispatcher. 
+/// Implementations run on the background worker thread and can perform +/// expensive I/O operations without blocking the application. +pub(crate) trait TraceEventSink: Send + 'static { + /// Consume a single event. Called on background thread. + fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>; + + /// Optional target/level filter for this sink. + /// + /// The worker loop automatically applies this filter before calling `consume()`, + /// so sinks don't need to check target/level in their consume implementation. + /// Only `NewSpan` and `Event` are filtered by target/level; other event types + /// are always passed through. + /// + /// # Returns + /// - `None` - No filtering, all events are consumed (default) + /// - `Some(Targets)` - Only consume events matching the target/level filter + /// + /// # Example + /// ```ignore + /// fn target_filter(&self) -> Option<&Targets> { + /// Some(Targets::new() + /// .with_target("opentelemetry", LevelFilter::OFF) + /// .with_default(LevelFilter::DEBUG)) + /// } + /// ``` + fn target_filter(&self) -> Option<&Targets> { + None + } + + /// Flush any buffered events to the backend. + /// Called periodically and on shutdown. + fn flush(&mut self) -> Result<(), anyhow::Error>; + + /// Optional: return name for debugging/logging + fn name(&self) -> &str { + std::any::type_name::<Self>() + } +} + +/// The trace event dispatcher that captures events once and dispatches to multiple sinks +/// on a background thread. +pub struct TraceEventDispatcher { + sender: Option<mpsc::SyncSender<TraceEvent>>, + _worker_handle: WorkerHandle, + max_level: Option<tracing::level_filters::LevelFilter>, + dropped_events: Arc<AtomicU64>, +} + +struct WorkerHandle { + join_handle: Option<JoinHandle<()>>, +} + +impl TraceEventDispatcher { + /// Create a new trace event dispatcher with the given sinks. + /// Uses a bounded channel (`QUEUE_CAPACITY`, currently 100,000) to ensure telemetry never blocks + /// the application. Events are dropped with a warning if the queue is full. + /// + /// # Arguments + /// * `sinks` - List of sinks to dispatch events to. + /// * `max_level` - Maximum level filter hint (None for no filtering) + pub(crate) fn new( + sinks: Vec<Box<dyn TraceEventSink>>, + max_level: Option<tracing::level_filters::LevelFilter>, + ) -> Self { + let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY); + let dropped_events = Arc::new(AtomicU64::new(0)); + let dropped_events_worker = Arc::clone(&dropped_events); + + let worker_handle = std::thread::Builder::new() + .name("telemetry-worker".into()) + .spawn(move || { + worker_loop(receiver, sinks, dropped_events_worker); + }) + .expect("failed to spawn telemetry worker thread"); + + Self { + sender: Some(sender), + _worker_handle: WorkerHandle { + join_handle: Some(worker_handle), + }, + max_level, + dropped_events, + } + } + + fn send_event(&self, event: TraceEvent) { + if let Some(sender) = &self.sender { + if let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event) { + let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1; + + if dropped.is_multiple_of(1000) { + eprintln!( + "[telemetry] WARNING: {} events dropped due to full queue (capacity: {}). \ + Telemetry worker may be falling behind.", + dropped, QUEUE_CAPACITY + ); + } + } + } + } +} + +impl Drop for TraceEventDispatcher { + fn drop(&mut self) { + // Explicitly drop the sender to close the channel. 
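+        // (Once the last sender is gone, worker_loop sees Disconnected, drains any queued events, and flushes the sinks.)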
// The next field to be dropped is `worker_handle` which + // will run its own drop impl to join the thread and flush + drop(self.sender.take()); + } +} + +impl<S> Layer<S> for TraceEventDispatcher +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let metadata = attrs.metadata(); + let mut fields = IndexMap::new(); + + let mut visitor = FieldVisitor(&mut fields); + attrs.record(&mut visitor); + + let parent_id = if let Some(parent) = attrs.parent() { + Some(parent.into_u64()) + } else { + ctx.current_span().id().map(|id| id.into_u64()) + }; + + let thread_name = std::thread::current() + .name() + .unwrap_or_default() + .to_string(); + + let event = TraceEvent::NewSpan { + id: id.into_u64(), + name: metadata.name(), + target: metadata.target(), + level: *metadata.level(), + fields, + timestamp: SystemTime::now(), + parent_id, + thread_name, + file: metadata.file(), + line: metadata.line(), + }; + + self.send_event(event); + } + + fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanEnter { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanExit { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { + let metadata = event.metadata(); + let mut fields = IndexMap::new(); + let mut visitor = FieldVisitor(&mut fields); + event.record(&mut visitor); + + let parent_span = ctx.event_span(event).map(|span| span.id().into_u64()); + + #[cfg(target_os = "linux")] + let thread_id_num = { + // SAFETY: syscall(SYS_gettid) is always safe to call - it's a read-only + // syscall that returns the current thread's kernel thread ID (TID). + // The cast to u64 is safe because gettid() returns a positive pid_t. + unsafe { libc::syscall(libc::SYS_gettid) as u64 } + }; + #[cfg(not(target_os = "linux"))] + let thread_id_num = { + let tid = std::thread::current().id(); + // SAFETY: ThreadId is a newtype wrapper around a u64 counter. + // This transmute relies on the internal representation of ThreadId, + // which is stable in practice but not guaranteed by Rust's API. + // On non-Linux platforms this is a best-effort approximation. 
// See: https://doc.rust-lang.org/std/thread/struct.ThreadId.html + unsafe { std::mem::transmute::<std::thread::ThreadId, u64>(tid) } + }; + let thread_id_str = thread_id_num.to_string(); + + let thread_name = std::thread::current() + .name() + .unwrap_or_default() + .to_string(); + + let trace_event = TraceEvent::Event { + name: metadata.name(), + target: metadata.target(), + level: *metadata.level(), + fields, + timestamp: SystemTime::now(), + parent_span, + thread_id: thread_id_str, + thread_name, + module_path: metadata.module_path(), + file: metadata.file(), + line: metadata.line(), + }; + + self.send_event(trace_event); + } + + fn on_close(&self, id: Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanClose { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> { + self.max_level + } +} + +struct FieldVisitor<'a>(&'a mut IndexMap<String, FieldValue>); + +impl<'a> tracing::field::Visit for FieldVisitor<'a> { + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.0 + .insert(field.name().to_string(), FieldValue::Bool(value)); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.0 + .insert(field.name().to_string(), FieldValue::I64(value)); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.0 + .insert(field.name().to_string(), FieldValue::U64(value)); + } + + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + self.0 + .insert(field.name().to_string(), FieldValue::F64(value)); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.0 + .insert(field.name().to_string(), FieldValue::Str(value.to_string())); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert( + field.name().to_string(), + FieldValue::Debug(format!("{:?}", value)), + ); + } +} + +/// Background worker loop that receives events and dispatches them to sinks. +/// Runs until the sender is dropped. +fn worker_loop( + receiver: mpsc::Receiver<TraceEvent>, + mut sinks: Vec<Box<dyn TraceEventSink>>, + dropped_events: Arc<AtomicU64>, +) { + const FLUSH_INTERVAL: Duration = Duration::from_millis(100); + const FLUSH_EVENT_COUNT: usize = 1000; + let mut last_flush = std::time::Instant::now(); + let mut events_since_flush = 0; + + fn flush_sinks(sinks: &mut [Box<dyn TraceEventSink>]) { + for sink in sinks { + if let Err(e) = sink.flush() { + eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e); + } + } + } + + fn dispatch_to_sinks(sinks: &mut [Box<dyn TraceEventSink>], event: TraceEvent) { + for sink in sinks { + if match &event { + TraceEvent::NewSpan { target, level, .. } + | TraceEvent::Event { target, level, .. 
} => match sink.target_filter() { + Some(targets) => targets.would_enable(target, level), + None => true, + }, + _ => true, + } { + if let Err(e) = sink.consume(&event) { + eprintln!( + "[telemetry] sink {} failed to consume event: {}", + sink.name(), + e + ); + } + } + } + } + + loop { + match receiver.recv_timeout(FLUSH_INTERVAL) { + Ok(event) => { + dispatch_to_sinks(&mut sinks, event); + events_since_flush += 1; + + if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL + { + flush_sinks(&mut sinks); + last_flush = std::time::Instant::now(); + events_since_flush = 0; + } + } + Err(mpsc::RecvTimeoutError::Timeout) => { + flush_sinks(&mut sinks); + last_flush = std::time::Instant::now(); + events_since_flush = 0; + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + break; + } + } + } + + while let Ok(event) = receiver.try_recv() { + dispatch_to_sinks(&mut sinks, event); + } + + flush_sinks(&mut sinks); + + let total_dropped = dropped_events.load(Ordering::Relaxed); + if total_dropped > 0 { + eprintln!( + "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}", + total_dropped + ); + } +} + +impl Drop for WorkerHandle { + fn drop(&mut self) { + if let Some(handle) = self.join_handle.take() { + if let Err(e) = handle.join() { + eprintln!("[telemetry] worker thread panicked: {:?}", e); + } + } + } +}
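Not part of the diff, but for orientation: the pieces above compose as "capture once on the application thread, fan out on the worker". A minimal sketch of a custom sink wired into the dispatcher is below. It assumes in-crate code (both `TraceEventSink` and `TraceEventDispatcher::new` are `pub(crate)` in this change), and `CountingSink` / `install_counting_dispatcher` are illustrative names, not part of the diff:

```rust
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

use crate::trace_dispatcher::{TraceEvent, TraceEventDispatcher, TraceEventSink};

/// Hypothetical sink that just counts events; consume() runs on the telemetry worker thread.
struct CountingSink {
    events: u64,
}

impl TraceEventSink for CountingSink {
    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
        // Span lifecycle events pass through here too; count only `Event`s, as SqliteSink does.
        if matches!(event, TraceEvent::Event { .. }) {
            self.events += 1;
        }
        Ok(())
    }

    fn flush(&mut self) -> Result<(), anyhow::Error> {
        // Called every FLUSH_INTERVAL / FLUSH_EVENT_COUNT events, and on shutdown.
        eprintln!("[counting-sink] {} events so far", self.events);
        Ok(())
    }
}

fn install_counting_dispatcher() {
    let sinks: Vec<Box<dyn TraceEventSink>> = vec![Box::new(CountingSink { events: 0 })];
    // `None` leaves the max-level hint to the other layers in the stack.
    tracing_subscriber::registry()
        .with(TraceEventDispatcher::new(sinks, None))
        .init();
}
```

When the dispatcher is dropped, its `Drop` impl closes the channel first, so the worker drains the remaining events and flushes every sink before `WorkerHandle` joins the thread.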