Skip to content

Commit 4cb58f2

Browse files
committed
chore(tracing): Use upstream OpenTelemetry for all tracing
This removes all remaining bespoke tracing infrastructure. The main change here is registering a global tracer provider that the trace propagation service can use, replacing the manually wired span data channel. This leaves in some remnants of the old infrastructure, specifically around the span sink types. These types are fairly viral, so a future PR will clean these up. Signed-off-by: Scott Fleener <[email protected]>
1 parent e460b63 commit 4cb58f2

File tree

8 files changed

+167
-261
lines changed

8 files changed

+167
-261
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3024,6 +3024,8 @@ dependencies = [
30243024
"percent-encoding",
30253025
"rand 0.9.2",
30263026
"thiserror",
3027+
"tokio",
3028+
"tokio-stream",
30273029
]
30283030

30293031
[[package]]

linkerd/app/core/src/http_tracing.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,17 @@ use tokio::sync::mpsc;
1111
pub type SpanSink = mpsc::Sender<ExportSpan>;
1212

1313
pub fn server<S>(
14-
sink: Option<SpanSink>,
14+
_sink: Option<SpanSink>,
1515
labels: impl Into<SpanLabels>,
16-
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
17-
TraceContext::layer(sink.map(move |sink| SpanConverter {
18-
kind: SpanKind::Server,
19-
sink,
20-
labels: labels.into(),
21-
}))
16+
) -> impl layer::Layer<S, Service = TraceContext<S>> + Clone {
17+
TraceContext::layer(SpanKind::Server, labels.into())
2218
}
2319

2420
pub fn client<S>(
25-
sink: Option<SpanSink>,
21+
_sink: Option<SpanSink>,
2622
labels: impl Into<SpanLabels>,
27-
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
28-
TraceContext::layer(sink.map(move |sink| SpanConverter {
29-
kind: SpanKind::Client,
30-
sink,
31-
labels: labels.into(),
32-
}))
23+
) -> impl layer::Layer<S, Service = TraceContext<S>> + Clone {
24+
TraceContext::layer(SpanKind::Client, labels.into())
3325
}
3426

3527
#[derive(Clone)]

linkerd/app/src/lib.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,6 @@ impl App {
399399
admin,
400400
drain,
401401
identity,
402-
trace_collector: collector,
403402
start_proxy,
404403
tap,
405404
..
@@ -464,10 +463,6 @@ impl App {
464463
tokio::spawn(serve.instrument(info_span!("tap").or_current()));
465464
}
466465

467-
if let trace_collector::TraceCollector::Enabled(collector) = collector {
468-
tokio::spawn(collector.task.instrument(info_span!("tracing")));
469-
}
470-
471466
// we don't care if the admin shutdown channel is
472467
// dropped or actually triggered.
473468
let _ = admin_shutdown_rx.await;

linkerd/app/src/trace_collector.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ pub enum TraceCollector {
3333
pub struct EnabledCollector {
3434
pub addr: control::ControlAddr,
3535
pub span_sink: SpanSink,
36-
pub task: Task,
3736
}
3837

3938
impl TraceCollector {

linkerd/app/src/trace_collector/otel_collector.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ use std::{
66
time::{SystemTime, UNIX_EPOCH},
77
};
88
use tokio::sync::mpsc;
9-
use tokio_stream::wrappers::ReceiverStream;
109
use tonic::{body::Body as TonicBody, client::GrpcService};
11-
use tracing::Instrument;
1210

1311
pub(super) struct OtelCollectorAttributes {
1412
pub hostname: Option<String>,
@@ -28,8 +26,7 @@ where
2826
S::ResponseBody: Body<Data = tonic::codegen::Bytes> + Send + 'static,
2927
<S::ResponseBody as Body>::Error: Into<Error> + Send,
3028
{
31-
let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY);
32-
let spans_rx = ReceiverStream::new(spans_rx);
29+
let (span_sink, _) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY);
3330

3431
let resource = sdk::Resource::builder()
3532
.with_attribute(KeyValue::new(
@@ -56,14 +53,7 @@ where
5653
.build();
5754

5855
let addr = addr.clone();
59-
let task = Box::pin(
60-
opentelemetry::export_spans(svc, spans_rx, resource, legacy_metrics)
61-
.instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()),
62-
);
56+
opentelemetry::install_opentelemetry_providers(svc, resource, legacy_metrics);
6357

64-
EnabledCollector {
65-
addr,
66-
task,
67-
span_sink,
68-
}
58+
EnabledCollector { addr, span_sink }
6959
}

0 commit comments

Comments
 (0)