diff --git a/linkerd/app/core/src/http_tracing.rs b/linkerd/app/core/src/http_tracing.rs index 984bf0b0ec..7e3b2fa687 100644 --- a/linkerd/app/core/src/http_tracing.rs +++ b/linkerd/app/core/src/http_tracing.rs @@ -1,47 +1,15 @@ -use linkerd_error::Error; +use linkerd_opentelemetry::otel::trace::SpanKind; use linkerd_stack::layer; -use linkerd_trace_context::{ - self as trace_context, - export::{ExportSpan, SpanKind, SpanLabels}, - Span, TraceContext, -}; -use std::sync::Arc; -use tokio::sync::mpsc; - -pub type SpanSink = mpsc::Sender; +use linkerd_trace_context::{export::SpanLabels, TraceContext}; pub fn server( - _sink: Option, labels: impl Into, ) -> impl layer::Layer> + Clone { TraceContext::layer(SpanKind::Server, labels.into()) } pub fn client( - _sink: Option, labels: impl Into, ) -> impl layer::Layer> + Clone { TraceContext::layer(SpanKind::Client, labels.into()) } - -#[derive(Clone)] -pub struct SpanConverter { - kind: SpanKind, - sink: SpanSink, - labels: SpanLabels, -} - -impl trace_context::SpanSink for SpanConverter { - fn is_enabled(&self) -> bool { - true - } - - fn try_send(&mut self, span: Span) -> Result<(), Error> { - self.sink.try_send(ExportSpan { - span, - kind: self.kind, - labels: Arc::clone(&self.labels), - })?; - Ok(()) - } -} diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index 2cc487bae8..5d89c1e460 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -65,7 +65,6 @@ pub struct ProxyRuntime { pub identity: identity::creds::Receiver, pub metrics: metrics::Proxy, pub tap: proxy::tap::Registry, - pub span_sink: Option, pub drain: drain::Watch, } diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index 5739b0f36e..bfe599c768 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -134,7 +134,7 @@ impl Inbound { endpoint_labels(unsafe_authority_labels), ), ) - .push_on_service(http_tracing::client(rt.span_sink.clone(), super::trace_labels())) + .push_on_service(http_tracing::client(super::trace_labels())) .push_on_service(http::BoxResponse::layer()) .arc_new_http(); diff --git a/linkerd/app/inbound/src/http/server.rs b/linkerd/app/inbound/src/http/server.rs index 6316e3ea06..0885edf5ad 100644 --- a/linkerd/app/inbound/src/http/server.rs +++ b/linkerd/app/inbound/src/http/server.rs @@ -78,10 +78,7 @@ impl Inbound { .push_on_service(svc::MapErr::layer_boxed()) .push(rt.metrics.http_errors.to_layer()) .push(ServerRescue::layer()) - .push_on_service(http_tracing::server( - rt.span_sink.clone(), - super::trace_labels(), - )) + .push_on_service(http_tracing::server(super::trace_labels())) // Record when an HTTP/1 URI was in absolute form .push_on_service(http::normalize_uri::MarkAbsoluteForm::layer()) .push_on_service(http::BoxResponse::layer()) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 7e6a810194..529bc1f45c 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -25,9 +25,7 @@ pub use self::{ }; use linkerd_app_core::{ config::{ProxyConfig, QueueConfig}, - drain, - http_tracing::SpanSink, - identity, + drain, identity, metrics::prom, proxy::tap, svc, @@ -71,7 +69,6 @@ struct Runtime { metrics: InboundMetrics, identity: identity::creds::Receiver, tap: tap::Registry, - span_sink: Option, drain: drain::Watch, } @@ -154,7 +151,6 @@ impl Inbound<()> { metrics: InboundMetrics::new(runtime.metrics, prom), identity: runtime.identity, tap: runtime.tap, - span_sink: runtime.span_sink, drain: runtime.drain, }; Self { diff --git a/linkerd/app/inbound/src/test_util.rs b/linkerd/app/inbound/src/test_util.rs index 1fb420351c..4db4b246a0 100644 --- a/linkerd/app/inbound/src/test_util.rs +++ b/linkerd/app/inbound/src/test_util.rs @@ -99,7 +99,6 @@ pub fn runtime() -> (ProxyRuntime, drain::Signal) { identity: identity::creds::default_for_test().1, metrics: metrics.proxy, tap, - span_sink: None, drain, }; (runtime, drain_tx) diff --git a/linkerd/app/outbound/src/http/endpoint.rs b/linkerd/app/outbound/src/http/endpoint.rs index 8556f96b7f..1e0713aa47 100644 --- a/linkerd/app/outbound/src/http/endpoint.rs +++ b/linkerd/app/outbound/src/http/endpoint.rs @@ -117,10 +117,7 @@ impl Outbound> { .http_endpoint .to_layer::(), ) - .push_on_service(http_tracing::client( - rt.span_sink.clone(), - crate::trace_labels(), - )) + .push_on_service(http_tracing::client(crate::trace_labels())) .push(NewRequireIdentity::layer()) .push(http::NewOverrideAuthority::layer(vec![ "host", diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index 70cc87cbee..b27b116521 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -48,7 +48,7 @@ impl Outbound> { .push(ServerRescue::layer(config.emit_headers)) .check_new_service::>() // Initiates OpenTelemetry tracing. - .push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels())) + .push_on_service(http_tracing::server(trace_labels())) .push_on_service(http::BoxResponse::layer()) // Convert origin form HTTP/1 URIs to absolute form for Hyper's // `Client`. diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 1d6adb631a..1dff6146c8 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -10,7 +10,6 @@ use linkerd_app_core::{ config::{ProxyConfig, QueueConfig}, drain, exp_backoff::ExponentialBackoff, - http_tracing::SpanSink, identity, io, metrics::prom, profiles, @@ -97,7 +96,6 @@ struct Runtime { metrics: OutboundMetrics, identity: identity::NewClient, tap: tap::Registry, - span_sink: Option, drain: drain::Watch, } @@ -127,7 +125,6 @@ impl Outbound<()> { metrics: OutboundMetrics::new(runtime.metrics, prom), identity: runtime.identity.new_client(), tap: runtime.tap, - span_sink: runtime.span_sink, drain: runtime.drain, }; Self { diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index b77a3909f1..b005991cf8 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -63,7 +63,6 @@ pub(crate) fn runtime() -> (ProxyRuntime, drain::Signal) { identity: linkerd_meshtls::creds::default_for_test().1, metrics: metrics.proxy, tap, - span_sink: None, drain, }; (runtime, drain_tx) diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 7908b57034..5444656669 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -215,7 +215,6 @@ impl Config { identity: identity.receiver(), metrics: metrics.proxy, tap: tap.registry(), - span_sink: trace_collector.span_sink(), drain: drain_rx.clone(), }; let inbound = Inbound::new( diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs index 1fd07baa51..655c84347d 100644 --- a/linkerd/app/src/trace_collector.rs +++ b/linkerd/app/src/trace_collector.rs @@ -1,6 +1,5 @@ use linkerd_app_core::{ - control, dns, http_tracing::SpanSink, identity, metrics::ControlHttp as HttpMetrics, - opentelemetry, svc::NewService, + control, dns, identity, metrics::ControlHttp as HttpMetrics, opentelemetry, svc::NewService, }; use linkerd_error::Error; use otel_collector::OtelCollectorAttributes; @@ -8,8 +7,6 @@ use std::{collections::HashMap, future::Future, pin::Pin}; pub mod otel_collector; -const SPAN_BUFFER_CAPACITY: usize = 100; - #[derive(Clone, Debug)] pub enum Config { Disabled, @@ -32,16 +29,6 @@ pub enum TraceCollector { pub struct EnabledCollector { pub addr: control::ControlAddr, - pub span_sink: SpanSink, -} - -impl TraceCollector { - pub fn span_sink(&self) -> Option { - match self { - TraceCollector::Disabled => None, - TraceCollector::Enabled(inner) => Some(inner.span_sink.clone()), - } - } } impl Config { diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs index 824fc05c70..24c0c526e3 100644 --- a/linkerd/app/src/trace_collector/otel_collector.rs +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -5,7 +5,6 @@ use std::{ collections::HashMap, time::{SystemTime, UNIX_EPOCH}, }; -use tokio::sync::mpsc; use tonic::{body::Body as TonicBody, client::GrpcService}; pub(super) struct OtelCollectorAttributes { @@ -26,8 +25,6 @@ where S::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { - let (span_sink, _) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); - let resource = sdk::Resource::builder() .with_attribute(KeyValue::new( semconv::attribute::PROCESS_PID, @@ -55,5 +52,5 @@ where let addr = addr.clone(); opentelemetry::install_opentelemetry_providers(svc, resource, legacy_metrics); - EnabledCollector { addr, span_sink } + EnabledCollector { addr } } diff --git a/linkerd/trace-context/src/export.rs b/linkerd/trace-context/src/export.rs index 31e57a0907..24054d5738 100644 --- a/linkerd/trace-context/src/export.rs +++ b/linkerd/trace-context/src/export.rs @@ -1,27 +1,4 @@ -use crate::Span; use std::collections::HashMap; use std::sync::Arc; -#[derive(Copy, Clone, Debug, PartialEq)] -pub enum SpanKind { - Server = 1, - Client = 2, -} - -impl From for opentelemetry::trace::SpanKind { - fn from(value: SpanKind) -> Self { - match value { - SpanKind::Server => opentelemetry::trace::SpanKind::Server, - SpanKind::Client => opentelemetry::trace::SpanKind::Client, - } - } -} - pub type SpanLabels = Arc>; - -#[derive(Debug)] -pub struct ExportSpan { - pub span: Span, - pub kind: SpanKind, - pub labels: SpanLabels, -} diff --git a/linkerd/trace-context/src/lib.rs b/linkerd/trace-context/src/lib.rs index 6d208ca380..0399516f4e 100644 --- a/linkerd/trace-context/src/lib.rs +++ b/linkerd/trace-context/src/lib.rs @@ -6,7 +6,6 @@ mod service; pub use self::service::TraceContext; use bytes::Bytes; -use linkerd_error::Error; use opentelemetry::{KeyValue, SpanId, TraceId}; use std::fmt; use std::time::SystemTime; @@ -54,24 +53,6 @@ pub struct Span { pub labels: Vec, } -pub trait SpanSink { - fn is_enabled(&self) -> bool; - - fn try_send(&mut self, span: Span) -> Result<(), Error>; -} - -impl SpanSink for Option { - #[inline] - fn is_enabled(&self) -> bool { - self.as_ref().map(SpanSink::is_enabled).unwrap_or(false) - } - - #[inline] - fn try_send(&mut self, span: Span) -> Result<(), Error> { - self.as_mut().expect("Must be enabled").try_send(span) - } -} - // === impl Id === impl From for Vec { diff --git a/linkerd/trace-context/src/service.rs b/linkerd/trace-context/src/service.rs index 0fba0e21a1..44dfd55a24 100644 --- a/linkerd/trace-context/src/service.rs +++ b/linkerd/trace-context/src/service.rs @@ -1,9 +1,9 @@ -use crate::export::{SpanKind, SpanLabels}; +use crate::export::SpanLabels; use futures::{future::Either, prelude::*}; use http::Uri; use linkerd_stack::layer; use opentelemetry::context::FutureExt; -use opentelemetry::trace::{Span, SpanRef, TraceContextExt, Tracer}; +use opentelemetry::trace::{Span, SpanKind, SpanRef, TraceContextExt, Tracer}; use opentelemetry::KeyValue; use opentelemetry_http::{HeaderExtractor, HeaderInjector}; use opentelemetry_semantic_conventions as semconv; @@ -39,7 +39,7 @@ impl TraceContext { ) -> impl layer::Layer> + Clone { layer::mk(move |inner| TraceContext { inner, - kind, + kind: kind.clone(), labels: labels.clone(), }) } @@ -225,7 +225,7 @@ where let span_name = req.uri().path().to_owned(); let mut span = tracer .span_builder(span_name) - .with_kind(self.kind.into()) + .with_kind(self.kind.clone()) .start_with_context(&tracer, &parent_ctx); self.add_request_labels(&mut span, &req);