diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs index 55b6c47e9d..5c5a147a41 100644 --- a/linkerd/app/src/trace_collector/otel_collector.rs +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -1,13 +1,6 @@ use super::EnabledCollector; use linkerd_app_core::{control::ControlAddr, proxy::http::Body, Error}; -use linkerd_opentelemetry::{ - self as opentelemetry, metrics, - proto::{ - tonic::common::v1::{any_value, AnyValue, KeyValue}, - transform::common::tonic::ResourceAttributesWithSchema, - }, - semconv, -}; +use linkerd_opentelemetry::{self as opentelemetry, metrics, otel::KeyValue, sdk, semconv}; use std::{ collections::HashMap, time::{SystemTime, UNIX_EPOCH}, @@ -29,7 +22,7 @@ pub(super) fn create_collector( legacy_metrics: metrics::Registry, ) -> EnabledCollector where - S: GrpcService + Clone + Send + 'static, + S: GrpcService + Clone + Send + Sync + 'static, S::Error: Into, S::Future: Send, S::ResponseBody: Body + Send + 'static, @@ -38,37 +31,33 @@ where let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); let spans_rx = ReceiverStream::new(spans_rx); - let mut resources = ResourceAttributesWithSchema::default(); - - resources - .attributes - .0 - .push((std::process::id() as i64).with_key(semconv::attribute::PROCESS_PID)); - - resources.attributes.0.push( - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_secs() as i64) - .unwrap_or_else(|e| -(e.duration().as_secs() as i64)) - .with_key("process.start_timestamp"), - ); - resources.attributes.0.push( - attributes - .hostname - .unwrap_or_default() - .with_key(semconv::attribute::HOST_NAME), - ); - - resources.attributes.0.extend( - attributes - .extra - .into_iter() - .map(|(key, value)| value.with_key(&key)), - ); + let resource = sdk::Resource::builder() + .with_attribute(KeyValue::new( + semconv::attribute::PROCESS_PID, + std::process::id() as i64, + )) + 
.with_attribute(KeyValue::new( + "process.start_timestamp", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or_else(|e| -(e.duration().as_secs() as i64)), + )) + .with_attribute(KeyValue::new( + semconv::attribute::HOST_NAME, + attributes.hostname.unwrap_or_default(), + )) + .with_attributes( + attributes + .extra + .into_iter() + .map(|(k, v)| KeyValue::new(k, v)), + ) + .build(); let addr = addr.clone(); let task = Box::pin( - opentelemetry::export_spans(svc, spans_rx, resources, legacy_metrics) + opentelemetry::export_spans(svc, spans_rx, resource, legacy_metrics) .instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()), ); @@ -78,41 +67,3 @@ where span_sink, } } - -trait IntoAnyValue -where - Self: Sized, -{ - fn into_any_value(self) -> AnyValue; - - fn with_key(self, key: &str) -> KeyValue { - KeyValue { - key: key.to_string(), - value: Some(self.into_any_value()), - } - } -} - -impl IntoAnyValue for String { - fn into_any_value(self) -> AnyValue { - AnyValue { - value: Some(any_value::Value::StringValue(self)), - } - } -} - -impl IntoAnyValue for &str { - fn into_any_value(self) -> AnyValue { - AnyValue { - value: Some(any_value::Value::StringValue(self.to_string())), - } - } -} - -impl IntoAnyValue for i64 { - fn into_any_value(self) -> AnyValue { - AnyValue { - value: Some(any_value::Value::IntValue(self)), - } - } -} diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 51400ae3a0..4bcbc71475 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -15,229 +15,132 @@ use opentelemetry::{ }; pub use opentelemetry_proto as proto; use opentelemetry_proto::{ - tonic::{ - collector::trace::v1::{ - trace_service_client::TraceServiceClient, ExportTraceServiceRequest, - }, - trace::v1::ResourceSpans, - }, - transform::{ - common::tonic::ResourceAttributesWithSchema, - trace::tonic::group_spans_by_resource_and_scope, + 
tonic::collector::trace::v1::{ + trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }, + transform::trace::tonic::group_spans_by_resource_and_scope, }; -use opentelemetry_sdk::trace::{SpanData, SpanLinks}; -pub use opentelemetry_sdk::{self as sdk}; -pub use opentelemetry_semantic_conventions as semconv; -use tokio::{ - sync::mpsc, - time::{self, Instant, MissedTickBehavior}, +pub use opentelemetry_sdk as sdk; +use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; +use opentelemetry_sdk::trace::{ + BatchConfigBuilder, BatchSpanProcessor, SpanData, SpanLinks, SpanProcessor, }; -use tonic::{self as grpc, body::Body as TonicBody, client::GrpcService}; -use tracing::{debug, info, trace}; +use opentelemetry_sdk::Resource; +pub use opentelemetry_semantic_conventions as semconv; +use std::fmt::{Debug, Formatter}; +use std::time::Duration; +use tonic::{body::Body as TonicBody, client::GrpcService}; +use tracing::{debug, info}; -pub async fn export_spans( - client: T, - spans: S, - resource: ResourceAttributesWithSchema, - metrics: Registry, -) where - T: GrpcService + Clone, +pub async fn export_spans(client: T, spans: S, resource: Resource, metrics: Registry) +where + T: GrpcService + Clone + Send + Sync + 'static, T::Error: Into, + T::Future: Send, T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, S: Stream + Unpin, { debug!("Span exporter running"); - SpanExporter::new(client, spans, resource, metrics) - .run() - .await + + let processor = BatchSpanProcessor::builder(SpanExporter { + client: TraceServiceClient::new(client), + resource, + metrics, + }) + .with_batch_config( + BatchConfigBuilder::default() + .with_max_queue_size(1000) + .with_scheduled_delay(Duration::from_secs(5)) + .build(), + ) + .build(); + + SpanExportTask::new(spans, processor).run().await; } /// SpanExporter sends a Stream of spans to the given TraceService gRPC service. 
-struct SpanExporter { - client: T, - spans: S, - resource: ResourceAttributesWithSchema, +struct SpanExporter { + client: TraceServiceClient, + resource: Resource, metrics: Registry, } -#[derive(Debug)] -struct SpanRxClosed; - -// === impl SpanExporter === +impl Debug for SpanExporter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SpanExporter").finish_non_exhaustive() + } +} -impl SpanExporter +impl opentelemetry_sdk::trace::SpanExporter for SpanExporter where - T: GrpcService + Clone, + T: GrpcService + Clone + Send + Sync, + >::Future: Send, T::Error: Into, T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, - S: Stream + Unpin, { - const MAX_BATCH_SIZE: usize = 1000; - const BATCH_INTERVAL: time::Duration = time::Duration::from_secs(10); - - fn new(client: T, spans: S, resource: ResourceAttributesWithSchema, metrics: Registry) -> Self { - Self { - client, - spans, - resource, - metrics, - } - } - - async fn run(self) { - let Self { - client, - mut spans, - resource, - mut metrics, - } = self; - - // Holds the batch of pending spans. Cleared as the spans are flushed. - // Contains no more than MAX_BATCH_SIZE spans. 
- let mut accum = Vec::new(); - - let mut svc = TraceServiceClient::new(client); - loop { - trace!("Establishing new TraceService::export request"); - metrics.start_stream(); - let (tx, mut rx) = mpsc::channel(1); - - let recv_future = async { - while let Some(req) = rx.recv().await { - match svc.export(grpc::Request::new(req)).await { - Ok(rsp) => { - let Some(partial_success) = rsp.into_inner().partial_success else { - continue; - }; - - if !partial_success.error_message.is_empty() { - debug!( - %partial_success.error_message, - rejected_spans = partial_success.rejected_spans, - "Response partially successful", - ); - } - } - Err(error) => { - debug!(%error, "Response future failed; restarting"); - } + async fn export(&self, batch: Vec) -> OTelSdkResult { + let mut metrics = self.metrics.clone(); + metrics.start_stream(); + let span_count = batch.len(); + let mut client = self.client.clone(); + + debug!("Exporting {span_count} spans"); + + let resource_spans = group_spans_by_resource_and_scope(batch, &(&self.resource).into()); + + match client + .export(ExportTraceServiceRequest { resource_spans }) + .await + { + Ok(resp) => { + metrics.send(span_count as u64); + if let Some(partial) = resp.into_inner().partial_success { + if !partial.error_message.is_empty() { + debug!( + %partial.error_message, + rejected_spans = partial.rejected_spans, + "Response partially successful", + ); + return Err(OTelSdkError::InternalFailure(partial.error_message)); } } - }; - - // Drive both the response future and the export stream - // simultaneously. - tokio::select! { - _ = recv_future => {} - res = Self::export(&tx, &mut spans, &resource, &mut accum) => match res { - // The export stream closed; reconnect. - Ok(()) => {}, - // No more spans. - Err(SpanRxClosed) => return, - }, + } + Err(e) => { + return Err(OTelSdkError::InternalFailure(e.to_string())); } } + + Ok(()) } +} - /// Accumulate spans and send them on the export stream. 
- /// - /// Returns an error when the proxy has closed the span stream. - async fn export( - tx: &mpsc::Sender, - spans: &mut S, - resource: &ResourceAttributesWithSchema, - accum: &mut Vec, - ) -> Result<(), SpanRxClosed> { - loop { - // Collect spans into a batch. - let collect = Self::collect_batch(spans, resource, accum).await; - - // If we collected spans, flush them. - if !accum.is_empty() { - // Once a batch has been accumulated, ensure that the - // request stream is ready to accept the batch. - match tx.reserve().await { - Ok(tx) => { - let msg = ExportTraceServiceRequest { - resource_spans: std::mem::take(accum), - }; - trace!(spans = msg.resource_spans.len(), "Sending batch"); - tx.send(msg); - } - Err(error) => { - // If the channel isn't open, start a new stream - // and retry sending the batch. - debug!(%error, "Request stream lost; restarting"); - return Ok(()); - } - } - } +struct SpanExportTask { + spans: S, + processor: BatchSpanProcessor, +} - // If the span source was closed, end the task. - if let Err(closed) = collect { - debug!("Span channel lost"); - return Err(closed); - } - } +impl SpanExportTask +where + S: Stream + Unpin, +{ + fn new(spans: S, processor: BatchSpanProcessor) -> Self { + Self { spans, processor } } - /// Collects spans from the proxy into `accum`. - /// - /// Returns an error when the span stream has completed. An error may be - /// returned after accumulating spans. - async fn collect_batch( - span_stream: &mut S, - resource: &ResourceAttributesWithSchema, - accum: &mut Vec, - ) -> Result<(), SpanRxClosed> { - let mut input_accum: Vec = vec![]; - - let mut interval = - time::interval_at(Instant::now() + Self::BATCH_INTERVAL, Self::BATCH_INTERVAL); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let res = loop { - if input_accum.len() == Self::MAX_BATCH_SIZE { - trace!(capacity = Self::MAX_BATCH_SIZE, "Batch capacity reached"); - break Ok(()); - } - - tokio::select! 
{ - biased; - - res = span_stream.next() => match res { - Some(span) => { - trace!(?span, "Adding to batch"); - let span = match convert_span(span) { - Ok(span) => span, - Err(error) => { - info!(%error, "Span dropped"); - continue; - } - }; - - input_accum.push(span); - } - None => break Err(SpanRxClosed), - }, - - // Don't hold spans indefinitely. Return if we hit an interval tick and spans have - // been collected. - _ = interval.tick() => { - if !input_accum.is_empty() { - trace!(spans = input_accum.len(), "Flushing spans due to interval tick"); - break Ok(()); - } + async fn run(mut self) { + while let Some(span) = self.spans.next().await { + let s = match convert_span(span) { + Ok(s) => s, + Err(error) => { + info!(%error, "Span dropped"); + continue; } - } - }; - - *accum = group_spans_by_resource_and_scope(input_accum, resource); + }; - res + self.processor.on_end(s); + } } } @@ -282,7 +185,7 @@ fn convert_span(span: ExportSpan) -> Result { mod tests { use super::*; use linkerd_trace_context::{export::SpanKind, Id, Span}; - use opentelemetry_proto::tonic::{common::v1::InstrumentationScope, resource::v1::Resource}; + use opentelemetry_proto::tonic::common::v1::InstrumentationScope; use std::{collections::HashMap, sync::Arc, time::SystemTime}; use tokio::sync::mpsc; use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt, Bytes}; @@ -322,14 +225,25 @@ mod tests { assert_eq!(req.resource_spans.len(), 1); let mut resource_span = req.resource_spans.remove(0); - assert_eq!( - resource_span.resource, - Some(Resource { - attributes: vec![], - dropped_attributes_count: 0, - entity_refs: vec![], - }) - ); + let resource_span_resource = resource_span.resource.expect("must have resource"); + assert_eq!(resource_span_resource.dropped_attributes_count, 0); + assert_eq!(resource_span_resource.entity_refs, vec![]); + assert!(resource_span_resource + .attributes + .iter() + .any(|attr| attr.key == "telemetry.sdk.name")); + assert!(resource_span_resource + 
.attributes + .iter() + .any(|attr| attr.key == "telemetry.sdk.version")); + assert!(resource_span_resource + .attributes + .iter() + .any(|attr| attr.key == "telemetry.sdk.language")); + assert!(resource_span_resource + .attributes + .iter() + .any(|attr| attr.key == "service.name")); assert_eq!(resource_span.schema_url, ""); assert_eq!(resource_span.scope_spans.len(), 1); @@ -392,7 +306,7 @@ mod tests { tokio::spawn(export_spans( inner, ReceiverStream::new(span_rx), - ResourceAttributesWithSchema::default(), + opentelemetry_sdk::Resource::builder().build(), metrics, ));