Skip to content

Commit c8f7202

Browse files
thomasywangmeta-codesync[bot]
authored andcommitted
Create Glog Exporter
Differential Revision: D87363775
1 parent 10913a5 commit c8f7202

File tree

4 files changed

+555
-58
lines changed

4 files changed

+555
-58
lines changed

hyperactor_telemetry/benches/correctness_test.rs

Lines changed: 184 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,22 @@
99
//! Correctness test harness comparing old vs unified telemetry implementations.
1010
//!
1111
//! This test harness runs identical workloads through both implementations and
12-
//! verifies that the outputs are equivalent across all exporters
12+
//! verifies that the outputs are equivalent across all exporters:
13+
//! - Glog: Read log files and compare lines
1314
//!
1415
//! Usage:
1516
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
1617
1718
#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor`` just for `hyperactor::clock::Clock`
1819

20+
use std::path::PathBuf;
21+
1922
use anyhow::Result;
2023
use hyperactor_telemetry::*;
2124

22-
struct TestResults {}
25+
struct TestResults {
26+
glog_path: Option<PathBuf>,
27+
}
2328

2429
struct CorrectnessTestHarness {}
2530

@@ -37,7 +42,104 @@ impl CorrectnessTestHarness {
3742

3843
std::thread::sleep(std::time::Duration::from_millis(300));
3944

40-
Ok(TestResults {})
45+
Ok(TestResults {
46+
glog_path: Self::find_glog_path(),
47+
})
48+
}
49+
50+
fn find_glog_path() -> Option<PathBuf> {
51+
let username = whoami::username();
52+
let suffix = std::env::var(hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV)
53+
.map(|s| format!("_{}", s))
54+
.unwrap_or_default();
55+
let possible_paths = vec![
56+
format!("/tmp/{}/monarch_log{}.log", username, suffix),
57+
format!("/tmp/monarch_log{}.log", suffix),
58+
format!("/logs/dedicated_log_monarch{}.log", suffix),
59+
];
60+
61+
for path in possible_paths {
62+
if std::path::Path::new(&path).exists() {
63+
return Some(PathBuf::from(path));
64+
}
65+
}
66+
None
67+
}
68+
69+
/// Normalize a glog line by removing timestamp, thread ID, file:line, and prefix for comparison.
70+
/// Both old and unified implementations should now use the same format:
71+
/// "[prefix]Lmmdd HH:MM:SS.ffffff thread_id file:line] message, fields"
72+
///
73+
/// Normalized to: "L] message, fields" (prefix removed)
74+
fn normalize_glog_line(line: &str) -> String {
75+
// Find the level character position
76+
if let Some(level_pos) = line
77+
.chars()
78+
.position(|c| matches!(c, 'I' | 'D' | 'E' | 'W' | 'T'))
79+
{
80+
// Find the closing bracket that comes AFTER the level character (not the one in the prefix)
81+
if let Some(close_bracket) = line[level_pos..].find(']') {
82+
let actual_bracket_pos = level_pos + close_bracket;
83+
let level = &line[level_pos..=level_pos]; // e.g., "I"
84+
let rest = &line[actual_bracket_pos + 1..].trim_start(); // Everything after the real "]"
85+
// Don't include prefix - just level + content
86+
return format!("{}] {}", level, rest);
87+
}
88+
}
89+
90+
line.to_string()
91+
}
92+
93+
fn compare_glog_files(&self, old_file: &PathBuf, unified_file: &PathBuf) -> Result<()> {
94+
println!("\n[Comparing Glog Files]");
95+
println!(" Old: {}", old_file.display());
96+
println!(" Unified: {}", unified_file.display());
97+
98+
let old_content = std::fs::read_to_string(old_file)?;
99+
let unified_content = std::fs::read_to_string(unified_file)?;
100+
101+
println!(" Old lines: {}", old_content.lines().count());
102+
println!(" Unified lines: {}", unified_content.lines().count());
103+
104+
let old_lines: Vec<String> = old_content.lines().map(Self::normalize_glog_line).collect();
105+
106+
let unified_lines: Vec<String> = unified_content
107+
.lines()
108+
.map(Self::normalize_glog_line)
109+
.collect();
110+
111+
if old_lines.len() != unified_lines.len() {
112+
return Err(anyhow::anyhow!(
113+
"Line count mismatch: old={} unified={}",
114+
old_lines.len(),
115+
unified_lines.len()
116+
));
117+
}
118+
119+
let skip_lines = 1;
120+
121+
for (i, (old_line, unified_line)) in old_lines
122+
.iter()
123+
.zip(unified_lines.iter())
124+
.enumerate()
125+
.skip(skip_lines)
126+
{
127+
if old_line != unified_line {
128+
return Err(anyhow::anyhow!(
129+
"Line #{} mismatch:\n old: {}\n unified: {}",
130+
i,
131+
old_line,
132+
unified_line
133+
));
134+
}
135+
}
136+
137+
println!(
138+
" ✓ All {} lines match (skipped {} initialization lines)!",
139+
old_lines.len() - skip_lines,
140+
skip_lines
141+
);
142+
Ok(())
41143
}
42144
}
43145

@@ -140,10 +242,15 @@ fn main() -> Result<()> {
140242
let mut test_passed = true;
141243

142244
println!("\n[Running OLD implementation...]");
245+
let old_log_suffix = format!("{}_old", test_name);
143246
let old_status = std::process::Command::new(&args[0])
144247
.arg(test_name)
145248
.arg("--old")
146249
.env("TEST_LOG_PREFIX", "test")
250+
.env(
251+
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
252+
&old_log_suffix,
253+
)
147254
.status()?;
148255

149256
if !old_status.success() {
@@ -154,10 +261,15 @@ fn main() -> Result<()> {
154261
}
155262

156263
println!("\n[Running UNIFIED implementation...]");
264+
let unified_log_suffix = format!("{}_unified", test_name);
157265
let unified_status = std::process::Command::new(&args[0])
158266
.arg(test_name)
159267
.arg("--unified")
160268
.env("TEST_LOG_PREFIX", "test")
269+
.env(
270+
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
271+
&unified_log_suffix,
272+
)
161273
.status()?;
162274

163275
if !unified_status.success() {
@@ -167,11 +279,46 @@ fn main() -> Result<()> {
167279
continue;
168280
}
169281

282+
let username = whoami::username();
283+
let harness = CorrectnessTestHarness {};
284+
285+
// Compare glog files
286+
let old_log = PathBuf::from(format!("/tmp/{}/test_{}_old.log", username, test_name));
287+
let unified_log =
288+
PathBuf::from(format!("/tmp/{}/test_{}_unified.log", username, test_name));
289+
290+
if !old_log.exists() || !unified_log.exists() {
291+
println!("\n⚠ Glog files not found, skipping comparison");
292+
if !old_log.exists() {
293+
println!(" Missing: {}", old_log.display());
294+
}
295+
if !unified_log.exists() {
296+
println!(" Missing: {}", unified_log.display());
297+
}
298+
all_passed = false;
299+
test_passed = false;
300+
} else {
301+
match harness.compare_glog_files(&old_log, &unified_log) {
302+
Ok(()) => {
303+
println!("\n✓ Glog files match");
304+
}
305+
Err(e) => {
306+
println!("\n✗ Glog comparison FAILED: {}", e);
307+
all_passed = false;
308+
test_passed = false;
309+
}
310+
}
311+
}
312+
170313
if test_passed {
171314
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
172315
} else {
173316
println!("\n✗ Test FAILED: {}", test_name_to_display(test_name));
174317
}
318+
319+
// Clean up test files
320+
let _ = std::fs::remove_file(&old_log);
321+
let _ = std::fs::remove_file(&unified_log);
175322
}
176323

177324
println!("\n\n{}", "=".repeat(80));
@@ -188,6 +335,32 @@ fn main() -> Result<()> {
188335

189336
/// Called in child process
190337
fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
338+
let impl_suffix = if impl_type == "--old" {
339+
"old"
340+
} else {
341+
"unified"
342+
};
343+
let log_suffix = format!("{}_{}", test_name, impl_suffix);
344+
let username = whoami::username();
345+
let possible_log_paths = vec![
346+
format!("/tmp/{}/monarch_log_{}.log", username, log_suffix),
347+
format!("/tmp/monarch_log_{}.log", log_suffix),
348+
format!("/logs/dedicated_log_monarch_{}.log", log_suffix),
349+
];
350+
351+
for path in &possible_log_paths {
352+
if std::path::Path::new(path).exists() {
353+
let _ = std::fs::remove_file(path);
354+
println!("Cleaned up existing log file: {}", path);
355+
}
356+
}
357+
358+
let target_log_copy = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix);
359+
if std::path::Path::new(&target_log_copy).exists() {
360+
let _ = std::fs::remove_file(&target_log_copy);
361+
println!("Cleaned up existing copy file: {}", target_log_copy);
362+
}
363+
191364
let harness = CorrectnessTestHarness {};
192365

193366
let workload: fn() = match test_name {
@@ -202,7 +375,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
202375
}
203376
};
204377

205-
let _results = match impl_type {
378+
let results = match impl_type {
206379
"--old" => {
207380
println!("Running with OLD implementation...");
208381
harness.run(workload)?
@@ -224,6 +397,13 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
224397
}
225398
};
226399

400+
if let Some(glog_path) = results.glog_path {
401+
let target_path = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix);
402+
403+
std::fs::copy(&glog_path, &target_path)?;
404+
println!("Glog file copied to: {}", target_path);
405+
}
406+
227407
Ok(())
228408
}
229409

0 commit comments

Comments
 (0)