Skip to content

Commit d6e010f

Browse files
Nakshatra Sharma
authored and committed
Add OTLP tracing support for daemon and coordinator
Enables OTLP trace export (e.g. to Jaeger) when DORA_OTLP_ENDPOINT or DORA_JAEGER_TRACING is set.

Key changes:
- Add service name to OTLP resource configuration
- Register global tracer provider for span export
- Store OtelGuard to prevent premature shutdown
- Initialize OTLP within tokio runtime context

Fixes #1313
1 parent d15730e commit d6e010f

File tree

5 files changed

+115
-28
lines changed

5 files changed

+115
-28
lines changed

binaries/cli/src/command/coordinator.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,68 @@ pub struct Coordinator {
3333

3434
impl Executable for Coordinator {
3535
fn execute(self) -> eyre::Result<()> {
36+
let enable_otlp = std::env::var("DORA_OTLP_ENDPOINT").is_ok()
37+
|| std::env::var("DORA_JAEGER_TRACING").is_ok();
38+
3639
#[cfg(feature = "tracing")]
3740
{
38-
let name = "dora-coordinator";
39-
let mut builder = TracingBuilder::new(name);
40-
if !self.quiet {
41-
builder = builder.with_stdout("info", false);
41+
// Only initialize basic tracing if OTLP is NOT requested
42+
// If OTLP is requested, we'll initialize everything inside the runtime
43+
if !enable_otlp {
44+
let name = "dora-coordinator";
45+
let mut builder = TracingBuilder::new(name);
46+
if !self.quiet {
47+
builder = builder.with_stdout("info", false);
48+
}
49+
builder = builder.with_file(name, LevelFilter::INFO)?;
50+
builder
51+
.build()
52+
.wrap_err("failed to set up tracing subscriber")?;
4253
}
43-
builder = builder.with_file(name, LevelFilter::INFO)?;
44-
builder
45-
.build()
46-
.wrap_err("failed to set up tracing subscriber")?;
4754
}
4855

4956
let rt = Builder::new_multi_thread()
5057
.enable_all()
5158
.build()
5259
.context("tokio runtime failed")?;
53-
rt.block_on(async {
60+
61+
let quiet = self.quiet;
62+
63+
rt.block_on(async move {
64+
65+
#[cfg(feature = "tracing")]
66+
let _otel_guard = if enable_otlp {
67+
let name = "dora-coordinator";
68+
let mut builder = TracingBuilder::new(name);
69+
70+
builder = builder
71+
.with_otlp_tracing()
72+
.wrap_err("failed to set up OTLP tracing")?;
73+
74+
if !quiet {
75+
builder = builder.with_stdout("info", false);
76+
}
77+
builder = builder.with_file(name, LevelFilter::INFO)?;
78+
79+
80+
let guard = builder.guard.take();
81+
82+
builder
83+
.build()
84+
.wrap_err("failed to set up tracing subscriber with OTLP")?;
85+
86+
tracing::info!("OTLP tracing enabled for coordinator");
87+
guard
88+
} else {
89+
None
90+
};
91+
5492
let bind = SocketAddr::new(self.interface, self.port);
5593
let bind_control = SocketAddr::new(self.control_interface, self.control_port);
5694
let (port, task) =
5795
dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>())
5896
.await?;
59-
if !self.quiet {
97+
if !quiet {
6098
println!("Listening for incoming daemon connection on {port}");
6199
}
62100
task.await

binaries/cli/src/command/daemon.rs

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,29 +40,72 @@ pub struct Daemon {
4040

4141
impl Executable for Daemon {
4242
fn execute(self) -> eyre::Result<()> {
43+
let enable_otlp = std::env::var("DORA_OTLP_ENDPOINT").is_ok()
44+
|| std::env::var("DORA_JAEGER_TRACING").is_ok();
45+
4346
#[cfg(feature = "tracing")]
4447
{
45-
let name = "dora-daemon";
46-
let filename = self
47-
.machine_id
48-
.as_ref()
49-
.map(|id| format!("{name}-{id}"))
50-
.unwrap_or(name.to_string());
51-
let mut builder = TracingBuilder::new(name);
52-
if !self.quiet {
53-
builder = builder.with_stdout("info,zenoh=warn", false);
48+
// Only initialize basic tracing if OTLP is NOT requested
49+
// If OTLP is requested, we'll initialize everything inside the runtime
50+
if !enable_otlp {
51+
let name = "dora-daemon";
52+
let filename = self
53+
.machine_id
54+
.as_ref()
55+
.map(|id| format!("{name}-{id}"))
56+
.unwrap_or(name.to_string());
57+
let mut builder = TracingBuilder::new(name);
58+
if !self.quiet {
59+
builder = builder.with_stdout("info,zenoh=warn", false);
60+
}
61+
builder = builder.with_file(filename, LevelFilter::INFO)?;
62+
builder
63+
.build()
64+
.wrap_err("failed to set up tracing subscriber")?;
5465
}
55-
builder = builder.with_file(filename, LevelFilter::INFO)?;
56-
builder
57-
.build()
58-
.wrap_err("failed to set up tracing subscriber")?;
5966
}
6067

6168
let rt = Builder::new_multi_thread()
6269
.enable_all()
6370
.build()
6471
.context("tokio runtime failed")?;
65-
rt.block_on(async {
72+
73+
let machine_id = self.machine_id.clone();
74+
let quiet = self.quiet;
75+
76+
rt.block_on(async move {
77+
78+
#[cfg(feature = "tracing")]
79+
let _otel_guard = if enable_otlp {
80+
let name = "dora-daemon";
81+
let filename = machine_id
82+
.as_ref()
83+
.map(|id| format!("{name}-{id}"))
84+
.unwrap_or(name.to_string());
85+
let mut builder = TracingBuilder::new(name);
86+
87+
builder = builder
88+
.with_otlp_tracing()
89+
.wrap_err("failed to set up OTLP tracing")?;
90+
91+
if !quiet {
92+
builder = builder.with_stdout("info,zenoh=warn", false);
93+
}
94+
builder = builder.with_file(filename.clone(), LevelFilter::INFO)?;
95+
96+
97+
let guard = builder.guard.take();
98+
99+
builder
100+
.build()
101+
.wrap_err("failed to set up tracing subscriber with OTLP")?;
102+
103+
tracing::info!("OTLP tracing enabled for daemon");
104+
guard
105+
} else {
106+
None
107+
};
108+
66109
match self.run_dataflow {
67110
Some(dataflow_path) => {
68111
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
@@ -82,7 +125,7 @@ impl Executable for Daemon {
82125
handle_dataflow_result(result, None)
83126
}
84127
None => {
85-
dora_daemon::Daemon::run(SocketAddr::new(self.coordinator_addr, self.coordinator_port), self.machine_id, self.local_listen_port).await
128+
dora_daemon::Daemon::run(SocketAddr::new(self.coordinator_addr, self.coordinator_port), machine_id, self.local_listen_port).await
86129
}
87130
}
88131
})

dora-hub

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit 6b989343f15745a44fe1892988b193f38aadf107

libraries/extensions/telemetry/tracing/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,14 @@ impl TracingBuilder {
119119
.or_else(|_| std::env::var("DORA_JAEGER_TRACING"))
120120
.wrap_err("DORA_OTLP_ENDPOINT or DORA_JAEGER_TRACING environment variable not set")?;
121121

122-
// Initialize OTLP tracing - this returns a tracer and sets the global provider
123122
let sdk_tracer_provider = crate::telemetry::init_tracing(&self.name, &endpoint);
123+
124+
opentelemetry::global::set_tracer_provider(sdk_tracer_provider.clone());
125+
124126
let meter_provider = metrics::init_meter_provider();
125127

126128
// TODO: Maybe this needs to be removed in favor of application level global.
127129
// global::set_meter_provider(meter_provider.clone());
128-
// Use the specific tracer instance returned from init_tracing
129130
let tracer = sdk_tracer_provider.tracer("tracing-otel-subscriber");
130131

131132
let guard = OtelGuard {

libraries/extensions/telemetry/tracing/src/telemetry.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,12 @@ pub fn init_tracing(name: &str, endpoint: &str) -> sdktrace::SdkTracerProvider {
4646
.build()
4747
.unwrap();
4848

49+
let resource = opentelemetry_sdk::Resource::builder()
50+
.with_service_name(name.to_string())
51+
.build();
52+
4953
SdkTracerProvider::builder()
50-
// Customize sampling strategy
54+
.with_resource(resource)
5155
.with_batch_exporter(exporter)
5256
.build()
5357
}

0 commit comments

Comments
 (0)