Skip to content

Commit 56b9674

Browse files
thomasywang authored and facebook-github-bot committed
Create Scuba Exporter (#1931)
Summary: Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this is to create a `UnifiedLayer` that just sends everything into a background worker, which then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results. ``` In this diff, we will create an `Exporter` meant to replace `otel::tracing_layer()` (which is really just an alias for scuba). We log to two different scuba tables: monarch_tracing and monarch_executions. We will test correctness by injecting a mock scuba client that simply appends all samples it intends to log, and ensure that both the old and the unified implementations produce the same samples. Reviewed By: mariusae Differential Revision: D87363772
1 parent 0f37fe1 commit 56b9674

File tree

4 files changed

+284
-17
lines changed

4 files changed

+284
-17
lines changed

hyperactor_telemetry/benches/correctness_test.rs

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
//! verifies that the outputs are equivalent across all exporters:
1313
//! - Glog: Read log files and compare lines
1414
//! - SQLite: Query database and compare rows
15+
//! - Scuba: Mock client and compare logged samples
1516
//!
1617
//! Usage:
1718
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
1819
1920
#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock`
2021

22+
use std::collections::HashSet;
2123
use std::path::PathBuf;
2224

2325
use anyhow::Result;
@@ -28,6 +30,7 @@ struct TestResults {
2830
sqlite_path: Option<PathBuf>,
2931
#[allow(dead_code)]
3032
_sqlite_tracing: Option<hyperactor_telemetry::sqlite::SqliteTracing>,
33+
scuba_tracing_samples: Vec<TelemetrySample>,
3134
}
3235

3336
struct CorrectnessTestHarness {}
@@ -37,7 +40,7 @@ impl CorrectnessTestHarness {
3740
where
3841
F: Fn(),
3942
{
40-
initialize_logging_with_log_prefix(
43+
let test_handle = initialize_logging_with_log_prefix_mock_scuba(
4144
DefaultTelemetryClock {},
4245
Some("TEST_LOG_PREFIX".to_string()),
4346
);
@@ -77,12 +80,103 @@ impl CorrectnessTestHarness {
7780
}
7881
}
7982

83+
let scuba_tracing_samples = test_handle.get_tracing_samples();
84+
8085
Ok(TestResults {
8186
sqlite_path,
8287
glog_path: Self::find_glog_path(),
88+
scuba_tracing_samples,
8389
_sqlite_tracing: sqlite_tracing,
8490
})
8591
}
92+
fn compare_scuba_samples(
93+
&self,
94+
old_samples: &[hyperactor_telemetry::TelemetrySample],
95+
unified_samples: &[hyperactor_telemetry::TelemetrySample],
96+
table_name: &str,
97+
) -> Result<()> {
98+
println!("\n[Comparing {} Scuba Samples]", table_name);
99+
println!(" Old samples: {}", old_samples.len());
100+
println!(" Unified samples: {}", unified_samples.len());
101+
102+
if old_samples.is_empty() && unified_samples.is_empty() {
103+
return Err(anyhow::anyhow!("No samples in either implementation"));
104+
}
105+
106+
if old_samples.len() != unified_samples.len() {
107+
return Err(anyhow::anyhow!(
108+
"Sample count mismatch: old={} unified={}",
109+
old_samples.len(),
110+
unified_samples.len()
111+
));
112+
}
113+
114+
for (i, (old, unified)) in old_samples.iter().zip(unified_samples.iter()).enumerate() {
115+
let old_json_str = serde_json::to_string(old)?;
116+
let old_parsed: serde_json::Value = serde_json::from_str(&old_json_str)?;
117+
118+
let unified_json_str = serde_json::to_string(unified)?;
119+
let unified_parsed: serde_json::Value = serde_json::from_str(&unified_json_str)?;
120+
121+
let all_fields: HashSet<String> = old_parsed["fields"]
122+
.as_array()
123+
.into_iter()
124+
.flatten()
125+
.chain(unified_parsed["fields"].as_array().into_iter().flatten())
126+
.filter_map(|field| field.as_array()?.first()?.as_str().map(|s| s.to_string()))
127+
.collect();
128+
129+
for field_name in all_fields {
130+
let old_str = old.get_string(&field_name);
131+
let unified_str = unified.get_string(&field_name);
132+
133+
match field_name.as_str() {
134+
"args" => {
135+
// Allow --old vs --unified difference only
136+
match (old_str, unified_str) {
137+
(Some(old_args), Some(unified_args)) => {
138+
let old_normalized = old_args.replace("--old", "--IMPL");
139+
let unified_normalized =
140+
unified_args.replace("--unified", "--IMPL");
141+
142+
if old_normalized != unified_normalized {
143+
return Err(anyhow::anyhow!(
144+
"Sample #{} field 'args' differs in more than just --old/--unified:\n old: {:?}\n unified: {:?}",
145+
i,
146+
old_args,
147+
unified_args
148+
));
149+
}
150+
}
151+
(None, None) => {}
152+
_ => {
153+
return Err(anyhow::anyhow!(
154+
"Sample #{} field 'args' present in only one sample:\n old: {:?}\n unified: {:?}",
155+
i,
156+
old_str,
157+
unified_str
158+
));
159+
}
160+
}
161+
}
162+
_ => {
163+
if old_str != unified_str {
164+
return Err(anyhow::anyhow!(
165+
"Sample #{} field '{}' mismatch:\n old: {:?}\n unified: {:?}",
166+
i,
167+
field_name,
168+
old_str,
169+
unified_str
170+
));
171+
}
172+
}
173+
}
174+
}
175+
}
176+
177+
println!(" ✓ All {} samples match!", old_samples.len());
178+
Ok(())
179+
}
86180

87181
fn find_glog_path() -> Option<PathBuf> {
88182
let username = whoami::username();
@@ -474,6 +568,48 @@ fn main() -> Result<()> {
474568
}
475569
}
476570

571+
let old_tracing = PathBuf::from(format!(
572+
"/tmp/{}/test_{}_old_scuba_tracing.json",
573+
username, test_name
574+
));
575+
let unified_tracing = PathBuf::from(format!(
576+
"/tmp/{}/test_{}_unified_scuba_tracing.json",
577+
username, test_name
578+
));
579+
580+
if !old_tracing.exists() || !unified_tracing.exists() {
581+
println!("\n⚠ Scuba tracing sample files not found, skipping comparison");
582+
if !old_tracing.exists() {
583+
println!(" Missing: {}", old_tracing.display());
584+
}
585+
if !unified_tracing.exists() {
586+
println!(" Missing: {}", unified_tracing.display());
587+
}
588+
all_passed = false;
589+
test_passed = false;
590+
} else {
591+
let old_samples_json = std::fs::read_to_string(&old_tracing)?;
592+
let unified_samples_json = std::fs::read_to_string(&unified_tracing)?;
593+
594+
let old_samples: Vec<TelemetrySample> = serde_json::from_str(&old_samples_json)?;
595+
let unified_samples: Vec<TelemetrySample> =
596+
serde_json::from_str(&unified_samples_json)?;
597+
598+
match harness.compare_scuba_samples(&old_samples, &unified_samples, "Tracing") {
599+
Ok(()) => {
600+
println!("\n✓ Scuba tracing samples match");
601+
}
602+
Err(e) => {
603+
println!("\n✗ Scuba tracing comparison FAILED: {}", e);
604+
all_passed = false;
605+
test_passed = false;
606+
}
607+
}
608+
609+
let _ = std::fs::remove_file(&old_tracing);
610+
let _ = std::fs::remove_file(&unified_tracing);
611+
}
612+
477613
if test_passed {
478614
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
479615
} else {
@@ -605,6 +741,14 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
605741
println!("Warning: No SQLite database path found");
606742
}
607743

744+
let tracing_path = format!(
745+
"/tmp/{}/test_{}_{}_scuba_tracing.json",
746+
username, test_name, impl_suffix
747+
);
748+
let tracing_json = serde_json::to_string_pretty(&results.scuba_tracing_samples)?;
749+
std::fs::write(&tracing_path, tracing_json)?;
750+
println!("Scuba tracing samples saved to: {}", tracing_path);
751+
608752
Ok(())
609753
}
610754

hyperactor_telemetry/benches/telemetry_benchmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ fn benchmark_no_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Durat
141141
}
142142

143143
fn benchmark_with_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Duration)> {
144-
initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None);
144+
initialize_logging_with_log_prefix_mock_scuba(DefaultTelemetryClock {}, None);
145145

146146
let _sqlite_tracing =
147147
hyperactor_telemetry::sqlite::SqliteTracing::new().expect("Failed to create SqliteTracing");

0 commit comments

Comments
 (0)