Skip to content

Commit 2643bcd

Browse files
thomasywang authored and facebook-github-bot committed
Create Sqlite Exporter (#1930)
Summary: Stack context:
```
Our tracing subscriber has 3 layers:
- File logging
- Scuba
- Sqlite (usually off)

Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans.

The solution is to create a `UnifiedLayer` that just sends everything to a background worker, which then delivers all traces to each `Exporter` to handle.

In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`.

To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both produce the same results.
```

In this diff, we implement an `Exporter` meant to replace `get_reloadable_sqlite_layer()`. We also update our correctness test to query the tables at the end of each workload and ensure that all rows are the same.

Reviewed By: mariusae

Differential Revision: D87363774
1 parent 3d36314 commit 2643bcd

File tree

6 files changed

+525
-34
lines changed

6 files changed

+525
-34
lines changed

hyperactor_telemetry/benches/correctness_test.rs

Lines changed: 206 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
//! This test harness runs identical workloads through both implementations and
1212
//! verifies that the outputs are equivalent across all exporters:
1313
//! - Glog: Read log files and compare lines
14+
//! - SQLite: Query database and compare rows
1415
//!
1516
//! Usage:
1617
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
@@ -24,12 +25,15 @@ use hyperactor_telemetry::*;
2425

2526
struct TestResults {
2627
glog_path: Option<PathBuf>,
28+
sqlite_path: Option<PathBuf>,
29+
#[allow(dead_code)]
30+
_sqlite_tracing: Option<hyperactor_telemetry::sqlite::SqliteTracing>,
2731
}
2832

2933
struct CorrectnessTestHarness {}
3034

3135
impl CorrectnessTestHarness {
32-
fn run<F>(&self, workload: F) -> Result<TestResults>
36+
fn run<F>(&self, workload: F, unified: bool) -> Result<TestResults>
3337
where
3438
F: Fn(),
3539
{
@@ -38,12 +42,45 @@ impl CorrectnessTestHarness {
3842
Some("TEST_LOG_PREFIX".to_string()),
3943
);
4044

45+
let sqlite_tracing = if unified {
46+
None
47+
} else {
48+
let sqlite_tracing = hyperactor_telemetry::sqlite::SqliteTracing::new()
49+
.expect("Failed to create SqliteTracing");
50+
let db_path = sqlite_tracing.db_path().expect("No db_path");
51+
println!("SqliteTracing created successfully, db_path: {:?}", db_path);
52+
println!("Database exists: {}", db_path.exists());
53+
Some(sqlite_tracing)
54+
};
55+
4156
workload();
4257

4358
std::thread::sleep(std::time::Duration::from_millis(300));
4459

60+
let username = whoami::username();
61+
let possible_paths = vec![
62+
format!(
63+
"/tmp/{}/hyperactor_trace_{}.db",
64+
username,
65+
std::process::id()
66+
),
67+
format!("/tmp/hyperactor_trace_{}.db", std::process::id()),
68+
format!("/tmp/traces/hyperactor_trace_{}.db", std::process::id()),
69+
format!("./hyperactor_trace_{}.db", std::process::id()),
70+
];
71+
72+
let mut sqlite_path = None;
73+
for path in possible_paths {
74+
if std::path::Path::new(&path).exists() {
75+
sqlite_path = Some(PathBuf::from(path));
76+
break;
77+
}
78+
}
79+
4580
Ok(TestResults {
81+
sqlite_path,
4682
glog_path: Self::find_glog_path(),
83+
_sqlite_tracing: sqlite_tracing,
4784
})
4885
}
4986

@@ -141,6 +178,107 @@ impl CorrectnessTestHarness {
141178
);
142179
Ok(())
143180
}
181+
182+
fn compare_sqlite_databases(&self, old_db: &PathBuf, unified_db: &PathBuf) -> Result<()> {
183+
println!("\n[Comparing SQLite Databases]");
184+
println!(" Old: {}", old_db.display());
185+
println!(" Unified: {}", unified_db.display());
186+
187+
let old_conn = rusqlite::Connection::open(old_db)?;
188+
189+
old_conn.execute(&format!("ATTACH '{}' AS unified", unified_db.display()), [])?;
190+
191+
let tables = vec!["log_events", "messages", "actor_lifecycle"];
192+
193+
for table in tables {
194+
println!("\n Comparing table: {}", table);
195+
196+
let old_count: i64 =
197+
old_conn.query_row(&format!("SELECT COUNT(*) FROM main.{}", table), [], |row| {
198+
row.get(0)
199+
})?;
200+
let unified_count: i64 = old_conn.query_row(
201+
&format!("SELECT COUNT(*) FROM unified.{}", table),
202+
[],
203+
|row| row.get(0),
204+
)?;
205+
206+
println!(" Old rows: {}", old_count);
207+
println!(" Unified rows: {}", unified_count);
208+
209+
if old_count != unified_count {
210+
return Err(anyhow::anyhow!(
211+
"Table {} row count mismatch: old={} unified={}",
212+
table,
213+
old_count,
214+
unified_count
215+
));
216+
}
217+
218+
let mut stmt = old_conn.prepare(&format!("PRAGMA table_info({})", table))?;
219+
let columns: Vec<String> = stmt
220+
.query_map([], |row| {
221+
let name: String = row.get(1)?;
222+
Ok(name)
223+
})?
224+
.collect::<Result<Vec<_>, _>>()?
225+
.into_iter()
226+
.filter(|col| col != "time_us") // Ignore time_us column
227+
.collect();
228+
229+
if columns.is_empty() {
230+
continue;
231+
}
232+
233+
let col_list = columns.join(", ");
234+
235+
let diff_query = format!(
236+
"SELECT '-' as diff_type, {cols} FROM main.{table}
237+
EXCEPT
238+
SELECT '-' as diff_type, {cols} FROM unified.{table}
239+
UNION ALL
240+
SELECT '+' as diff_type, {cols} FROM unified.{table}
241+
EXCEPT
242+
SELECT '+' as diff_type, {cols} FROM main.{table}",
243+
table = table,
244+
cols = col_list
245+
);
246+
247+
let mut stmt = old_conn.prepare(&diff_query)?;
248+
let mut rows = stmt.query([])?;
249+
250+
let mut diffs = Vec::new();
251+
while let Some(row) = rows.next()? {
252+
let diff_type: String = row.get(0)?;
253+
let color = if diff_type == "-" {
254+
"\x1b[31m" // red
255+
} else {
256+
"\x1b[32m" // green
257+
};
258+
let mut row_str = format!("{}{} ", color, diff_type);
259+
for i in 1..row.as_ref().column_count() {
260+
let col_name = row.as_ref().column_name(i)?;
261+
let val: Option<String> = row.get(i).ok();
262+
row_str.push_str(&format!("{}={:?}, ", col_name, val));
263+
}
264+
row_str.push_str("\x1b[0m"); // reset color
265+
diffs.push(row_str);
266+
}
267+
268+
if !diffs.is_empty() {
269+
return Err(anyhow::anyhow!(
270+
"Table {} has differences:\n{}",
271+
table,
272+
diffs.join("\n")
273+
));
274+
}
275+
276+
println!(" ✓ {} rows match", old_count);
277+
}
278+
279+
println!("\n ✓ All tables match!");
280+
Ok(())
281+
}
144282
}
145283

146284
// ============================================================================
@@ -251,6 +389,7 @@ fn main() -> Result<()> {
251389
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
252390
&old_log_suffix,
253391
)
392+
.env("ENABLE_SQLITE_TRACING", "1")
254393
.status()?;
255394

256395
if !old_status.success() {
@@ -269,6 +408,7 @@ fn main() -> Result<()> {
269408
hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV,
270409
&unified_log_suffix,
271410
)
411+
.env("ENABLE_SQLITE_TRACING", "1")
272412
.status()?;
273413

274414
if !unified_status.success() {
@@ -308,13 +448,41 @@ fn main() -> Result<()> {
308448
}
309449
}
310450

451+
// Compare SQLite databases
452+
let old_db = PathBuf::from(format!("/tmp/{}/test_{}_old.db", username, test_name));
453+
let unified_db = PathBuf::from(format!("/tmp/{}/test_{}_unified.db", username, test_name));
454+
455+
// SQLite databases are now required - both implementations should create them
456+
if !old_db.exists() {
457+
println!("\n✗ OLD database not found: {}", old_db.display());
458+
all_passed = false;
459+
test_passed = false;
460+
} else if !unified_db.exists() {
461+
println!("\n✗ UNIFIED database not found: {}", unified_db.display());
462+
all_passed = false;
463+
test_passed = false;
464+
} else {
465+
match harness.compare_sqlite_databases(&old_db, &unified_db) {
466+
Ok(()) => {
467+
println!("\n✓ SQLite databases match");
468+
}
469+
Err(e) => {
470+
println!("\n✗ SQLite comparison FAILED: {}", e);
471+
all_passed = false;
472+
test_passed = false;
473+
}
474+
}
475+
}
476+
311477
if test_passed {
312478
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
313479
} else {
314480
println!("\n✗ Test FAILED: {}", test_name_to_display(test_name));
315481
}
316482

317483
// Clean up test files
484+
let _ = std::fs::remove_file(&old_db);
485+
let _ = std::fs::remove_file(&unified_db);
318486
let _ = std::fs::remove_file(&old_log);
319487
let _ = std::fs::remove_file(&unified_log);
320488
}
@@ -376,7 +544,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
376544
let results = match impl_type {
377545
"--old" => {
378546
println!("Running with OLD implementation...");
379-
harness.run(workload)?
547+
harness.run(workload, false)?
380548
}
381549
"--unified" => {
382550
println!("Running with UNIFIED implementation...");
@@ -385,7 +553,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
385553
unsafe {
386554
std::env::set_var("USE_UNIFIED_LAYER", "1");
387555
}
388-
harness.run(workload)?
556+
harness.run(workload, true)?
389557
}
390558
_ => {
391559
return Err(anyhow::anyhow!(
@@ -402,6 +570,41 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
402570
println!("Glog file copied to: {}", target_path);
403571
}
404572

573+
if let Some(db_path) = results.sqlite_path {
574+
let target_path = format!("/tmp/{}/test_{}_{}.db", username, test_name, impl_suffix);
575+
576+
println!(
577+
"Attempting to copy database from {} to {}",
578+
db_path.display(),
579+
target_path
580+
);
581+
std::fs::copy(&db_path, &target_path).map_err(|e| {
582+
anyhow::anyhow!(
583+
"Failed to copy database from {} to {}: {}",
584+
db_path.display(),
585+
target_path,
586+
e
587+
)
588+
})?;
589+
590+
// Also copy WAL files if they exist (SQLite WAL mode)
591+
let wal_path = format!("{}-wal", db_path.display());
592+
let shm_path = format!("{}-shm", db_path.display());
593+
let target_wal = format!("{}-wal", target_path);
594+
let target_shm = format!("{}-shm", target_path);
595+
596+
if std::path::Path::new(&wal_path).exists() {
597+
let _ = std::fs::copy(&wal_path, &target_wal);
598+
}
599+
if std::path::Path::new(&shm_path).exists() {
600+
let _ = std::fs::copy(&shm_path, &target_shm);
601+
}
602+
603+
println!("Database copied to: {}", target_path);
604+
} else {
605+
println!("Warning: No SQLite database path found");
606+
}
607+
405608
Ok(())
406609
}
407610

0 commit comments

Comments
 (0)