Skip to content

Commit 9126765

Browse files
thomasywangfacebook-github-bot
authored andcommitted
Create TraceDispatcher (#1928)
Summary: Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this, is to create a `UnifiedLayer` that just sends everything into a background worker, that then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results Differential Revision: D87363773
1 parent ceb43af commit 9126765

File tree

5 files changed

+951
-20
lines changed

5 files changed

+951
-20
lines changed

hyperactor_telemetry/Cargo.toml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry
1+
# @generated by autocargo from //monarch/hyperactor_telemetry:[correctness_test,hyperactor_telemetry,telemetry_benchmark]
22

33
[package]
44
name = "hyperactor_telemetry"
@@ -10,13 +10,23 @@ license = "BSD-3-Clause"
1010
[lib]
1111
edition = "2024"
1212

13+
[[bench]]
14+
name = "correctness_test"
15+
edition = "2024"
16+
17+
[[bench]]
18+
name = "telemetry_benchmark"
19+
edition = "2024"
20+
1321
[dependencies]
1422
anyhow = "1.0.98"
1523
chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false }
1624
dashmap = { version = "5.5.3", features = ["rayon", "serde"] }
1725
fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
1826
hdrhistogram = "7.5"
27+
indexmap = { version = "2.9.0", features = ["arbitrary", "rayon", "serde"] }
1928
lazy_static = "1.5"
29+
libc = "0.2.139"
2030
opentelemetry = "0.29"
2131
opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] }
2232
rand = { version = "0.8", features = ["small_rng"] }
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! Correctness test harness comparing old vs unified telemetry implementations.
10+
//!
11+
//! This test harness runs identical workloads through both implementations and
12+
//! verifies that the outputs are equivalent across all exporters
13+
//!
14+
//! Usage:
15+
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
16+
17+
#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor`` just for `hyperactor::clock::Clock`
18+
19+
use anyhow::Result;
20+
use hyperactor_telemetry::*;
21+
22+
struct TestResults {}
23+
24+
struct CorrectnessTestHarness {}
25+
26+
impl CorrectnessTestHarness {
27+
fn run<F>(&self, workload: F) -> Result<TestResults>
28+
where
29+
F: Fn(),
30+
{
31+
initialize_logging_with_log_prefix(
32+
DefaultTelemetryClock {},
33+
Some("TEST_LOG_PREFIX".to_string()),
34+
);
35+
36+
workload();
37+
38+
std::thread::sleep(std::time::Duration::from_millis(300));
39+
40+
Ok(TestResults {})
41+
}
42+
}
43+
44+
// ============================================================================
45+
// Test Workloads
46+
// ============================================================================
47+
48+
fn workload_simple_info_events() {
49+
for i in 0..100 {
50+
tracing::info!(iteration = i, "simple info event");
51+
}
52+
}
53+
54+
fn workload_spans_with_fields() {
55+
for i in 0..50 {
56+
let _span = tracing::info_span!(
57+
"test_span",
58+
iteration = i,
59+
foo = 42,
60+
message_type = "Request"
61+
)
62+
.entered();
63+
}
64+
}
65+
66+
fn workload_nested_spans() {
67+
for i in 0..20 {
68+
let _outer = tracing::info_span!("outer", iteration = i).entered();
69+
{
70+
let _middle = tracing::info_span!("middle", level = 2).entered();
71+
{
72+
let _inner = tracing::info_span!("inner", level = 3).entered();
73+
tracing::info!("inside nested span");
74+
}
75+
}
76+
}
77+
}
78+
79+
fn workload_events_with_fields() {
80+
for i in 0..100 {
81+
tracing::info!(
82+
iteration = i,
83+
foo = 42,
84+
message_type = "Request",
85+
status = "ok",
86+
count = 100,
87+
"event with many fields"
88+
);
89+
}
90+
}
91+
92+
fn workload_mixed_log_levels() {
93+
for _ in 0..25 {
94+
tracing::trace!("trace event");
95+
tracing::debug!(value = 1, "debug event");
96+
tracing::info!(value = 2, "info event");
97+
tracing::warn!(value = 3, "warn event");
98+
tracing::error!(value = 4, "error event");
99+
}
100+
}
101+
102+
fn workload_events_in_spans() {
103+
for i in 0..30 {
104+
let _span = tracing::info_span!("outer_span", iteration = i).entered();
105+
tracing::info!(step = "start", "starting work");
106+
tracing::debug!(step = "middle", "doing work");
107+
tracing::info!(step = "end", "finished work");
108+
}
109+
}
110+
111+
fn main() -> Result<()> {
112+
let args: Vec<String> = std::env::args().collect();
113+
114+
// This script will spawn itself into this branch
115+
if args.len() > 2 {
116+
let test_name = &args[1];
117+
let impl_type = &args[2];
118+
return run_single_test(test_name, impl_type);
119+
}
120+
121+
println!("\n\nHyperactor Telemetry Correctness Test Suite");
122+
println!("Comparing OLD vs UNIFIED implementations\n");
123+
124+
let tests = vec![
125+
"simple_info_events",
126+
"spans_with_fields",
127+
"nested_spans",
128+
"events_with_fields",
129+
"mixed_log_levels",
130+
"events_in_spans",
131+
];
132+
133+
let mut all_passed = true;
134+
135+
for test_name in tests {
136+
println!("\n{}", "=".repeat(80));
137+
println!("Running test: {}", test_name_to_display(test_name));
138+
println!("{}", "=".repeat(80));
139+
140+
let mut test_passed = true;
141+
142+
println!("\n[Running OLD implementation...]");
143+
let old_status = std::process::Command::new(&args[0])
144+
.arg(test_name)
145+
.arg("--old")
146+
.env("TEST_LOG_PREFIX", "test")
147+
.status()?;
148+
149+
if !old_status.success() {
150+
println!("\n✗ OLD implementation FAILED");
151+
all_passed = false;
152+
test_passed = false;
153+
}
154+
155+
println!("\n[Running UNIFIED implementation...]");
156+
let unified_status = std::process::Command::new(&args[0])
157+
.arg(test_name)
158+
.arg("--unified")
159+
.env("TEST_LOG_PREFIX", "test")
160+
.status()?;
161+
162+
if !unified_status.success() {
163+
println!("\n✗ UNIFIED implementation FAILED");
164+
all_passed = false;
165+
test_passed = false;
166+
}
167+
168+
if test_passed {
169+
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
170+
} else {
171+
println!("\n✗ Test FAILED: {}", test_name_to_display(test_name));
172+
}
173+
}
174+
175+
println!("\n\n{}", "=".repeat(80));
176+
if all_passed {
177+
println!("All tests completed successfully!");
178+
} else {
179+
println!("Some tests FAILED!");
180+
return Err(anyhow::anyhow!("Test failures detected"));
181+
}
182+
println!("{}", "=".repeat(80));
183+
184+
Ok(())
185+
}
186+
187+
/// Called in child process
188+
fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
189+
let harness = CorrectnessTestHarness {};
190+
191+
let workload: fn() = match test_name {
192+
"simple_info_events" => workload_simple_info_events,
193+
"spans_with_fields" => workload_spans_with_fields,
194+
"nested_spans" => workload_nested_spans,
195+
"events_with_fields" => workload_events_with_fields,
196+
"mixed_log_levels" => workload_mixed_log_levels,
197+
"events_in_spans" => workload_events_in_spans,
198+
_ => {
199+
return Err(anyhow::anyhow!("Unknown test: {}", test_name));
200+
}
201+
};
202+
203+
let _results = match impl_type {
204+
"--old" => {
205+
println!("Running with OLD implementation...");
206+
harness.run(workload)?
207+
}
208+
"--unified" => {
209+
println!("Running with UNIFIED implementation...");
210+
// Set USE_UNIFIED_LAYER to use unified implementation
211+
// SAFETY: Setting before any telemetry initialization
212+
unsafe {
213+
std::env::set_var("USE_UNIFIED_LAYER", "1");
214+
}
215+
harness.run(workload)?
216+
}
217+
_ => {
218+
return Err(anyhow::anyhow!(
219+
"Unknown implementation type: {}",
220+
impl_type
221+
));
222+
}
223+
};
224+
225+
Ok(())
226+
}
227+
228+
fn test_name_to_display(test_name: &str) -> &str {
229+
match test_name {
230+
"simple_info_events" => "Simple info events",
231+
"spans_with_fields" => "Spans with fields",
232+
"nested_spans" => "Nested spans",
233+
"events_with_fields" => "Events with many fields",
234+
"mixed_log_levels" => "Mixed log levels",
235+
"events_in_spans" => "Events in spans",
236+
_ => test_name,
237+
}
238+
}

0 commit comments

Comments
 (0)