Skip to content

Commit 870fbbb

Browse files
thomasywangfacebook-github-bot
authored andcommitted
Create UnifiedLayer (#1928)
Summary: Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this, is to create a `UnifiedLayer` that just sends everything into a background worker, that then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results Differential Revision: D87363773
1 parent 2760b20 commit 870fbbb

File tree

5 files changed

+910
-20
lines changed

5 files changed

+910
-20
lines changed

hyperactor_telemetry/Cargo.toml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry
1+
# @generated by autocargo from //monarch/hyperactor_telemetry:[correctness_test,hyperactor_telemetry,telemetry_benchmark]
22

33
[package]
44
name = "hyperactor_telemetry"
@@ -10,13 +10,25 @@ license = "BSD-3-Clause"
1010
[lib]
1111
edition = "2024"
1212

13+
[[bench]]
14+
name = "correctness_test"
15+
edition = "2024"
16+
17+
[[bench]]
18+
name = "telemetry_benchmark"
19+
edition = "2024"
20+
1321
[dependencies]
1422
anyhow = "1.0.98"
1523
chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false }
24+
crossbeam-channel = "0.5.15"
1625
dashmap = { version = "5.5.3", features = ["rayon", "serde"] }
1726
fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
1827
hdrhistogram = "7.5"
28+
humantime = "2.1"
29+
indexmap = { version = "2.9.0", features = ["arbitrary", "rayon", "serde"] }
1930
lazy_static = "1.5"
31+
libc = "0.2.139"
2032
opentelemetry = "0.29"
2133
opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] }
2234
rand = { version = "0.8", features = ["small_rng"] }
@@ -37,6 +49,7 @@ whoami = "1.5"
3749
[dev-dependencies]
3850
quickcheck = "1.0"
3951
quickcheck_macros = "1.0"
52+
tempfile = "3.22"
4053
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
4154

4255
[lints]
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! Correctness test harness comparing old vs unified telemetry implementations.
10+
//!
11+
//! This test harness runs identical workloads through both implementations and
12+
//! verifies that the outputs are equivalent across all exporters
13+
//!
14+
//! Usage:
15+
//! buck2 run //monarch/hyperactor_telemetry:correctness_test
16+
17+
use anyhow::Result;
18+
use hyperactor_telemetry::*;
19+
20+
struct TestResults {}
21+
22+
struct CorrectnessTestHarness {}
23+
24+
impl CorrectnessTestHarness {
25+
fn run<F>(&self, workload: F) -> Result<TestResults>
26+
where
27+
F: Fn(),
28+
{
29+
initialize_logging_with_log_prefix(
30+
DefaultTelemetryClock {},
31+
Some("TEST_LOG_PREFIX".to_string()),
32+
);
33+
34+
workload();
35+
36+
std::thread::sleep(std::time::Duration::from_millis(300));
37+
38+
Ok(TestResults {})
39+
}
40+
}
41+
42+
// ============================================================================
43+
// Test Workloads
44+
// ============================================================================
45+
46+
fn workload_simple_info_events() {
47+
for i in 0..100 {
48+
tracing::info!(iteration = i, "simple info event");
49+
}
50+
}
51+
52+
fn workload_spans_with_fields() {
53+
for i in 0..50 {
54+
let _span = tracing::info_span!(
55+
"test_span",
56+
iteration = i,
57+
actor_id = 42,
58+
message_type = "Request"
59+
)
60+
.entered();
61+
}
62+
}
63+
64+
fn workload_nested_spans() {
65+
for i in 0..20 {
66+
let _outer = tracing::info_span!("outer", iteration = i).entered();
67+
{
68+
let _middle = tracing::info_span!("middle", level = 2).entered();
69+
{
70+
let _inner = tracing::info_span!("inner", level = 3).entered();
71+
tracing::info!("inside nested span");
72+
}
73+
}
74+
}
75+
}
76+
77+
fn workload_events_with_fields() {
78+
for i in 0..100 {
79+
tracing::info!(
80+
iteration = i,
81+
actor_id = 42,
82+
message_type = "Request",
83+
status = "ok",
84+
count = 100,
85+
"event with many fields"
86+
);
87+
}
88+
}
89+
90+
fn workload_mixed_log_levels() {
91+
for _ in 0..25 {
92+
tracing::trace!("trace event");
93+
tracing::debug!(value = 1, "debug event");
94+
tracing::info!(value = 2, "info event");
95+
tracing::warn!(value = 3, "warn event");
96+
tracing::error!(value = 4, "error event");
97+
}
98+
}
99+
100+
fn workload_events_in_spans() {
101+
for i in 0..30 {
102+
let _span = tracing::info_span!("outer_span", iteration = i).entered();
103+
tracing::info!(step = "start", "starting work");
104+
tracing::debug!(step = "middle", "doing work");
105+
tracing::info!(step = "end", "finished work");
106+
}
107+
}
108+
109+
fn main() -> Result<()> {
110+
let args: Vec<String> = std::env::args().collect();
111+
112+
// This script will spawn itself into this branch
113+
if args.len() > 2 {
114+
let test_name = &args[1];
115+
let impl_type = &args[2];
116+
return run_single_test(test_name, impl_type);
117+
}
118+
119+
println!("\n\nHyperactor Telemetry Correctness Test Suite");
120+
println!("Comparing OLD vs UNIFIED implementations\n");
121+
122+
let tests = vec![
123+
"simple_info_events",
124+
"spans_with_fields",
125+
"nested_spans",
126+
"events_with_fields",
127+
"mixed_log_levels",
128+
"events_in_spans",
129+
];
130+
131+
let mut all_passed = true;
132+
133+
for test_name in tests {
134+
println!("\n{}", "=".repeat(80));
135+
println!("Running test: {}", test_name_to_display(test_name));
136+
println!("{}", "=".repeat(80));
137+
138+
let mut test_passed = true;
139+
140+
println!("\n[Running OLD implementation...]");
141+
let old_status = std::process::Command::new(&args[0])
142+
.arg(test_name)
143+
.arg("--old")
144+
.env("TEST_LOG_PREFIX", "test")
145+
.status()?;
146+
147+
if !old_status.success() {
148+
println!("\n✗ OLD implementation FAILED");
149+
all_passed = false;
150+
test_passed = false;
151+
continue;
152+
}
153+
154+
println!("\n[Running UNIFIED implementation...]");
155+
let unified_status = std::process::Command::new(&args[0])
156+
.arg(test_name)
157+
.arg("--unified")
158+
.env("TEST_LOG_PREFIX", "test")
159+
.status()?;
160+
161+
if !unified_status.success() {
162+
println!("\n✗ UNIFIED implementation FAILED");
163+
all_passed = false;
164+
test_passed = false;
165+
continue;
166+
}
167+
168+
if test_passed {
169+
println!("\n✓ Test PASSED: {}", test_name_to_display(test_name));
170+
} else {
171+
println!("\n✗ Test FAILED: {}", test_name_to_display(test_name));
172+
}
173+
}
174+
175+
println!("\n\n{}", "=".repeat(80));
176+
if all_passed {
177+
println!("All tests completed successfully!");
178+
} else {
179+
println!("Some tests FAILED!");
180+
return Err(anyhow::anyhow!("Test failures detected"));
181+
}
182+
println!("{}", "=".repeat(80));
183+
184+
Ok(())
185+
}
186+
187+
/// Called in child process
188+
fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> {
189+
let harness = CorrectnessTestHarness {};
190+
191+
let workload: fn() = match test_name {
192+
"simple_info_events" => workload_simple_info_events,
193+
"spans_with_fields" => workload_spans_with_fields,
194+
"nested_spans" => workload_nested_spans,
195+
"events_with_fields" => workload_events_with_fields,
196+
"mixed_log_levels" => workload_mixed_log_levels,
197+
"events_in_spans" => workload_events_in_spans,
198+
_ => {
199+
return Err(anyhow::anyhow!("Unknown test: {}", test_name));
200+
}
201+
};
202+
203+
let results = match impl_type {
204+
"--old" => {
205+
println!("Running with OLD implementation...");
206+
harness.run(workload)?
207+
}
208+
"--unified" => {
209+
println!("Running with UNIFIED implementation...");
210+
// Set USE_UNIFIED_LAYER to use unified implementation
211+
// SAFETY: Setting before any telemetry initialization
212+
unsafe {
213+
std::env::set_var("USE_UNIFIED_LAYER", "1");
214+
}
215+
harness.run(workload)?
216+
}
217+
_ => {
218+
return Err(anyhow::anyhow!(
219+
"Unknown implementation type: {}",
220+
impl_type
221+
));
222+
}
223+
};
224+
225+
Ok(())
226+
}
227+
228+
fn test_name_to_display(test_name: &str) -> &str {
229+
match test_name {
230+
"simple_info_events" => "Simple info events",
231+
"spans_with_fields" => "Spans with fields",
232+
"nested_spans" => "Nested spans",
233+
"events_with_fields" => "Events with many fields",
234+
"mixed_log_levels" => "Mixed log levels",
235+
"events_in_spans" => "Events in spans",
236+
_ => test_name,
237+
}
238+
}

0 commit comments

Comments
 (0)