Skip to content

Commit 414b7fc

Browse files
thomasywang facebook-github-bot
authored and committed
Create Scuba Exporter (meta-pytorch#1931)
Summary: Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this is to create a `UnifiedLayer` that just sends everything into a background worker, which then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results. ``` In this diff we will create an `Exporter` meant to replace `otel::tracing_layer()` (which is really just an alias for scuba). We log to two different scuba tables: monarch_tracing and monarch_executions. We will test correctness by injecting a mock scuba client that simply appends all samples it intends to log, and ensure that both the old and the unified implementations produce the same samples. Differential Revision: D87363772
1 parent f169068 commit 414b7fc

File tree

3 files changed

+399
-28
lines changed

3 files changed

+399
-28
lines changed

hyperactor_telemetry/benches/correctness_test.rs

Lines changed: 204 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//! verifies that the outputs are equivalent across all exporters:
1313
//! - Glog: Read log files and compare lines
1414
//! - SQLite: Query database and compare rows
15+
//! - Scuba: Mock client and compare logged samples
1516
//!
1617
//! Usage:
1718
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
@@ -26,6 +27,8 @@ struct TestResults {
2627
sqlite_path: Option<PathBuf>,
2728
#[allow(dead_code)]
2829
_sqlite_tracing: Option<hyperactor_telemetry::sqlite::SqliteTracing>,
30+
scuba_tracing_samples: Vec<TelemetrySample>,
31+
scuba_executions_samples: Vec<TelemetrySample>,
2932
}
3033

3134
/// Record from log_events table (timestamps excluded for comparison)
@@ -66,7 +69,7 @@ impl CorrectnessTestHarness {
6669
where
6770
F: Fn(),
6871
{
69-
initialize_logging_with_log_prefix(
72+
let test_handle = initialize_logging_with_log_prefix_mock_scuba(
7073
DefaultTelemetryClock {},
7174
Some("TEST_LOG_PREFIX".to_string()),
7275
);
@@ -106,12 +109,112 @@ impl CorrectnessTestHarness {
106109
}
107110
}
108111

112+
let scuba_tracing_samples = test_handle.get_tracing_samples();
113+
let scuba_executions_samples = test_handle.get_execution_samples();
114+
109115
Ok(TestResults {
110116
sqlite_path,
111117
glog_path: Self::find_glog_path(),
118+
scuba_tracing_samples,
119+
scuba_executions_samples,
112120
_sqlite_tracing: sqlite_tracing,
113121
})
114122
}
123+
fn compare_scuba_samples(
124+
&self,
125+
old_samples: &[hyperactor_telemetry::TelemetrySample],
126+
unified_samples: &[hyperactor_telemetry::TelemetrySample],
127+
table_name: &str,
128+
) -> Result<()> {
129+
use std::collections::BTreeMap;
130+
131+
println!("\n[Comparing {} Scuba Samples]", table_name);
132+
println!(" Old samples: {}", old_samples.len());
133+
println!(" Unified samples: {}", unified_samples.len());
134+
135+
if old_samples.is_empty() && unified_samples.is_empty() {
136+
println!(" SKIP: No samples in either implementation");
137+
return Ok(());
138+
}
139+
140+
if !old_samples.is_empty() {
141+
let mut by_type: BTreeMap<String, usize> = BTreeMap::new();
142+
for sample in old_samples {
143+
if let Some(event_type) = sample.get_string("event_type") {
144+
*by_type.entry(event_type.to_string()).or_insert(0) += 1;
145+
}
146+
}
147+
println!(" Old samples by event_type:");
148+
for (event_type, count) in by_type {
149+
println!(" {}: {}", event_type, count);
150+
}
151+
}
152+
153+
if old_samples.len() != unified_samples.len() {
154+
return Err(anyhow::anyhow!(
155+
"Sample count mismatch: old={} unified={}",
156+
old_samples.len(),
157+
unified_samples.len()
158+
));
159+
}
160+
161+
for (i, (old, unified)) in old_samples.iter().zip(unified_samples.iter()).enumerate() {
162+
let old_event_type = old.get_string("event_type");
163+
let unified_event_type = unified.get_string("event_type");
164+
if old_event_type != unified_event_type {
165+
return Err(anyhow::anyhow!(
166+
"Sample #{} event_type mismatch: old={:?} unified={:?}",
167+
i,
168+
old_event_type,
169+
unified_event_type
170+
));
171+
}
172+
173+
let old_name = old.get_string("name");
174+
let unified_name = unified.get_string("name");
175+
176+
let skip_name_comparison = old_name
177+
.map(|s| s.starts_with("event fbcode/"))
178+
.unwrap_or(false)
179+
&& unified_name
180+
.map(|s| s.starts_with("event fbcode/"))
181+
.unwrap_or(false);
182+
183+
if !skip_name_comparison && old_name != unified_name {
184+
return Err(anyhow::anyhow!(
185+
"Sample #{} name mismatch: old={:?} unified={:?}",
186+
i,
187+
old_name,
188+
unified_name
189+
));
190+
}
191+
192+
let old_level = old.get_string("level");
193+
let unified_level = unified.get_string("level");
194+
if old_level != unified_level {
195+
return Err(anyhow::anyhow!(
196+
"Sample #{} level mismatch: old={:?} unified={:?}",
197+
i,
198+
old_level,
199+
unified_level
200+
));
201+
}
202+
203+
let old_target = old.get_string("target");
204+
let unified_target = unified.get_string("target");
205+
if old_target != unified_target {
206+
return Err(anyhow::anyhow!(
207+
"Sample #{} target mismatch: old={:?} unified={:?}",
208+
i,
209+
old_target,
210+
unified_target
211+
));
212+
}
213+
}
214+
215+
println!(" ✓ All {} samples match!", old_samples.len());
216+
Ok(())
217+
}
115218

116219
fn find_glog_path() -> Option<PathBuf> {
117220
let username = whoami::username();
@@ -623,6 +726,90 @@ fn main() -> Result<()> {
623726
}
624727
}
625728

729+
let old_tracing = PathBuf::from(format!(
730+
"/tmp/{}/test_{}_old_scuba_tracing.json",
731+
username, test_name
732+
));
733+
let unified_tracing = PathBuf::from(format!(
734+
"/tmp/{}/test_{}_unified_scuba_tracing.json",
735+
username, test_name
736+
));
737+
738+
if !old_tracing.exists() || !unified_tracing.exists() {
739+
println!("\n⚠ Scuba tracing sample files not found, skipping comparison");
740+
if !old_tracing.exists() {
741+
println!(" Missing: {}", old_tracing.display());
742+
}
743+
if !unified_tracing.exists() {
744+
println!(" Missing: {}", unified_tracing.display());
745+
}
746+
all_passed = false;
747+
test_passed = false;
748+
} else {
749+
let old_samples_json = std::fs::read_to_string(&old_tracing)?;
750+
let unified_samples_json = std::fs::read_to_string(&unified_tracing)?;
751+
752+
let old_samples: Vec<TelemetrySample> = serde_json::from_str(&old_samples_json)?;
753+
let unified_samples: Vec<TelemetrySample> =
754+
serde_json::from_str(&unified_samples_json)?;
755+
756+
match harness.compare_scuba_samples(&old_samples, &unified_samples, "Tracing") {
757+
Ok(()) => {
758+
println!("\n✓ Scuba tracing samples match");
759+
}
760+
Err(e) => {
761+
println!("\n✗ Scuba tracing comparison FAILED: {}", e);
762+
all_passed = false;
763+
test_passed = false;
764+
}
765+
}
766+
767+
let _ = std::fs::remove_file(&old_tracing);
768+
let _ = std::fs::remove_file(&unified_tracing);
769+
}
770+
771+
let old_executions = PathBuf::from(format!(
772+
"/tmp/{}/test_{}_old_scuba_executions.json",
773+
username, test_name
774+
));
775+
let unified_executions = PathBuf::from(format!(
776+
"/tmp/{}/test_{}_unified_scuba_executions.json",
777+
username, test_name
778+
));
779+
780+
if !old_executions.exists() || !unified_executions.exists() {
781+
println!("\n⚠ Scuba executions sample files not found, skipping comparison");
782+
if !old_executions.exists() {
783+
println!(" Missing: {}", old_executions.display());
784+
}
785+
if !unified_executions.exists() {
786+
println!(" Missing: {}", unified_executions.display());
787+
}
788+
all_passed = false;
789+
test_passed = false;
790+
} else {
791+
let old_samples_json = std::fs::read_to_string(&old_executions)?;
792+
let unified_samples_json = std::fs::read_to_string(&unified_executions)?;
793+
794+
let old_samples: Vec<TelemetrySample> = serde_json::from_str(&old_samples_json)?;
795+
let unified_samples: Vec<TelemetrySample> =
796+
serde_json::from_str(&unified_samples_json)?;
797+
798+
match harness.compare_scuba_samples(&old_samples, &unified_samples, "Executions") {
799+
Ok(()) => {
800+
println!("\n✓ Scuba executions samples match");
801+
}
802+
Err(e) => {
803+
println!("\n✗ Scuba executions comparison FAILED: {}", e);
804+
all_passed = false;
805+
test_passed = false;
806+
}
807+
}
808+
809+
let _ = std::fs::remove_file(&old_executions);
810+
let _ = std::fs::remove_file(&unified_executions);
811+
}
812+
626813
if test_passed {
627814
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
628815
} else {
@@ -754,6 +941,22 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
754941
println!("Warning: No SQLite database path found");
755942
}
756943

944+
let tracing_path = format!(
945+
"/tmp/{}/test_{}_{}_scuba_tracing.json",
946+
username, test_name, impl_suffix
947+
);
948+
let tracing_json = serde_json::to_string_pretty(&results.scuba_tracing_samples)?;
949+
std::fs::write(&tracing_path, tracing_json)?;
950+
println!("Scuba tracing samples saved to: {}", tracing_path);
951+
952+
let executions_path = format!(
953+
"/tmp/{}/test_{}_{}_scuba_executions.json",
954+
username, test_name, impl_suffix
955+
);
956+
let executions_json = serde_json::to_string_pretty(&results.scuba_executions_samples)?;
957+
std::fs::write(&executions_path, executions_json)?;
958+
println!("Scuba executions samples saved to: {}", executions_path);
959+
757960
Ok(())
758961
}
759962

hyperactor_telemetry/benches/telemetry_benchmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ fn benchmark_no_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Durat
139139
}
140140

141141
fn benchmark_with_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Duration)> {
142-
initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None);
142+
initialize_logging_with_log_prefix_mock_scuba(DefaultTelemetryClock {}, None);
143143

144144
let _sqlite_tracing =
145145
hyperactor_telemetry::sqlite::SqliteTracing::new().expect("Failed to create SqliteTracing");

0 commit comments

Comments
 (0)