Skip to content

Commit 670555d

Browse files
thomasywangmeta-codesync[bot]
authored andcommitted
Create Sqlite Exporter (#1930)
Summary: Pull Request resolved: #1930 Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this, is to create a `UnifiedLayer` that just sends everything into a background worker, that then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results ``` In this diff, we will implement an `Exporter` meant to replace `get_reloadable_sqlite_layer()`. We will update our correctness test to query the tables at the end of each workload and ensure all rows are the same Reviewed By: mariusae Differential Revision: D87363774
1 parent c8f7202 commit 670555d

File tree

6 files changed

+529
-34
lines changed

6 files changed

+529
-34
lines changed

hyperactor_telemetry/benches/correctness_test.rs

Lines changed: 206 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
//! This test harness runs identical workloads through both implementations and
1212
//! verifies that the outputs are equivalent across all exporters:
1313
//! - Glog: Read log files and compare lines
14+
//! - SQLite: Query database and compare rows
1415
//!
1516
//! Usage:
1617
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
@@ -24,12 +25,15 @@ use hyperactor_telemetry::*;
2425

2526
struct TestResults {
2627
glog_path: Option<PathBuf>,
28+
sqlite_path: Option<PathBuf>,
29+
#[allow(dead_code)]
30+
_sqlite_tracing: Option<hyperactor_telemetry::sqlite::SqliteTracing>,
2731
}
2832

2933
struct CorrectnessTestHarness {}
3034

3135
impl CorrectnessTestHarness {
32-
fn run<F>(&self, workload: F) -> Result<TestResults>
36+
fn run<F>(&self, workload: F, unified: bool) -> Result<TestResults>
3337
where
3438
F: Fn(),
3539
{
@@ -38,12 +42,45 @@ impl CorrectnessTestHarness {
3842
Some("TEST_LOG_PREFIX".to_string()),
3943
);
4044

45+
let sqlite_tracing = if unified {
46+
None
47+
} else {
48+
let sqlite_tracing = hyperactor_telemetry::sqlite::SqliteTracing::new()
49+
.expect("Failed to create SqliteTracing");
50+
let db_path = sqlite_tracing.db_path().expect("No db_path");
51+
println!("SqliteTracing created successfully, db_path: {:?}", db_path);
52+
println!("Database exists: {}", db_path.exists());
53+
Some(sqlite_tracing)
54+
};
55+
4156
workload();
4257

4358
std::thread::sleep(std::time::Duration::from_millis(300));
4459

60+
let username = whoami::username();
61+
let possible_paths = vec![
62+
format!(
63+
"/tmp/{}/hyperactor_trace_{}.db",
64+
username,
65+
std::process::id()
66+
),
67+
format!("/tmp/hyperactor_trace_{}.db", std::process::id()),
68+
format!("/tmp/traces/hyperactor_trace_{}.db", std::process::id()),
69+
format!("./hyperactor_trace_{}.db", std::process::id()),
70+
];
71+
72+
let mut sqlite_path = None;
73+
for path in possible_paths {
74+
if std::path::Path::new(&path).exists() {
75+
sqlite_path = Some(PathBuf::from(path));
76+
break;
77+
}
78+
}
79+
4580
Ok(TestResults {
81+
sqlite_path,
4682
glog_path: Self::find_glog_path(),
83+
_sqlite_tracing: sqlite_tracing,
4784
})
4885
}
4986

@@ -141,6 +178,107 @@ impl CorrectnessTestHarness {
141178
);
142179
Ok(())
143180
}
181+
182+
fn compare_sqlite_databases(&self, old_db: &PathBuf, unified_db: &PathBuf) -> Result<()> {
183+
println!("\n[Comparing SQLite Databases]");
184+
println!(" Old: {}", old_db.display());
185+
println!(" Unified: {}", unified_db.display());
186+
187+
let old_conn = rusqlite::Connection::open(old_db)?;
188+
189+
old_conn.execute(&format!("ATTACH '{}' AS unified", unified_db.display()), [])?;
190+
191+
let tables = vec!["log_events", "messages", "actor_lifecycle"];
192+
193+
for table in tables {
194+
println!("\n Comparing table: {}", table);
195+
196+
let old_count: i64 =
197+
old_conn.query_row(&format!("SELECT COUNT(*) FROM main.{}", table), [], |row| {
198+
row.get(0)
199+
})?;
200+
let unified_count: i64 = old_conn.query_row(
201+
&format!("SELECT COUNT(*) FROM unified.{}", table),
202+
[],
203+
|row| row.get(0),
204+
)?;
205+
206+
println!(" Old rows: {}", old_count);
207+
println!(" Unified rows: {}", unified_count);
208+
209+
if old_count != unified_count {
210+
return Err(anyhow::anyhow!(
211+
"Table {} row count mismatch: old={} unified={}",
212+
table,
213+
old_count,
214+
unified_count
215+
));
216+
}
217+
218+
let mut stmt = old_conn.prepare(&format!("PRAGMA table_info({})", table))?;
219+
let columns: Vec<String> = stmt
220+
.query_map([], |row| {
221+
let name: String = row.get(1)?;
222+
Ok(name)
223+
})?
224+
.collect::<Result<Vec<_>, _>>()?
225+
.into_iter()
226+
.filter(|col| col != "time_us") // Ignore time_us column
227+
.collect();
228+
229+
if columns.is_empty() {
230+
continue;
231+
}
232+
233+
let col_list = columns.join(", ");
234+
235+
let diff_query = format!(
236+
"SELECT '-' as diff_type, {cols} FROM main.{table}
237+
EXCEPT
238+
SELECT '-' as diff_type, {cols} FROM unified.{table}
239+
UNION ALL
240+
SELECT '+' as diff_type, {cols} FROM unified.{table}
241+
EXCEPT
242+
SELECT '+' as diff_type, {cols} FROM main.{table}",
243+
table = table,
244+
cols = col_list
245+
);
246+
247+
let mut stmt = old_conn.prepare(&diff_query)?;
248+
let mut rows = stmt.query([])?;
249+
250+
let mut diffs = Vec::new();
251+
while let Some(row) = rows.next()? {
252+
let diff_type: String = row.get(0)?;
253+
let color = if diff_type == "-" {
254+
"\x1b[31m" // red
255+
} else {
256+
"\x1b[32m" // green
257+
};
258+
let mut row_str = format!("{}{} ", color, diff_type);
259+
for i in 1..row.as_ref().column_count() {
260+
let col_name = row.as_ref().column_name(i)?;
261+
let val: Option<String> = row.get(i).ok();
262+
row_str.push_str(&format!("{}={:?}, ", col_name, val));
263+
}
264+
row_str.push_str("\x1b[0m"); // reset color
265+
diffs.push(row_str);
266+
}
267+
268+
if !diffs.is_empty() {
269+
return Err(anyhow::anyhow!(
270+
"Table {} has differences:\n{}",
271+
table,
272+
diffs.join("\n")
273+
));
274+
}
275+
276+
println!(" ✓ {} rows match", old_count);
277+
}
278+
279+
println!("\n ✓ All tables match!");
280+
Ok(())
281+
}
144282
}
145283

146284
// ============================================================================
@@ -251,6 +389,7 @@ fn main() -> Result<()> {
251389
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
252390
&old_log_suffix,
253391
)
392+
.env("ENABLE_SQLITE_TRACING", "1")
254393
.status()?;
255394

256395
if !old_status.success() {
@@ -270,6 +409,7 @@ fn main() -> Result<()> {
270409
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
271410
&unified_log_suffix,
272411
)
412+
.env("ENABLE_SQLITE_TRACING", "1")
273413
.status()?;
274414

275415
if !unified_status.success() {
@@ -310,13 +450,41 @@ fn main() -> Result<()> {
310450
}
311451
}
312452

453+
// Compare SQLite databases
454+
let old_db = PathBuf::from(format!("/tmp/{}/test_{}_old.db", username, test_name));
455+
let unified_db = PathBuf::from(format!("/tmp/{}/test_{}_unified.db", username, test_name));
456+
457+
// SQLite databases are now required - both implementations should create them
458+
if !old_db.exists() {
459+
println!("\n✗ OLD database not found: {}", old_db.display());
460+
all_passed = false;
461+
test_passed = false;
462+
} else if !unified_db.exists() {
463+
println!("\n✗ UNIFIED database not found: {}", unified_db.display());
464+
all_passed = false;
465+
test_passed = false;
466+
} else {
467+
match harness.compare_sqlite_databases(&old_db, &unified_db) {
468+
Ok(()) => {
469+
println!("\n✓ SQLite databases match");
470+
}
471+
Err(e) => {
472+
println!("\n✗ SQLite comparison FAILED: {}", e);
473+
all_passed = false;
474+
test_passed = false;
475+
}
476+
}
477+
}
478+
313479
if test_passed {
314480
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
315481
} else {
316482
println!("\n✗ Test FAILED: {}", test_name_to_display(test_name));
317483
}
318484

319485
// Clean up test files
486+
let _ = std::fs::remove_file(&old_db);
487+
let _ = std::fs::remove_file(&unified_db);
320488
let _ = std::fs::remove_file(&old_log);
321489
let _ = std::fs::remove_file(&unified_log);
322490
}
@@ -378,7 +546,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
378546
let results = match impl_type {
379547
"--old" => {
380548
println!("Running with OLD implementation...");
381-
harness.run(workload)?
549+
harness.run(workload, false)?
382550
}
383551
"--unified" => {
384552
println!("Running with UNIFIED implementation...");
@@ -387,7 +555,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
387555
unsafe {
388556
std::env::set_var("USE_UNIFIED_LAYER", "1");
389557
}
390-
harness.run(workload)?
558+
harness.run(workload, true)?
391559
}
392560
_ => {
393561
return Err(anyhow::anyhow!(
@@ -404,6 +572,41 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
404572
println!("Glog file copied to: {}", target_path);
405573
}
406574

575+
if let Some(db_path) = results.sqlite_path {
576+
let target_path = format!("/tmp/{}/test_{}_{}.db", username, test_name, impl_suffix);
577+
578+
println!(
579+
"Attempting to copy database from {} to {}",
580+
db_path.display(),
581+
target_path
582+
);
583+
std::fs::copy(&db_path, &target_path).map_err(|e| {
584+
anyhow::anyhow!(
585+
"Failed to copy database from {} to {}: {}",
586+
db_path.display(),
587+
target_path,
588+
e
589+
)
590+
})?;
591+
592+
// Also copy WAL files if they exist (SQLite WAL mode)
593+
let wal_path = format!("{}-wal", db_path.display());
594+
let shm_path = format!("{}-shm", db_path.display());
595+
let target_wal = format!("{}-wal", target_path);
596+
let target_shm = format!("{}-shm", target_path);
597+
598+
if std::path::Path::new(&wal_path).exists() {
599+
let _ = std::fs::copy(&wal_path, &target_wal);
600+
}
601+
if std::path::Path::new(&shm_path).exists() {
602+
let _ = std::fs::copy(&shm_path, &target_shm);
603+
}
604+
605+
println!("Database copied to: {}", target_path);
606+
} else {
607+
println!("Warning: No SQLite database path found");
608+
}
609+
407610
Ok(())
408611
}
409612

0 commit comments

Comments
 (0)