Skip to content

Commit bc7dc5f

Browse files
Nakshatra Sharma
authored and committed
Add OTLP tracing support for daemon and coordinator
Enables Jaeger tracing when DORA_OTLP_ENDPOINT (or DORA_JAEGER_TRACING) is set.

Key changes:
- Add service name to OTLP resource configuration
- Register global tracer provider for span export
- Store OtelGuard to prevent premature shutdown
- Initialize OTLP within tokio runtime context
- Move TracingBuilder import to top-level with proper feature gate

Fixes #1313
1 parent 6f5269b commit bc7dc5f

File tree

4 files changed: 102 additions (+102) and 17 deletions (-17)

binaries/cli/src/command/coordinator.rs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,66 @@ pub struct Coordinator {
3333

3434
impl Executable for Coordinator {
3535
fn execute(self) -> eyre::Result<()> {
36+
let enable_otlp = std::env::var("DORA_OTLP_ENDPOINT").is_ok()
37+
|| std::env::var("DORA_JAEGER_TRACING").is_ok();
38+
3639
#[cfg(feature = "tracing")]
3740
{
38-
let name = "dora-coordinator";
39-
let mut builder = TracingBuilder::new(name);
40-
if !self.quiet {
41-
builder = builder.with_stdout("info", false);
41+
// Only initialize basic tracing if OTLP is NOT requested
42+
// If OTLP is requested, we'll initialize everything inside the runtime
43+
if !enable_otlp {
44+
let name = "dora-coordinator";
45+
let mut builder = TracingBuilder::new(name);
46+
if !self.quiet {
47+
builder = builder.with_stdout("info", false);
48+
}
49+
builder = builder.with_file(name, LevelFilter::INFO)?;
50+
builder
51+
.build()
52+
.wrap_err("failed to set up tracing subscriber")?;
4253
}
43-
builder = builder.with_file(name, LevelFilter::INFO)?;
44-
builder
45-
.build()
46-
.wrap_err("failed to set up tracing subscriber")?;
4754
}
4855

4956
let rt = Builder::new_multi_thread()
5057
.enable_all()
5158
.build()
5259
.context("tokio runtime failed")?;
53-
rt.block_on(async {
60+
61+
let quiet = self.quiet;
62+
63+
rt.block_on(async move {
64+
#[cfg(feature = "tracing")]
65+
let _otel_guard = if enable_otlp {
66+
let name = "dora-coordinator";
67+
let mut builder = TracingBuilder::new(name);
68+
69+
builder = builder
70+
.with_otlp_tracing()
71+
.wrap_err("failed to set up OTLP tracing")?;
72+
73+
if !quiet {
74+
builder = builder.with_stdout("info", false);
75+
}
76+
builder = builder.with_file(name, LevelFilter::INFO)?;
77+
78+
let guard = builder.guard.take();
79+
80+
builder
81+
.build()
82+
.wrap_err("failed to set up tracing subscriber with OTLP")?;
83+
84+
tracing::info!("OTLP tracing enabled for coordinator");
85+
guard
86+
} else {
87+
None
88+
};
89+
5490
let bind = SocketAddr::new(self.interface, self.port);
5591
let bind_control = SocketAddr::new(self.control_interface, self.control_port);
5692
let (port, task) =
5793
dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>())
5894
.await?;
59-
if !self.quiet {
95+
if !quiet {
6096
println!("Listening for incoming daemon connection on {port}");
6197
}
6298
task.await

binaries/cli/src/command/daemon.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ use std::{
1313
use tokio::runtime::Builder;
1414
use tracing::level_filters::LevelFilter;
1515

16+
#[cfg(feature = "tracing")]
17+
use dora_tracing::TracingBuilder;
18+
1619
#[derive(Debug, clap::Args)]
1720
/// Run daemon
1821
pub struct Daemon {
@@ -37,13 +40,16 @@ pub struct Daemon {
3740

3841
impl Executable for Daemon {
3942
fn execute(self) -> eyre::Result<()> {
43+
let enable_otlp = std::env::var("DORA_OTLP_ENDPOINT").is_ok()
44+
|| std::env::var("DORA_JAEGER_TRACING").is_ok();
45+
4046
let rt = Builder::new_multi_thread()
4147
.enable_all()
4248
.build()
4349
.context("tokio runtime failed")?;
4450

4551
#[cfg(feature = "tracing")]
46-
let _guard = {
52+
let _guard = if !enable_otlp {
4753
let _enter = rt.enter();
4854

4955
let name = "dora-daemon";
@@ -67,8 +73,46 @@ impl Executable for Daemon {
6773
LevelFilter::INFO,
6874
)
6975
.context("failed to initialize tracing")?
76+
} else {
77+
None
7078
};
71-
rt.block_on(async {
79+
80+
let machine_id = self.machine_id.clone();
81+
let quiet = self.quiet;
82+
83+
rt.block_on(async move {
84+
#[cfg(feature = "tracing")]
85+
let _otel_guard = if enable_otlp {
86+
87+
let name = "dora-daemon";
88+
let filename = machine_id
89+
.as_ref()
90+
.map(|id| format!("{name}-{id}"))
91+
.unwrap_or(name.to_string());
92+
let mut builder = TracingBuilder::new(name);
93+
94+
builder = builder
95+
.with_otlp_tracing()
96+
.wrap_err("failed to set up OTLP tracing")?;
97+
98+
if !quiet {
99+
builder = builder.with_stdout("info,zenoh=warn", false);
100+
}
101+
builder = builder.with_file(filename.clone(), LevelFilter::INFO)?;
102+
103+
104+
let guard = builder.guard.take();
105+
106+
builder
107+
.build()
108+
.wrap_err("failed to set up tracing subscriber with OTLP")?;
109+
110+
tracing::info!("OTLP tracing enabled for daemon");
111+
guard
112+
} else {
113+
None
114+
};
115+
72116
match self.run_dataflow {
73117
Some(dataflow_path) => {
74118
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
@@ -88,7 +132,7 @@ impl Executable for Daemon {
88132
handle_dataflow_result(result, None)
89133
}
90134
None => {
91-
dora_daemon::Daemon::run(SocketAddr::new(self.coordinator_addr, self.coordinator_port), self.machine_id, self.local_listen_port).await
135+
dora_daemon::Daemon::run(SocketAddr::new(self.coordinator_addr, self.coordinator_port), machine_id, self.local_listen_port).await
92136
}
93137
}
94138
})

libraries/extensions/telemetry/tracing/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,14 @@ impl TracingBuilder {
131131
.or_else(|_| std::env::var("DORA_JAEGER_TRACING"))
132132
.wrap_err("DORA_OTLP_ENDPOINT or DORA_JAEGER_TRACING environment variable not set")?;
133133

134-
// Initialize OTLP tracing - this returns a tracer and sets the global provider
135134
let sdk_tracer_provider = crate::telemetry::init_tracing(&self.name, &endpoint);
135+
136+
opentelemetry::global::set_tracer_provider(sdk_tracer_provider.clone());
137+
136138
let meter_provider = metrics::init_meter_provider();
137139

138140
// TODO: Maybe this needs to be removed in favor of application level global.
139141
// global::set_meter_provider(meter_provider.clone());
140-
// Use the specific tracer instance returned from init_tracing
141142
let tracer = sdk_tracer_provider.tracer("tracing-otel-subscriber");
142143

143144
let guard = OtelGuard {

libraries/extensions/telemetry/tracing/src/telemetry.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,19 @@ impl Extractor for MetadataMap<'_> {
3939
/// docker run -d -p 4317:4317 -p 4318:4318 -p 16686:16686 jaegertracing/all-in-one:latest
4040
/// ```
4141
///
42-
pub fn init_tracing(_name: &str, endpoint: &str) -> sdktrace::SdkTracerProvider {
42+
pub fn init_tracing(name: &str, endpoint: &str) -> sdktrace::SdkTracerProvider {
4343
let exporter = opentelemetry_otlp::SpanExporter::builder()
4444
.with_tonic()
4545
.with_endpoint(endpoint)
4646
.build()
4747
.unwrap();
4848

49+
let resource = opentelemetry_sdk::Resource::builder()
50+
.with_service_name(name.to_string())
51+
.build();
52+
4953
SdkTracerProvider::builder()
50-
// Customize sampling strategy
54+
.with_resource(resource)
5155
.with_batch_exporter(exporter)
5256
.build()
5357
}

0 commit comments

Comments
 (0)