Skip to content

Commit 3d36314

Browse files
thomasywangfacebook-github-bot
authored and committed
Create Glog Exporter (#1929)
Summary: Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this is to create a `UnifiedLayer` that just sends everything into a background worker, which then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both produce the same results. ``` In this diff we will create an `Exporter` meant to replace `file_layer` and update our correctness test to ensure that the files produced by the existing and the new implementation are the same. Differential Revision: D87363775
1 parent 9126765 commit 3d36314

File tree

4 files changed

+553
-58
lines changed

4 files changed

+553
-58
lines changed

hyperactor_telemetry/benches/correctness_test.rs

Lines changed: 184 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,22 @@
99
//! Correctness test harness comparing old vs unified telemetry implementations.
1010
//!
1111
//! This test harness runs identical workloads through both implementations and
12-
//! verifies that the outputs are equivalent across all exporters
12+
//! verifies that the outputs are equivalent across all exporters:
13+
//! - Glog: Read log files and compare lines
1314
//!
1415
//! Usage:
1516
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
1617
1718
#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock`
1819

20+
use std::path::PathBuf;
21+
1922
use anyhow::Result;
2023
use hyperactor_telemetry::*;
2124

22-
struct TestResults {}
25+
/// Artifacts collected from a single harness run.
struct TestResults {
26+
/// Location of the glog file found once the run finished, or `None` if no candidate path existed.
glog_path: Option<PathBuf>,
27+
}
2328

2429
/// Stateless harness that runs a workload and compares the glog output of the two implementations.
struct CorrectnessTestHarness {}
2530

@@ -37,7 +42,104 @@ impl CorrectnessTestHarness {
3742

3843
std::thread::sleep(std::time::Duration::from_millis(300));
3944

40-
Ok(TestResults {})
45+
Ok(TestResults {
46+
glog_path: Self::find_glog_path(),
47+
})
48+
}
49+
50+
fn find_glog_path() -> Option<PathBuf> {
51+
let username = whoami::username();
52+
let suffix = std::env::var(hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV)
53+
.map(|s| format!("_{}", s))
54+
.unwrap_or_default();
55+
let possible_paths = vec![
56+
format!("/tmp/{}/monarch_log{}.log", username, suffix),
57+
format!("/tmp/monarch_log{}.log", suffix),
58+
format!("/logs/dedicated_log_monarch{}.log", suffix),
59+
];
60+
61+
for path in possible_paths {
62+
if std::path::Path::new(&path).exists() {
63+
return Some(PathBuf::from(path));
64+
}
65+
}
66+
None
67+
}
68+
69+
/// Reduce a glog line to its level and payload so lines from two runs can be compared.
///
/// Both old and unified implementations are expected to emit:
/// "[prefix]Lmmdd HH:MM:SS.ffffff thread_id file:line] message, fields"
///
/// Normalized to: "L] message, fields" (prefix, timestamp, thread ID and file:line removed).
///
/// Lines that contain no level character, or no `]` after it, are returned unchanged.
///
/// The level is taken to be the first `I`, `D`, `E`, `W` or `T` anywhere in the line, so a
/// prefix that itself contains one of those uppercase letters will be misparsed.
fn normalize_glog_line(line: &str) -> String {
    let is_level = |c: char| matches!(c, 'I' | 'D' | 'E' | 'W' | 'T');

    // `str::find` yields a *byte* offset, which is what slicing needs. A char index (as
    // produced by `chars().position`) is wrong as soon as a multi-byte character precedes
    // the level letter, and can even panic by landing inside a character.
    if let Some(level_pos) = line.find(is_level) {
        let from_level = &line[level_pos..];

        // Search only after the level character so a `]` belonging to the prefix is ignored.
        if let Some(close_bracket) = from_level.find(']') {
            // Every level character is ASCII, hence exactly one byte wide.
            let level = &from_level[..1];
            let rest = from_level[close_bracket + 1..].trim_start();
            return format!("{}] {}", level, rest);
        }
    }

    line.to_string()
}
92+
93+
fn compare_glog_files(&self, old_file: &PathBuf, unified_file: &PathBuf) -> Result<()> {
94+
println!("\n[Comparing Glog Files]");
95+
println!(" Old: {}", old_file.display());
96+
println!(" Unified: {}", unified_file.display());
97+
98+
let old_content = std::fs::read_to_string(old_file)?;
99+
let unified_content = std::fs::read_to_string(unified_file)?;
100+
101+
println!(" Old lines: {}", old_content.lines().count());
102+
println!(" Unified lines: {}", unified_content.lines().count());
103+
104+
let old_lines: Vec<String> = old_content.lines().map(Self::normalize_glog_line).collect();
105+
106+
let unified_lines: Vec<String> = unified_content
107+
.lines()
108+
.map(Self::normalize_glog_line)
109+
.collect();
110+
111+
if old_lines.len() != unified_lines.len() {
112+
return Err(anyhow::anyhow!(
113+
"Line count mismatch: old={} unified={}",
114+
old_lines.len(),
115+
unified_lines.len()
116+
));
117+
}
118+
119+
let skip_lines = 1;
120+
121+
for (i, (old_line, unified_line)) in old_lines
122+
.iter()
123+
.zip(unified_lines.iter())
124+
.enumerate()
125+
.skip(skip_lines)
126+
{
127+
if old_line != unified_line {
128+
return Err(anyhow::anyhow!(
129+
"Line #{} mismatch:\n old: {}\n unified: {}",
130+
i,
131+
old_line,
132+
unified_line
133+
));
134+
}
135+
}
136+
137+
println!(
138+
" ✓ All {} lines match (skipped {} initialization lines)!",
139+
old_lines.len() - skip_lines,
140+
skip_lines
141+
);
142+
Ok(())
41143
}
42144
}
43145

@@ -140,10 +242,15 @@ fn main() -> Result<()> {
140242
let mut test_passed = true;
141243

142244
println!("\n[Running OLD implementation...]");
245+
let old_log_suffix = format!("{}_old", test_name);
143246
let old_status = std::process::Command::new(&args[0])
144247
.arg(test_name)
145248
.arg("--old")
146249
.env("TEST_LOG_PREFIX", "test")
250+
.env(
251+
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
252+
&old_log_suffix,
253+
)
147254
.status()?;
148255

149256
if !old_status.success() {
@@ -153,10 +260,15 @@ fn main() -> Result<()> {
153260
}
154261

155262
println!("\n[Running UNIFIED implementation...]");
263+
let unified_log_suffix = format!("{}_unified", test_name);
156264
let unified_status = std::process::Command::new(&args[0])
157265
.arg(test_name)
158266
.arg("--unified")
159267
.env("TEST_LOG_PREFIX", "test")
268+
.env(
269+
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
270+
&unified_log_suffix,
271+
)
160272
.status()?;
161273

162274
if !unified_status.success() {
@@ -165,11 +277,46 @@ fn main() -> Result<()> {
165277
test_passed = false;
166278
}
167279

280+
let username = whoami::username();
281+
let harness = CorrectnessTestHarness {};
282+
283+
// Compare glog files
284+
let old_log = PathBuf::from(format!("/tmp/{}/test_{}_old.log", username, test_name));
285+
let unified_log =
286+
PathBuf::from(format!("/tmp/{}/test_{}_unified.log", username, test_name));
287+
288+
if !old_log.exists() || !unified_log.exists() {
289+
println!("\n⚠ Glog files not found, skipping comparison");
290+
if !old_log.exists() {
291+
println!(" Missing: {}", old_log.display());
292+
}
293+
if !unified_log.exists() {
294+
println!(" Missing: {}", unified_log.display());
295+
}
296+
all_passed = false;
297+
test_passed = false;
298+
} else {
299+
match harness.compare_glog_files(&old_log, &unified_log) {
300+
Ok(()) => {
301+
println!("\n✓ Glog files match");
302+
}
303+
Err(e) => {
304+
println!("\n✗ Glog comparison FAILED: {}", e);
305+
all_passed = false;
306+
test_passed = false;
307+
}
308+
}
309+
}
310+
168311
if test_passed {
169312
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
170313
} else {
171314
println!("\n✗ Test FAILED: {}", test_name_to_display(test_name));
172315
}
316+
317+
// Clean up test files
318+
let _ = std::fs::remove_file(&old_log);
319+
let _ = std::fs::remove_file(&unified_log);
173320
}
174321

175322
println!("\n\n{}", "=".repeat(80));
@@ -186,6 +333,32 @@ fn main() -> Result<()> {
186333

187334
/// Called in child process
188335
fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
336+
let impl_suffix = if impl_type == "--old" {
337+
"old"
338+
} else {
339+
"unified"
340+
};
341+
let log_suffix = format!("{}_{}", test_name, impl_suffix);
342+
let username = whoami::username();
343+
let possible_log_paths = vec![
344+
format!("/tmp/{}/monarch_log_{}.log", username, log_suffix),
345+
format!("/tmp/monarch_log_{}.log", log_suffix),
346+
format!("/logs/dedicated_log_monarch_{}.log", log_suffix),
347+
];
348+
349+
for path in &possible_log_paths {
350+
if std::path::Path::new(path).exists() {
351+
let _ = std::fs::remove_file(path);
352+
println!("Cleaned up existing log file: {}", path);
353+
}
354+
}
355+
356+
let target_log_copy = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix);
357+
if std::path::Path::new(&target_log_copy).exists() {
358+
let _ = std::fs::remove_file(&target_log_copy);
359+
println!("Cleaned up existing copy file: {}", target_log_copy);
360+
}
361+
189362
let harness = CorrectnessTestHarness {};
190363

191364
let workload: fn() = match test_name {
@@ -200,7 +373,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
200373
}
201374
};
202375

203-
let _results = match impl_type {
376+
let results = match impl_type {
204377
"--old" => {
205378
println!("Running with OLD implementation...");
206379
harness.run(workload)?
@@ -222,6 +395,13 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
222395
}
223396
};
224397

398+
if let Some(glog_path) = results.glog_path {
399+
let target_path = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix);
400+
401+
std::fs::copy(&glog_path, &target_path)?;
402+
println!("Glog file copied to: {}", target_path);
403+
}
404+
225405
Ok(())
226406
}
227407

0 commit comments

Comments
 (0)