From 27080106522cdea1989a993f059348aafb0c99e3 Mon Sep 17 00:00:00 2001 From: Scott Fleener Date: Wed, 12 Nov 2025 15:09:26 -0500 Subject: [PATCH] chore(tracing): Use upstream tracing batch exporter The proxy used its own custom trace exporter for some time. It was built before the upstream OpenTelemetry libraries were available, and they have outlived their usefulness. This replaces the custom exporter with a batch exporter configured to export to the same endpoint with the same service configuration. In the future, this exporter can be installed as a global default trace processor, which would decouple it from the service layer where the proxy generates spans for requests. Signed-off-by: Scott Fleener --- .../app/src/trace_collector/otel_collector.rs | 101 ++---- linkerd/opentelemetry/src/lib.rs | 314 +++++++----------- 2 files changed, 140 insertions(+), 275 deletions(-) diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs index 55b6c47e9d..5c5a147a41 100644 --- a/linkerd/app/src/trace_collector/otel_collector.rs +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -1,13 +1,6 @@ use super::EnabledCollector; use linkerd_app_core::{control::ControlAddr, proxy::http::Body, Error}; -use linkerd_opentelemetry::{ - self as opentelemetry, metrics, - proto::{ - tonic::common::v1::{any_value, AnyValue, KeyValue}, - transform::common::tonic::ResourceAttributesWithSchema, - }, - semconv, -}; +use linkerd_opentelemetry::{self as opentelemetry, metrics, otel::KeyValue, sdk, semconv}; use std::{ collections::HashMap, time::{SystemTime, UNIX_EPOCH}, @@ -29,7 +22,7 @@ pub(super) fn create_collector( legacy_metrics: metrics::Registry, ) -> EnabledCollector where - S: GrpcService + Clone + Send + 'static, + S: GrpcService + Clone + Send + Sync + 'static, S::Error: Into, S::Future: Send, S::ResponseBody: Body + Send + 'static, @@ -38,37 +31,33 @@ where let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); let spans_rx = ReceiverStream::new(spans_rx); - let mut resources = ResourceAttributesWithSchema::default(); - - resources - .attributes - .0 - .push((std::process::id() as i64).with_key(semconv::attribute::PROCESS_PID)); - - resources.attributes.0.push( - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_secs() as i64) - .unwrap_or_else(|e| -(e.duration().as_secs() as i64)) - .with_key("process.start_timestamp"), - ); - resources.attributes.0.push( - attributes - .hostname - .unwrap_or_default() - .with_key(semconv::attribute::HOST_NAME), - ); - - resources.attributes.0.extend( - attributes - .extra - .into_iter() - .map(|(key, value)| value.with_key(&key)), - ); + let resource = sdk::Resource::builder() + .with_attribute(KeyValue::new( + semconv::attribute::PROCESS_PID, + std::process::id() as i64, + )) + .with_attribute(KeyValue::new( + "process.start_timestamp", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or_else(|e| -(e.duration().as_secs() as i64)), + )) + .with_attribute(KeyValue::new( + semconv::attribute::HOST_NAME, + attributes.hostname.unwrap_or_default(), + )) + .with_attributes( + attributes + .extra + .into_iter() + .map(|(k, v)| KeyValue::new(k, v)), + ) + .build(); let addr = addr.clone(); let task = Box::pin( - opentelemetry::export_spans(svc, spans_rx, resources, legacy_metrics) + opentelemetry::export_spans(svc, spans_rx, resource, legacy_metrics) .instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()), ); @@ -78,41 +67,3 @@ where span_sink, } } - -trait IntoAnyValue -where - Self: Sized, -{ - fn into_any_value(self) -> AnyValue; - - fn with_key(self, key: &str) -> KeyValue { - KeyValue { - key: key.to_string(), - value: Some(self.into_any_value()), - } - } -} - -impl IntoAnyValue for String { - fn into_any_value(self) -> AnyValue { - AnyValue { - value: Some(any_value::Value::StringValue(self)), - } - } -} - -impl IntoAnyValue for &str { - fn into_any_value(self) -> AnyValue { - AnyValue { - value: Some(any_value::Value::StringValue(self.to_string())), - } - } -} - -impl IntoAnyValue for i64 { - fn into_any_value(self) -> AnyValue { - AnyValue { - value: Some(any_value::Value::IntValue(self)), - } - } -} diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 51400ae3a0..4bcbc71475 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -15,229 +15,132 @@ use opentelemetry::{ }; pub use opentelemetry_proto as proto; use opentelemetry_proto::{ - tonic::{ - collector::trace::v1::{ - trace_service_client::TraceServiceClient, ExportTraceServiceRequest, - }, - trace::v1::ResourceSpans, - }, - transform::{ - common::tonic::ResourceAttributesWithSchema, - trace::tonic::group_spans_by_resource_and_scope, + tonic::collector::trace::v1::{ + trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }, + transform::trace::tonic::group_spans_by_resource_and_scope, }; -use opentelemetry_sdk::trace::{SpanData, SpanLinks}; -pub use opentelemetry_sdk::{self as sdk}; -pub use opentelemetry_semantic_conventions as semconv; -use tokio::{ - sync::mpsc, - time::{self, Instant, MissedTickBehavior}, +pub use opentelemetry_sdk as sdk; +use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; +use opentelemetry_sdk::trace::{ + BatchConfigBuilder, BatchSpanProcessor, SpanData, SpanLinks, SpanProcessor, }; -use tonic::{self as grpc, body::Body as TonicBody, client::GrpcService}; -use tracing::{debug, info, trace}; +use opentelemetry_sdk::Resource; +pub use opentelemetry_semantic_conventions as semconv; +use std::fmt::{Debug, Formatter}; +use std::time::Duration; +use tonic::{body::Body as TonicBody, client::GrpcService}; +use tracing::{debug, info}; -pub async fn export_spans( - client: T, - spans: S, - resource: ResourceAttributesWithSchema, - metrics: Registry, -) where - T: GrpcService + Clone, +pub async fn export_spans(client: T, spans: S, resource: Resource, metrics: Registry) +where + T: GrpcService + Clone + Send + Sync + 'static, T::Error: Into, + T::Future: Send, T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, S: Stream + Unpin, { debug!("Span exporter running"); - SpanExporter::new(client, spans, resource, metrics) - .run() - .await + + let processor = BatchSpanProcessor::builder(SpanExporter { + client: TraceServiceClient::new(client), + resource, + metrics, + }) + .with_batch_config( + BatchConfigBuilder::default() + .with_max_queue_size(1000) + .with_scheduled_delay(Duration::from_secs(5)) + .build(), + ) + .build(); + + SpanExportTask::new(spans, processor).run().await; } /// SpanExporter sends a Stream of spans to the given TraceService gRPC service. -struct SpanExporter { - client: T, - spans: S, - resource: ResourceAttributesWithSchema, +struct SpanExporter { + client: TraceServiceClient, + resource: Resource, metrics: Registry, } -#[derive(Debug)] -struct SpanRxClosed; - -// === impl SpanExporter === +impl Debug for SpanExporter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SpanExporter").finish_non_exhaustive() + } +} -impl SpanExporter +impl opentelemetry_sdk::trace::SpanExporter for SpanExporter where - T: GrpcService + Clone, + T: GrpcService + Clone + Send + Sync, + >::Future: Send, T::Error: Into, T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, - S: Stream + Unpin, { - const MAX_BATCH_SIZE: usize = 1000; - const BATCH_INTERVAL: time::Duration = time::Duration::from_secs(10); - - fn new(client: T, spans: S, resource: ResourceAttributesWithSchema, metrics: Registry) -> Self { - Self { - client, - spans, - resource, - metrics, - } - } - - async fn run(self) { - let Self { - client, - mut spans, - resource, - mut metrics, - } = self; - - // Holds the batch of pending spans. Cleared as the spans are flushed. - // Contains no more than MAX_BATCH_SIZE spans. - let mut accum = Vec::new(); - - let mut svc = TraceServiceClient::new(client); - loop { - trace!("Establishing new TraceService::export request"); - metrics.start_stream(); - let (tx, mut rx) = mpsc::channel(1); - - let recv_future = async { - while let Some(req) = rx.recv().await { - match svc.export(grpc::Request::new(req)).await { - Ok(rsp) => { - let Some(partial_success) = rsp.into_inner().partial_success else { - continue; - }; - - if !partial_success.error_message.is_empty() { - debug!( - %partial_success.error_message, - rejected_spans = partial_success.rejected_spans, - "Response partially successful", - ); - } - } - Err(error) => { - debug!(%error, "Response future failed; restarting"); - } + async fn export(&self, batch: Vec) -> OTelSdkResult { + let mut metrics = self.metrics.clone(); + metrics.start_stream(); + let span_count = batch.len(); + let mut client = self.client.clone(); + + debug!("Exporting {span_count} spans"); + + let resource_spans = group_spans_by_resource_and_scope(batch, &(&self.resource).into()); + + match client + .export(ExportTraceServiceRequest { resource_spans }) + .await + { + Ok(resp) => { + metrics.send(span_count as u64); + if let Some(partial) = resp.into_inner().partial_success { + if !partial.error_message.is_empty() { + debug!( + %partial.error_message, + rejected_spans = partial.rejected_spans, + "Response partially successful", + ); + return Err(OTelSdkError::InternalFailure(partial.error_message)); } } - }; - - // Drive both the response future and the export stream - // simultaneously. - tokio::select! { - _ = recv_future => {} - res = Self::export(&tx, &mut spans, &resource, &mut accum) => match res { - // The export stream closed; reconnect. - Ok(()) => {}, - // No more spans. - Err(SpanRxClosed) => return, - }, + } + Err(e) => { + return Err(OTelSdkError::InternalFailure(e.to_string())); } } + + Ok(()) } +} - /// Accumulate spans and send them on the export stream. - /// - /// Returns an error when the proxy has closed the span stream. - async fn export( - tx: &mpsc::Sender, - spans: &mut S, - resource: &ResourceAttributesWithSchema, - accum: &mut Vec, - ) -> Result<(), SpanRxClosed> { - loop { - // Collect spans into a batch. - let collect = Self::collect_batch(spans, resource, accum).await; - - // If we collected spans, flush them. - if !accum.is_empty() { - // Once a batch has been accumulated, ensure that the - // request stream is ready to accept the batch. - match tx.reserve().await { - Ok(tx) => { - let msg = ExportTraceServiceRequest { - resource_spans: std::mem::take(accum), - }; - trace!(spans = msg.resource_spans.len(), "Sending batch"); - tx.send(msg); - } - Err(error) => { - // If the channel isn't open, start a new stream - // and retry sending the batch. - debug!(%error, "Request stream lost; restarting"); - return Ok(()); - } - } - } +struct SpanExportTask { + spans: S, + processor: BatchSpanProcessor, +} - // If the span source was closed, end the task. - if let Err(closed) = collect { - debug!("Span channel lost"); - return Err(closed); - } - } +impl SpanExportTask +where + S: Stream + Unpin, +{ + fn new(spans: S, processor: BatchSpanProcessor) -> Self { + Self { spans, processor } } - /// Collects spans from the proxy into `accum`. - /// - /// Returns an error when the span stream has completed. An error may be - /// returned after accumulating spans. - async fn collect_batch( - span_stream: &mut S, - resource: &ResourceAttributesWithSchema, - accum: &mut Vec, - ) -> Result<(), SpanRxClosed> { - let mut input_accum: Vec = vec![]; - - let mut interval = - time::interval_at(Instant::now() + Self::BATCH_INTERVAL, Self::BATCH_INTERVAL); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let res = loop { - if input_accum.len() == Self::MAX_BATCH_SIZE { - trace!(capacity = Self::MAX_BATCH_SIZE, "Batch capacity reached"); - break Ok(()); - } - - tokio::select! { - biased; - - res = span_stream.next() => match res { - Some(span) => { - trace!(?span, "Adding to batch"); - let span = match convert_span(span) { - Ok(span) => span, - Err(error) => { - info!(%error, "Span dropped"); - continue; - } - }; - - input_accum.push(span); - } - None => break Err(SpanRxClosed), - }, - - // Don't hold spans indefinitely. Return if we hit an interval tick and spans have - // been collected. - _ = interval.tick() => { - if !input_accum.is_empty() { - trace!(spans = input_accum.len(), "Flushing spans due to interval tick"); - break Ok(()); - } + async fn run(mut self) { + while let Some(span) = self.spans.next().await { + let s = match convert_span(span) { + Ok(s) => s, + Err(error) => { + info!(%error, "Span dropped"); + continue; } - } - }; - - *accum = group_spans_by_resource_and_scope(input_accum, resource); + }; - res + self.processor.on_end(s); + } } } @@ -282,7 +185,7 @@ fn convert_span(span: ExportSpan) -> Result { mod tests { use super::*; use linkerd_trace_context::{export::SpanKind, Id, Span}; - use opentelemetry_proto::tonic::{common::v1::InstrumentationScope, resource::v1::Resource}; + use opentelemetry_proto::tonic::common::v1::InstrumentationScope; use std::{collections::HashMap, sync::Arc, time::SystemTime}; use tokio::sync::mpsc; use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt, Bytes}; @@ -322,14 +225,25 @@ mod tests { assert_eq!(req.resource_spans.len(), 1); let mut resource_span = req.resource_spans.remove(0); - assert_eq!( - resource_span.resource, - Some(Resource { - attributes: vec![], - dropped_attributes_count: 0, - entity_refs: vec![], - }) - ); + let resource_span_resource = resource_span.resource.expect("must have resource"); + assert_eq!(resource_span_resource.dropped_attributes_count, 0); + assert_eq!(resource_span_resource.entity_refs, vec![]); + resource_span_resource + .attributes + .iter() + .any(|attr| attr.key == "telemetry.sdk.name"); + resource_span_resource + .attributes + .iter() + .any(|attr| attr.key == "telemetry.sdk.version"); + resource_span_resource + .attributes + .iter() + .any(|attr| attr.key == "telemetry.sdk.language"); + resource_span_resource + .attributes + .iter() + .any(|attr| attr.key == "service.name"); assert_eq!(resource_span.schema_url, ""); assert_eq!(resource_span.scope_spans.len(), 1); @@ -392,7 +306,7 @@ mod tests { tokio::spawn(export_spans( inner, ReceiverStream::new(span_rx), - ResourceAttributesWithSchema::default(), + opentelemetry_sdk::Resource::builder().build(), metrics, ));