Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 2 additions & 34 deletions linkerd/app/core/src/http_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,15 @@
use linkerd_error::Error;
use linkerd_opentelemetry::otel::trace::SpanKind;
use linkerd_stack::layer;
use linkerd_trace_context::{
self as trace_context,
export::{ExportSpan, SpanKind, SpanLabels},
Span, TraceContext,
};
use std::sync::Arc;
use tokio::sync::mpsc;

pub type SpanSink = mpsc::Sender<ExportSpan>;
use linkerd_trace_context::{export::SpanLabels, TraceContext};

pub fn server<S>(
_sink: Option<SpanSink>,
labels: impl Into<SpanLabels>,
) -> impl layer::Layer<S, Service = TraceContext<S>> + Clone {
TraceContext::layer(SpanKind::Server, labels.into())
}

pub fn client<S>(
_sink: Option<SpanSink>,
labels: impl Into<SpanLabels>,
) -> impl layer::Layer<S, Service = TraceContext<S>> + Clone {
TraceContext::layer(SpanKind::Client, labels.into())
}

#[derive(Clone)]
pub struct SpanConverter {
kind: SpanKind,
sink: SpanSink,
labels: SpanLabels,
}

impl trace_context::SpanSink for SpanConverter {
fn is_enabled(&self) -> bool {
true
}

fn try_send(&mut self, span: Span) -> Result<(), Error> {
self.sink.try_send(ExportSpan {
span,
kind: self.kind,
labels: Arc::clone(&self.labels),
})?;
Ok(())
}
}
1 change: 0 additions & 1 deletion linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub struct ProxyRuntime {
pub identity: identity::creds::Receiver,
pub metrics: metrics::Proxy,
pub tap: proxy::tap::Registry,
pub span_sink: Option<http_tracing::SpanSink>,
pub drain: drain::Watch,
}

Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<C> Inbound<C> {
endpoint_labels(unsafe_authority_labels),
),
)
.push_on_service(http_tracing::client(rt.span_sink.clone(), super::trace_labels()))
.push_on_service(http_tracing::client(super::trace_labels()))
.push_on_service(http::BoxResponse::layer())
.arc_new_http();

Expand Down
5 changes: 1 addition & 4 deletions linkerd/app/inbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,7 @@ impl<H> Inbound<H> {
.push_on_service(svc::MapErr::layer_boxed())
.push(rt.metrics.http_errors.to_layer())
.push(ServerRescue::layer())
.push_on_service(http_tracing::server(
rt.span_sink.clone(),
super::trace_labels(),
))
.push_on_service(http_tracing::server(super::trace_labels()))
// Record when an HTTP/1 URI was in absolute form
.push_on_service(http::normalize_uri::MarkAbsoluteForm::layer())
.push_on_service(http::BoxResponse::layer())
Expand Down
6 changes: 1 addition & 5 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ pub use self::{
};
use linkerd_app_core::{
config::{ProxyConfig, QueueConfig},
drain,
http_tracing::SpanSink,
identity,
drain, identity,
metrics::prom,
proxy::tap,
svc,
Expand Down Expand Up @@ -71,7 +69,6 @@ struct Runtime {
metrics: InboundMetrics,
identity: identity::creds::Receiver,
tap: tap::Registry,
span_sink: Option<SpanSink>,
drain: drain::Watch,
}

Expand Down Expand Up @@ -154,7 +151,6 @@ impl Inbound<()> {
metrics: InboundMetrics::new(runtime.metrics, prom),
identity: runtime.identity,
tap: runtime.tap,
span_sink: runtime.span_sink,
drain: runtime.drain,
};
Self {
Expand Down
1 change: 0 additions & 1 deletion linkerd/app/inbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ pub fn runtime() -> (ProxyRuntime, drain::Signal) {
identity: identity::creds::default_for_test().1,
metrics: metrics.proxy,
tap,
span_sink: None,
drain,
};
(runtime, drain_tx)
Expand Down
5 changes: 1 addition & 4 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ impl<T> Outbound<svc::ArcNewHttp<T, http::BoxBody>> {
.http_endpoint
.to_layer::<classify::Response, _, _>(),
)
.push_on_service(http_tracing::client(
rt.span_sink.clone(),
crate::trace_labels(),
))
.push_on_service(http_tracing::client(crate::trace_labels()))
.push(NewRequireIdentity::layer())
.push(http::NewOverrideAuthority::layer(vec![
"host",
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
.push(ServerRescue::layer(config.emit_headers))
.check_new_service::<T, http::Request<_>>()
// Initiates OpenTelemetry tracing.
.push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels()))
.push_on_service(http_tracing::server(trace_labels()))
.push_on_service(http::BoxResponse::layer())
// Convert origin form HTTP/1 URIs to absolute form for Hyper's
// `Client`.
Expand Down
3 changes: 0 additions & 3 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use linkerd_app_core::{
config::{ProxyConfig, QueueConfig},
drain,
exp_backoff::ExponentialBackoff,
http_tracing::SpanSink,
identity, io,
metrics::prom,
profiles,
Expand Down Expand Up @@ -97,7 +96,6 @@ struct Runtime {
metrics: OutboundMetrics,
identity: identity::NewClient,
tap: tap::Registry,
span_sink: Option<SpanSink>,
drain: drain::Watch,
}

Expand Down Expand Up @@ -127,7 +125,6 @@ impl Outbound<()> {
metrics: OutboundMetrics::new(runtime.metrics, prom),
identity: runtime.identity.new_client(),
tap: runtime.tap,
span_sink: runtime.span_sink,
drain: runtime.drain,
};
Self {
Expand Down
1 change: 0 additions & 1 deletion linkerd/app/outbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ pub(crate) fn runtime() -> (ProxyRuntime, drain::Signal) {
identity: linkerd_meshtls::creds::default_for_test().1,
metrics: metrics.proxy,
tap,
span_sink: None,
drain,
};
(runtime, drain_tx)
Expand Down
1 change: 0 additions & 1 deletion linkerd/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ impl Config {
identity: identity.receiver(),
metrics: metrics.proxy,
tap: tap.registry(),
span_sink: trace_collector.span_sink(),
drain: drain_rx.clone(),
};
let inbound = Inbound::new(
Expand Down
15 changes: 1 addition & 14 deletions linkerd/app/src/trace_collector.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use linkerd_app_core::{
control, dns, http_tracing::SpanSink, identity, metrics::ControlHttp as HttpMetrics,
opentelemetry, svc::NewService,
control, dns, identity, metrics::ControlHttp as HttpMetrics, opentelemetry, svc::NewService,
};
use linkerd_error::Error;
use otel_collector::OtelCollectorAttributes;
use std::{collections::HashMap, future::Future, pin::Pin};

pub mod otel_collector;

const SPAN_BUFFER_CAPACITY: usize = 100;

#[derive(Clone, Debug)]
pub enum Config {
Disabled,
Expand All @@ -32,16 +29,6 @@ pub enum TraceCollector {

pub struct EnabledCollector {
pub addr: control::ControlAddr,
pub span_sink: SpanSink,
}

impl TraceCollector {
pub fn span_sink(&self) -> Option<SpanSink> {
match self {
TraceCollector::Disabled => None,
TraceCollector::Enabled(inner) => Some(inner.span_sink.clone()),
}
}
}

impl Config {
Expand Down
5 changes: 1 addition & 4 deletions linkerd/app/src/trace_collector/otel_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
collections::HashMap,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::mpsc;
use tonic::{body::Body as TonicBody, client::GrpcService};

pub(super) struct OtelCollectorAttributes {
Expand All @@ -26,8 +25,6 @@ where
S::ResponseBody: Body<Data = tonic::codegen::Bytes> + Send + 'static,
<S::ResponseBody as Body>::Error: Into<Error> + Send,
{
let (span_sink, _) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY);

let resource = sdk::Resource::builder()
.with_attribute(KeyValue::new(
semconv::attribute::PROCESS_PID,
Expand Down Expand Up @@ -55,5 +52,5 @@ where
let addr = addr.clone();
opentelemetry::install_opentelemetry_providers(svc, resource, legacy_metrics);

EnabledCollector { addr, span_sink }
EnabledCollector { addr }
}
23 changes: 0 additions & 23 deletions linkerd/trace-context/src/export.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,4 @@
use crate::Span;
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SpanKind {
Server = 1,
Client = 2,
}

impl From<SpanKind> for opentelemetry::trace::SpanKind {
fn from(value: SpanKind) -> Self {
match value {
SpanKind::Server => opentelemetry::trace::SpanKind::Server,
SpanKind::Client => opentelemetry::trace::SpanKind::Client,
}
}
}

pub type SpanLabels = Arc<HashMap<String, String>>;

#[derive(Debug)]
pub struct ExportSpan {
pub span: Span,
pub kind: SpanKind,
pub labels: SpanLabels,
}
19 changes: 0 additions & 19 deletions linkerd/trace-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod service;

pub use self::service::TraceContext;
use bytes::Bytes;
use linkerd_error::Error;
use opentelemetry::{KeyValue, SpanId, TraceId};
use std::fmt;
use std::time::SystemTime;
Expand Down Expand Up @@ -54,24 +53,6 @@ pub struct Span {
pub labels: Vec<KeyValue>,
}

pub trait SpanSink {
fn is_enabled(&self) -> bool;

fn try_send(&mut self, span: Span) -> Result<(), Error>;
}

impl<K: SpanSink> SpanSink for Option<K> {
#[inline]
fn is_enabled(&self) -> bool {
self.as_ref().map(SpanSink::is_enabled).unwrap_or(false)
}

#[inline]
fn try_send(&mut self, span: Span) -> Result<(), Error> {
self.as_mut().expect("Must be enabled").try_send(span)
}
}

// === impl Id ===

impl From<Id> for Vec<u8> {
Expand Down
8 changes: 4 additions & 4 deletions linkerd/trace-context/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::export::{SpanKind, SpanLabels};
use crate::export::SpanLabels;
use futures::{future::Either, prelude::*};
use http::Uri;
use linkerd_stack::layer;
use opentelemetry::context::FutureExt;
use opentelemetry::trace::{Span, SpanRef, TraceContextExt, Tracer};
use opentelemetry::trace::{Span, SpanKind, SpanRef, TraceContextExt, Tracer};
use opentelemetry::KeyValue;
use opentelemetry_http::{HeaderExtractor, HeaderInjector};
use opentelemetry_semantic_conventions as semconv;
Expand Down Expand Up @@ -39,7 +39,7 @@ impl<S> TraceContext<S> {
) -> impl layer::Layer<S, Service = TraceContext<S>> + Clone {
layer::mk(move |inner| TraceContext {
inner,
kind,
kind: kind.clone(),
labels: labels.clone(),
})
}
Expand Down Expand Up @@ -225,7 +225,7 @@ where
let span_name = req.uri().path().to_owned();
let mut span = tracer
.span_builder(span_name)
.with_kind(self.kind.into())
.with_kind(self.kind.clone())
.start_with_context(&tracer, &parent_ctx);
self.add_request_labels(&mut span, &req);

Expand Down
Loading