Skip to content

Commit 7bf3f51

Browse files
thomasywangfacebook-github-bot
authored andcommitted
Create Glog Exporter (#1929)
Summary: Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this is to create a `UnifiedLayer` that just sends everything into a background worker, which then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results. ``` In this diff we will create an `Exporter` meant to replace `file_layer`, and update our correctness test to ensure that the files produced by both the existing and the new implementation are the same. Differential Revision: D87363775
1 parent e68ae13 commit 7bf3f51

File tree

4 files changed

+502
-18
lines changed

4 files changed

+502
-18
lines changed

hyperactor_telemetry/benches/correctness_test.rs

Lines changed: 183 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,20 @@
99
//! Correctness test harness comparing old vs unified telemetry implementations.
1010
//!
1111
//! This test harness runs identical workloads through both implementations and
12-
//! verifies that the outputs are equivalent across all exporters
12+
//! verifies that the outputs are equivalent across all exporters:
13+
//! - Glog: Read log files and compare lines
1314
//!
1415
//! Usage:
1516
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
1617
18+
use std::path::PathBuf;
19+
1720
use anyhow::Result;
1821
use hyperactor_telemetry::*;
1922

20-
struct TestResults {}
23+
/// Results returned by the correctness test harness.
struct TestResults {
24+
/// Location of the glog file as resolved by `find_glog_path`;
/// `None` when none of the candidate paths exists.
glog_path: Option<PathBuf>,
25+
}
2126

2227
/// Field-less type that groups the correctness-test helper functions.
struct CorrectnessTestHarness {}
2328

@@ -35,7 +40,104 @@ impl CorrectnessTestHarness {
3540

3641
std::thread::sleep(std::time::Duration::from_millis(300));
3742

38-
Ok(TestResults {})
43+
Ok(TestResults {
44+
glog_path: Self::find_glog_path(),
45+
})
46+
}
47+
48+
fn find_glog_path() -> Option<PathBuf> {
49+
let username = whoami::username();
50+
let suffix = std::env::var(hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV)
51+
.map(|s| format!("_{}", s))
52+
.unwrap_or_default();
53+
let possible_paths = vec![
54+
format!("/tmp/{}/monarch_log{}.log", username, suffix),
55+
format!("/tmp/monarch_log{}.log", suffix),
56+
format!("/logs/dedicated_log_monarch{}.log", suffix),
57+
];
58+
59+
for path in possible_paths {
60+
if std::path::Path::new(&path).exists() {
61+
return Some(PathBuf::from(path));
62+
}
63+
}
64+
None
65+
}
66+
67+
/// Normalize a glog line for comparison by dropping the prefix, timestamp,
/// thread ID and `file:line`, keeping only the level and the payload.
///
/// Both the old and unified implementations are expected to emit:
/// `[prefix]Lmmdd HH:MM:SS.ffffff thread_id file:line] message, fields`
///
/// Normalized to: `L] message, fields` (prefix removed).
///
/// A line that contains no level character (`I`, `D`, `E`, `W`, `T`), or no
/// `]` after the level character, is returned unchanged.
fn normalize_glog_line(line: &str) -> String {
    // `str::find` returns a BYTE offset, which is what string slicing needs.
    // Counting `chars()` instead yields a character index that no longer
    // matches the byte offset once the prefix contains non-ASCII text, which
    // produces wrong slices or a char-boundary panic.
    let level_pos = match line.find(|c: char| matches!(c, 'I' | 'D' | 'E' | 'W' | 'T')) {
        Some(pos) => pos,
        None => return line.to_string(),
    };

    // Search for the closing bracket only AFTER the level character so that
    // the `]` terminating the prefix is never matched.
    let close_bracket = match line[level_pos..].find(']') {
        Some(offset) => level_pos + offset,
        None => return line.to_string(),
    };

    // Every level character is ASCII, so the level is exactly one byte wide.
    let level = &line[level_pos..level_pos + 1];
    // Everything after the real "]", without leading whitespace.
    let rest = line[close_bracket + 1..].trim_start();

    // Don't include the prefix - just level + content.
    format!("{}] {}", level, rest)
}
90+
91+
fn compare_glog_files(&self, old_file: &PathBuf, unified_file: &PathBuf) -> Result<()> {
92+
println!("\n[Comparing Glog Files]");
93+
println!(" Old: {}", old_file.display());
94+
println!(" Unified: {}", unified_file.display());
95+
96+
let old_content = std::fs::read_to_string(old_file)?;
97+
let unified_content = std::fs::read_to_string(unified_file)?;
98+
99+
println!(" Old lines: {}", old_content.lines().count());
100+
println!(" Unified lines: {}", unified_content.lines().count());
101+
102+
let old_lines: Vec<String> = old_content.lines().map(Self::normalize_glog_line).collect();
103+
104+
let unified_lines: Vec<String> = unified_content
105+
.lines()
106+
.map(Self::normalize_glog_line)
107+
.collect();
108+
109+
if old_lines.len() != unified_lines.len() {
110+
return Err(anyhow::anyhow!(
111+
"Line count mismatch: old={} unified={}",
112+
old_lines.len(),
113+
unified_lines.len()
114+
));
115+
}
116+
117+
let skip_lines = 1;
118+
119+
for (i, (old_line, unified_line)) in old_lines
120+
.iter()
121+
.zip(unified_lines.iter())
122+
.enumerate()
123+
.skip(skip_lines)
124+
{
125+
if old_line != unified_line {
126+
return Err(anyhow::anyhow!(
127+
"Line #{} mismatch:\n old: {}\n unified: {}",
128+
i,
129+
old_line,
130+
unified_line
131+
));
132+
}
133+
}
134+
135+
println!(
136+
" ✓ All {} lines match (skipped {} initialization lines)!",
137+
old_lines.len() - skip_lines,
138+
skip_lines
139+
);
140+
Ok(())
39141
}
40142
}
41143

@@ -138,10 +240,15 @@ fn main() -> Result<()> {
138240
let mut test_passed = true;
139241

140242
println!("\n[Running OLD implementation...]");
243+
let old_log_suffix = format!("{}_old", test_name);
141244
let old_status = std::process::Command::new(&args[0])
142245
.arg(test_name)
143246
.arg("--old")
144247
.env("TEST_LOG_PREFIX", "test")
248+
.env(
249+
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
250+
&old_log_suffix,
251+
)
145252
.status()?;
146253

147254
if !old_status.success() {
@@ -152,10 +259,15 @@ fn main() -> Result<()> {
152259
}
153260

154261
println!("\n[Running UNIFIED implementation...]");
262+
let unified_log_suffix = format!("{}_unified", test_name);
155263
let unified_status = std::process::Command::new(&args[0])
156264
.arg(test_name)
157265
.arg("--unified")
158266
.env("TEST_LOG_PREFIX", "test")
267+
.env(
268+
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
269+
&unified_log_suffix,
270+
)
159271
.status()?;
160272

161273
if !unified_status.success() {
@@ -165,11 +277,46 @@ fn main() -> Result<()> {
165277
continue;
166278
}
167279

280+
let username = whoami::username();
281+
let harness = CorrectnessTestHarness {};
282+
283+
// Compare glog files
284+
let old_log = PathBuf::from(format!("/tmp/{}/test_{}_old.log", username, test_name));
285+
let unified_log =
286+
PathBuf::from(format!("/tmp/{}/test_{}_unified.log", username, test_name));
287+
288+
if !old_log.exists() || !unified_log.exists() {
289+
println!("\n⚠ Glog files not found, skipping comparison");
290+
if !old_log.exists() {
291+
println!(" Missing: {}", old_log.display());
292+
}
293+
if !unified_log.exists() {
294+
println!(" Missing: {}", unified_log.display());
295+
}
296+
all_passed = false;
297+
test_passed = false;
298+
} else {
299+
match harness.compare_glog_files(&old_log, &unified_log) {
300+
Ok(()) => {
301+
println!("\n✓ Glog files match");
302+
}
303+
Err(e) => {
304+
println!("\n✗ Glog comparison FAILED: {}", e);
305+
all_passed = false;
306+
test_passed = false;
307+
}
308+
}
309+
}
310+
168311
if test_passed {
169312
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
170313
} else {
171314
println!("\n✗ Test FAILED: {}", test_name_to_display(test_name));
172315
}
316+
317+
// Clean up test files
318+
let _ = std::fs::remove_file(&old_log);
319+
let _ = std::fs::remove_file(&unified_log);
173320
}
174321

175322
println!("\n\n{}", "=".repeat(80));
@@ -186,6 +333,32 @@ fn main() -> Result<()> {
186333

187334
/// Called in child process
188335
fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
336+
let impl_suffix = if impl_type == "--old" {
337+
"old"
338+
} else {
339+
"unified"
340+
};
341+
let log_suffix = format!("{}_{}", test_name, impl_suffix);
342+
let username = whoami::username();
343+
let possible_log_paths = vec![
344+
format!("/tmp/{}/monarch_log_{}.log", username, log_suffix),
345+
format!("/tmp/monarch_log_{}.log", log_suffix),
346+
format!("/logs/dedicated_log_monarch_{}.log", log_suffix),
347+
];
348+
349+
for path in &possible_log_paths {
350+
if std::path::Path::new(path).exists() {
351+
let _ = std::fs::remove_file(path);
352+
println!("Cleaned up existing log file: {}", path);
353+
}
354+
}
355+
356+
let target_log_copy = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix);
357+
if std::path::Path::new(&target_log_copy).exists() {
358+
let _ = std::fs::remove_file(&target_log_copy);
359+
println!("Cleaned up existing copy file: {}", target_log_copy);
360+
}
361+
189362
let harness = CorrectnessTestHarness {};
190363

191364
let workload: fn() = match test_name {
@@ -222,6 +395,13 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
222395
}
223396
};
224397

398+
if let Some(glog_path) = results.glog_path {
399+
let target_path = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix);
400+
401+
std::fs::copy(&glog_path, &target_path)?;
402+
println!("Glog file copied to: {}", target_path);
403+
}
404+
225405
Ok(())
226406
}
227407

0 commit comments

Comments
 (0)