1212//! verifies that the outputs are equivalent across all exporters:
1313//! - Glog: Read log files and compare lines
1414//! - SQLite: Query database and compare rows
15+ //! - Scuba: Mock client and compare logged samples
1516//!
1617//! Usage:
1718//! buck2 run //monarch/hyperactor_telemetry:correctness_test
@@ -26,6 +27,8 @@ struct TestResults {
2627 sqlite_path : Option < PathBuf > ,
2728 #[ allow( dead_code) ]
2829 _sqlite_tracing : Option < hyperactor_telemetry:: sqlite:: SqliteTracing > ,
30+ scuba_tracing_samples : Vec < TelemetrySample > ,
31+ scuba_executions_samples : Vec < TelemetrySample > ,
2932}
3033
3134/// Record from log_events table (timestamps excluded for comparison)
@@ -66,7 +69,7 @@ impl CorrectnessTestHarness {
6669 where
6770 F : Fn ( ) ,
6871 {
69- initialize_logging_with_log_prefix (
72+ let test_handle = initialize_logging_with_log_prefix_mock_scuba (
7073 DefaultTelemetryClock { } ,
7174 Some ( "TEST_LOG_PREFIX" . to_string ( ) ) ,
7275 ) ;
@@ -106,12 +109,112 @@ impl CorrectnessTestHarness {
106109 }
107110 }
108111
112+ let scuba_tracing_samples = test_handle. get_tracing_samples ( ) ;
113+ let scuba_executions_samples = test_handle. get_execution_samples ( ) ;
114+
109115 Ok ( TestResults {
110116 sqlite_path,
111117 glog_path : Self :: find_glog_path ( ) ,
118+ scuba_tracing_samples,
119+ scuba_executions_samples,
112120 _sqlite_tracing : sqlite_tracing,
113121 } )
114122 }
123+ fn compare_scuba_samples (
124+ & self ,
125+ old_samples : & [ hyperactor_telemetry:: TelemetrySample ] ,
126+ unified_samples : & [ hyperactor_telemetry:: TelemetrySample ] ,
127+ table_name : & str ,
128+ ) -> Result < ( ) > {
129+ use std:: collections:: BTreeMap ;
130+
131+ println ! ( "\n [Comparing {} Scuba Samples]" , table_name) ;
132+ println ! ( " Old samples: {}" , old_samples. len( ) ) ;
133+ println ! ( " Unified samples: {}" , unified_samples. len( ) ) ;
134+
135+ if old_samples. is_empty ( ) && unified_samples. is_empty ( ) {
136+ println ! ( " SKIP: No samples in either implementation" ) ;
137+ return Ok ( ( ) ) ;
138+ }
139+
140+ if !old_samples. is_empty ( ) {
141+ let mut by_type: BTreeMap < String , usize > = BTreeMap :: new ( ) ;
142+ for sample in old_samples {
143+ if let Some ( event_type) = sample. get_string ( "event_type" ) {
144+ * by_type. entry ( event_type. to_string ( ) ) . or_insert ( 0 ) += 1 ;
145+ }
146+ }
147+ println ! ( " Old samples by event_type:" ) ;
148+ for ( event_type, count) in by_type {
149+ println ! ( " {}: {}" , event_type, count) ;
150+ }
151+ }
152+
153+ if old_samples. len ( ) != unified_samples. len ( ) {
154+ return Err ( anyhow:: anyhow!(
155+ "Sample count mismatch: old={} unified={}" ,
156+ old_samples. len( ) ,
157+ unified_samples. len( )
158+ ) ) ;
159+ }
160+
161+ for ( i, ( old, unified) ) in old_samples. iter ( ) . zip ( unified_samples. iter ( ) ) . enumerate ( ) {
162+ let old_event_type = old. get_string ( "event_type" ) ;
163+ let unified_event_type = unified. get_string ( "event_type" ) ;
164+ if old_event_type != unified_event_type {
165+ return Err ( anyhow:: anyhow!(
166+ "Sample #{} event_type mismatch: old={:?} unified={:?}" ,
167+ i,
168+ old_event_type,
169+ unified_event_type
170+ ) ) ;
171+ }
172+
173+ let old_name = old. get_string ( "name" ) ;
174+ let unified_name = unified. get_string ( "name" ) ;
175+
176+ let skip_name_comparison = old_name
177+ . map ( |s| s. starts_with ( "event fbcode/" ) )
178+ . unwrap_or ( false )
179+ && unified_name
180+ . map ( |s| s. starts_with ( "event fbcode/" ) )
181+ . unwrap_or ( false ) ;
182+
183+ if !skip_name_comparison && old_name != unified_name {
184+ return Err ( anyhow:: anyhow!(
185+ "Sample #{} name mismatch: old={:?} unified={:?}" ,
186+ i,
187+ old_name,
188+ unified_name
189+ ) ) ;
190+ }
191+
192+ let old_level = old. get_string ( "level" ) ;
193+ let unified_level = unified. get_string ( "level" ) ;
194+ if old_level != unified_level {
195+ return Err ( anyhow:: anyhow!(
196+ "Sample #{} level mismatch: old={:?} unified={:?}" ,
197+ i,
198+ old_level,
199+ unified_level
200+ ) ) ;
201+ }
202+
203+ let old_target = old. get_string ( "target" ) ;
204+ let unified_target = unified. get_string ( "target" ) ;
205+ if old_target != unified_target {
206+ return Err ( anyhow:: anyhow!(
207+ "Sample #{} target mismatch: old={:?} unified={:?}" ,
208+ i,
209+ old_target,
210+ unified_target
211+ ) ) ;
212+ }
213+ }
214+
215+ println ! ( " ✓ All {} samples match!" , old_samples. len( ) ) ;
216+ Ok ( ( ) )
217+ }
115218
116219 fn find_glog_path ( ) -> Option < PathBuf > {
117220 let username = whoami:: username ( ) ;
@@ -623,6 +726,90 @@ fn main() -> Result<()> {
623726 }
624727 }
625728
729+ let old_tracing = PathBuf :: from ( format ! (
730+ "/tmp/{}/test_{}_old_scuba_tracing.json" ,
731+ username, test_name
732+ ) ) ;
733+ let unified_tracing = PathBuf :: from ( format ! (
734+ "/tmp/{}/test_{}_unified_scuba_tracing.json" ,
735+ username, test_name
736+ ) ) ;
737+
738+ if !old_tracing. exists ( ) || !unified_tracing. exists ( ) {
739+ println ! ( "\n ⚠ Scuba tracing sample files not found, skipping comparison" ) ;
740+ if !old_tracing. exists ( ) {
741+ println ! ( " Missing: {}" , old_tracing. display( ) ) ;
742+ }
743+ if !unified_tracing. exists ( ) {
744+ println ! ( " Missing: {}" , unified_tracing. display( ) ) ;
745+ }
746+ all_passed = false ;
747+ test_passed = false ;
748+ } else {
749+ let old_samples_json = std:: fs:: read_to_string ( & old_tracing) ?;
750+ let unified_samples_json = std:: fs:: read_to_string ( & unified_tracing) ?;
751+
752+ let old_samples: Vec < TelemetrySample > = serde_json:: from_str ( & old_samples_json) ?;
753+ let unified_samples: Vec < TelemetrySample > =
754+ serde_json:: from_str ( & unified_samples_json) ?;
755+
756+ match harness. compare_scuba_samples ( & old_samples, & unified_samples, "Tracing" ) {
757+ Ok ( ( ) ) => {
758+ println ! ( "\n ✓ Scuba tracing samples match" ) ;
759+ }
760+ Err ( e) => {
761+ println ! ( "\n ✗ Scuba tracing comparison FAILED: {}" , e) ;
762+ all_passed = false ;
763+ test_passed = false ;
764+ }
765+ }
766+
767+ let _ = std:: fs:: remove_file ( & old_tracing) ;
768+ let _ = std:: fs:: remove_file ( & unified_tracing) ;
769+ }
770+
771+ let old_executions = PathBuf :: from ( format ! (
772+ "/tmp/{}/test_{}_old_scuba_executions.json" ,
773+ username, test_name
774+ ) ) ;
775+ let unified_executions = PathBuf :: from ( format ! (
776+ "/tmp/{}/test_{}_unified_scuba_executions.json" ,
777+ username, test_name
778+ ) ) ;
779+
780+ if !old_executions. exists ( ) || !unified_executions. exists ( ) {
781+ println ! ( "\n ⚠ Scuba executions sample files not found, skipping comparison" ) ;
782+ if !old_executions. exists ( ) {
783+ println ! ( " Missing: {}" , old_executions. display( ) ) ;
784+ }
785+ if !unified_executions. exists ( ) {
786+ println ! ( " Missing: {}" , unified_executions. display( ) ) ;
787+ }
788+ all_passed = false ;
789+ test_passed = false ;
790+ } else {
791+ let old_samples_json = std:: fs:: read_to_string ( & old_executions) ?;
792+ let unified_samples_json = std:: fs:: read_to_string ( & unified_executions) ?;
793+
794+ let old_samples: Vec < TelemetrySample > = serde_json:: from_str ( & old_samples_json) ?;
795+ let unified_samples: Vec < TelemetrySample > =
796+ serde_json:: from_str ( & unified_samples_json) ?;
797+
798+ match harness. compare_scuba_samples ( & old_samples, & unified_samples, "Executions" ) {
799+ Ok ( ( ) ) => {
800+ println ! ( "\n ✓ Scuba executions samples match" ) ;
801+ }
802+ Err ( e) => {
803+ println ! ( "\n ✗ Scuba executions comparison FAILED: {}" , e) ;
804+ all_passed = false ;
805+ test_passed = false ;
806+ }
807+ }
808+
809+ let _ = std:: fs:: remove_file ( & old_executions) ;
810+ let _ = std:: fs:: remove_file ( & unified_executions) ;
811+ }
812+
626813 if test_passed {
627814 println ! ( "\n ✓ Test PASSED: {}" , test_name_to_display( test_name) ) ;
628815 } else {
@@ -754,6 +941,22 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
754941 println ! ( "Warning: No SQLite database path found" ) ;
755942 }
756943
944+ let tracing_path = format ! (
945+ "/tmp/{}/test_{}_{}_scuba_tracing.json" ,
946+ username, test_name, impl_suffix
947+ ) ;
948+ let tracing_json = serde_json:: to_string_pretty ( & results. scuba_tracing_samples ) ?;
949+ std:: fs:: write ( & tracing_path, tracing_json) ?;
950+ println ! ( "Scuba tracing samples saved to: {}" , tracing_path) ;
951+
952+ let executions_path = format ! (
953+ "/tmp/{}/test_{}_{}_scuba_executions.json" ,
954+ username, test_name, impl_suffix
955+ ) ;
956+ let executions_json = serde_json:: to_string_pretty ( & results. scuba_executions_samples ) ?;
957+ std:: fs:: write ( & executions_path, executions_json) ?;
958+ println ! ( "Scuba executions samples saved to: {}" , executions_path) ;
959+
757960 Ok ( ( ) )
758961}
759962
0 commit comments