Skip to content

Commit 0657ba6

Browse files
thomasywang and meta-codesync[bot]
authored and committed
Create Scuba Exporter (#1931)
Summary: Pull Request resolved: #1931 Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this is to create a `UnifiedLayer` that just sends everything to a background worker, which then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both produce the same results. ``` In this diff we will create an `Exporter` meant to replace `otel::tracing_layer()` (which is really just an alias for scuba). We log to two different scuba tables: monarch_tracing and monarch_executions. We will test correctness by injecting a mock scuba client that simply appends all samples it intends to log, and ensure that both the old and the unified implementations produce the same samples. Reviewed By: mariusae Differential Revision: D87363772
1 parent b89d0ef commit 0657ba6

File tree

4 files changed

+298
-17
lines changed

4 files changed

+298
-17
lines changed

hyperactor_telemetry/benches/correctness_test.rs

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212
//! verifies that the outputs are equivalent across all exporters:
1313
//! - Glog: Read log files and compare lines
1414
//! - SQLite: Query database and compare rows
15+
//! - Scuba: Mock client and compare logged samples
1516
//!
1617
//! Usage:
1718
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
1819
1920
#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock`
2021

22+
use std::collections::BTreeMap;
23+
use std::collections::HashSet;
2124
use std::path::PathBuf;
2225

2326
use anyhow::Result;
@@ -28,6 +31,7 @@ struct TestResults {
2831
sqlite_path: Option<PathBuf>,
2932
#[allow(dead_code)]
3033
_sqlite_tracing: Option<hyperactor_telemetry::sqlite::SqliteTracing>,
34+
scuba_tracing_samples: Vec<TelemetrySample>,
3135
}
3236

3337
struct CorrectnessTestHarness {}
@@ -37,7 +41,7 @@ impl CorrectnessTestHarness {
3741
where
3842
F: Fn(),
3943
{
40-
initialize_logging_with_log_prefix(
44+
let test_handle = initialize_logging_with_log_prefix_mock_scuba(
4145
DefaultTelemetryClock {},
4246
Some("TEST_LOG_PREFIX".to_string()),
4347
);
@@ -77,12 +81,117 @@ impl CorrectnessTestHarness {
7781
}
7882
}
7983

84+
let scuba_tracing_samples = test_handle.get_tracing_samples();
85+
8086
Ok(TestResults {
8187
sqlite_path,
8288
glog_path: Self::find_glog_path(),
89+
scuba_tracing_samples,
8390
_sqlite_tracing: sqlite_tracing,
8491
})
8592
}
93+
fn compare_scuba_samples(
94+
&self,
95+
old_samples: &[hyperactor_telemetry::TelemetrySample],
96+
unified_samples: &[hyperactor_telemetry::TelemetrySample],
97+
table_name: &str,
98+
) -> Result<()> {
99+
println!("\n[Comparing {} Scuba Samples]", table_name);
100+
println!(" Old samples: {}", old_samples.len());
101+
println!(" Unified samples: {}", unified_samples.len());
102+
103+
if old_samples.is_empty() && unified_samples.is_empty() {
104+
println!(" SKIP: No samples in either implementation");
105+
return Ok(());
106+
}
107+
108+
if !old_samples.is_empty() {
109+
let mut by_type: BTreeMap<String, usize> = BTreeMap::new();
110+
for sample in old_samples {
111+
if let Some(event_type) = sample.get_string("event_type") {
112+
*by_type.entry(event_type.to_string()).or_insert(0) += 1;
113+
}
114+
}
115+
println!(" Old samples by event_type:");
116+
for (event_type, count) in by_type {
117+
println!(" {}: {}", event_type, count);
118+
}
119+
}
120+
121+
if old_samples.len() != unified_samples.len() {
122+
return Err(anyhow::anyhow!(
123+
"Sample count mismatch: old={} unified={}",
124+
old_samples.len(),
125+
unified_samples.len()
126+
));
127+
}
128+
129+
for (i, (old, unified)) in old_samples.iter().zip(unified_samples.iter()).enumerate() {
130+
let old_json_str = serde_json::to_string(old)?;
131+
let old_parsed: serde_json::Value = serde_json::from_str(&old_json_str)?;
132+
133+
let unified_json_str = serde_json::to_string(unified)?;
134+
let unified_parsed: serde_json::Value = serde_json::from_str(&unified_json_str)?;
135+
136+
let all_fields: HashSet<String> = old_parsed["fields"]
137+
.as_array()
138+
.into_iter()
139+
.flatten()
140+
.chain(unified_parsed["fields"].as_array().into_iter().flatten())
141+
.filter_map(|field| field.as_array()?.first()?.as_str().map(|s| s.to_string()))
142+
.collect();
143+
144+
for field_name in all_fields {
145+
let old_str = old.get_string(&field_name);
146+
let unified_str = unified.get_string(&field_name);
147+
148+
match field_name.as_str() {
149+
"args" => {
150+
// Allow --old vs --unified difference only
151+
match (old_str, unified_str) {
152+
(Some(old_args), Some(unified_args)) => {
153+
let old_normalized = old_args.replace("--old", "--IMPL");
154+
let unified_normalized =
155+
unified_args.replace("--unified", "--IMPL");
156+
157+
if old_normalized != unified_normalized {
158+
return Err(anyhow::anyhow!(
159+
"Sample #{} field 'args' differs in more than just --old/--unified:\n old: {:?}\n unified: {:?}",
160+
i,
161+
old_args,
162+
unified_args
163+
));
164+
}
165+
}
166+
(None, None) => {}
167+
_ => {
168+
return Err(anyhow::anyhow!(
169+
"Sample #{} field 'args' present in only one sample:\n old: {:?}\n unified: {:?}",
170+
i,
171+
old_str,
172+
unified_str
173+
));
174+
}
175+
}
176+
}
177+
_ => {
178+
if old_str != unified_str {
179+
return Err(anyhow::anyhow!(
180+
"Sample #{} field '{}' mismatch:\n old: {:?}\n unified: {:?}",
181+
i,
182+
field_name,
183+
old_str,
184+
unified_str
185+
));
186+
}
187+
}
188+
}
189+
}
190+
}
191+
192+
println!(" ✓ All {} samples match!", old_samples.len());
193+
Ok(())
194+
}
86195

87196
fn find_glog_path() -> Option<PathBuf> {
88197
let username = whoami::username();
@@ -476,6 +585,48 @@ fn main() -> Result<()> {
476585
}
477586
}
478587

588+
let old_tracing = PathBuf::from(format!(
589+
"/tmp/{}/test_{}_old_scuba_tracing.json",
590+
username, test_name
591+
));
592+
let unified_tracing = PathBuf::from(format!(
593+
"/tmp/{}/test_{}_unified_scuba_tracing.json",
594+
username, test_name
595+
));
596+
597+
if !old_tracing.exists() || !unified_tracing.exists() {
598+
println!("\n⚠ Scuba tracing sample files not found, skipping comparison");
599+
if !old_tracing.exists() {
600+
println!(" Missing: {}", old_tracing.display());
601+
}
602+
if !unified_tracing.exists() {
603+
println!(" Missing: {}", unified_tracing.display());
604+
}
605+
all_passed = false;
606+
test_passed = false;
607+
} else {
608+
let old_samples_json = std::fs::read_to_string(&old_tracing)?;
609+
let unified_samples_json = std::fs::read_to_string(&unified_tracing)?;
610+
611+
let old_samples: Vec<TelemetrySample> = serde_json::from_str(&old_samples_json)?;
612+
let unified_samples: Vec<TelemetrySample> =
613+
serde_json::from_str(&unified_samples_json)?;
614+
615+
match harness.compare_scuba_samples(&old_samples, &unified_samples, "Tracing") {
616+
Ok(()) => {
617+
println!("\n✓ Scuba tracing samples match");
618+
}
619+
Err(e) => {
620+
println!("\n✗ Scuba tracing comparison FAILED: {}", e);
621+
all_passed = false;
622+
test_passed = false;
623+
}
624+
}
625+
626+
let _ = std::fs::remove_file(&old_tracing);
627+
let _ = std::fs::remove_file(&unified_tracing);
628+
}
629+
479630
if test_passed {
480631
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
481632
} else {
@@ -607,6 +758,14 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
607758
println!("Warning: No SQLite database path found");
608759
}
609760

761+
let tracing_path = format!(
762+
"/tmp/{}/test_{}_{}_scuba_tracing.json",
763+
username, test_name, impl_suffix
764+
);
765+
let tracing_json = serde_json::to_string_pretty(&results.scuba_tracing_samples)?;
766+
std::fs::write(&tracing_path, tracing_json)?;
767+
println!("Scuba tracing samples saved to: {}", tracing_path);
768+
610769
Ok(())
611770
}
612771

hyperactor_telemetry/benches/telemetry_benchmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ fn benchmark_no_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Durat
141141
}
142142

143143
fn benchmark_with_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Duration)> {
144-
initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None);
144+
initialize_logging_with_log_prefix_mock_scuba(DefaultTelemetryClock {}, None);
145145

146146
let _sqlite_tracing =
147147
hyperactor_telemetry::sqlite::SqliteTracing::new().expect("Failed to create SqliteTracing");

0 commit comments

Comments
 (0)