diff --git a/Cargo.lock b/Cargo.lock index 1d039d217a..02679485a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2081,6 +2081,7 @@ dependencies = [ "opentelemetry", "opentelemetry-proto", "opentelemetry-semantic-conventions", + "opentelemetry-zipkin", "opentelemetry_sdk", "tokio", "tonic", @@ -2602,11 +2603,17 @@ dependencies = [ "futures", "hex", "http", + "linkerd-app-core", "linkerd-error", "linkerd-http-box", + "linkerd-io", + "linkerd-opentelemetry", "linkerd-stack", "linkerd-tracing", + "opentelemetry", + "opentelemetry-http", "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "rand 0.9.2", "thiserror", "tokio", @@ -2946,6 +2953,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", +] + [[package]] name = "opentelemetry-proto" version = "0.31.0" @@ -2969,6 +2988,23 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" +[[package]] +name = "opentelemetry-zipkin" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27fcd074586dab55936b003c6a499acaabd6debbd539c3f36356bca2ef2fce2" +dependencies = [ + "http", + "once_cell", + "opentelemetry", + "opentelemetry-http", + "opentelemetry_sdk", + "serde", + "serde_json", + "thiserror", + "typed-builder", +] + [[package]] name = "opentelemetry_sdk" version = "0.31.0" @@ -4222,6 +4258,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typed-builder" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd9d30e3a08026c78f246b173243cf07b3696d274debd26680773b6773c2afc7" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c36781cc0e46a83726d9879608e4cf6c2505237e263a8eb8c24502989cfdb28" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "typenum" version = "1.19.0" diff --git a/linkerd/opentelemetry/Cargo.toml b/linkerd/opentelemetry/Cargo.toml index b2e0c81cf9..790e97847d 100644 --- a/linkerd/opentelemetry/Cargo.toml +++ b/linkerd/opentelemetry/Cargo.toml @@ -15,6 +15,7 @@ linkerd-trace-context = { path = "../trace-context" } opentelemetry = { version = "0.31", default-features = false, features = ["trace"] } opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"] } opentelemetry-proto = { version = "0.31" } +opentelemetry-zipkin = { version = "0.31", default-features = false } opentelemetry-semantic-conventions = { version = "0.31", default-features = false, features = ["semconv_experimental"] } tonic = { workspace = true, default-features = false, features = [ "codegen", diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 4bcbc71475..2491b06051 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -2,15 +2,17 @@ #![forbid(unsafe_code)] pub mod metrics; +pub mod propagation; use self::metrics::Registry; +use crate::propagation::OrderedPropagator; use futures::stream::{Stream, StreamExt}; use http_body::Body; use linkerd_error::Error; use linkerd_trace_context::{self as trace_context, export::ExportSpan}; pub use opentelemetry as otel; use opentelemetry::{ - trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState}, + trace::{SpanContext, SpanKind, Status, TraceFlags, TraceState}, KeyValue, }; pub use opentelemetry_proto as proto; @@ -56,6 +58,8 @@ where ) .build(); + opentelemetry::global::set_text_map_propagator(OrderedPropagator::new()); + SpanExportTask::new(spans, processor).run().await; } @@ -151,12 +155,12 @@ fn convert_span(span: ExportSpan) -> Result { for (k, v) in labels.iter() { attributes.push(KeyValue::new(k.clone(), v.clone())); } - for (k, v) in span.labels.into_iter() { - attributes.push(KeyValue::new(k, v)); + for kv in span.labels.into_iter() { + attributes.push(kv); } let is_remote = kind != trace_context::export::SpanKind::Client; Ok(SpanData { - parent_span_id: SpanId::from_bytes(span.parent_id.into_bytes()?), + parent_span_id: span.parent_id, parent_span_is_remote: true, // we do not originate any spans locally span_kind: match kind { trace_context::export::SpanKind::Server => SpanKind::Server, @@ -170,8 +174,8 @@ fn convert_span(span: ExportSpan) -> Result { links: SpanLinks::default(), status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this span_context: SpanContext::new( - TraceId::from_bytes(span.trace_id.into_bytes()?), - SpanId::from_bytes(span.span_id.into_bytes()?), + span.trace_id, + span.span_id, TraceFlags::default(), is_remote, TraceState::NONE, @@ -184,24 +188,19 @@ fn convert_span(span: ExportSpan) -> Result { #[cfg(test)] mod tests { use super::*; - use linkerd_trace_context::{export::SpanKind, Id, Span}; + use linkerd_trace_context::{export::SpanKind, Span}; + use opentelemetry::{SpanId, TraceId}; use opentelemetry_proto::tonic::common::v1::InstrumentationScope; - use std::{collections::HashMap, sync::Arc, time::SystemTime}; + use std::{sync::Arc, time::SystemTime}; use tokio::sync::mpsc; - use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt, Bytes}; + use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt}; use tonic_prost::ProstDecoder; #[tokio::test(flavor = "current_thread")] async fn send_span() { - let trace_id = Id::from(Bytes::from( - hex::decode("0123456789abcedffedcba9876543210").expect("decode"), - )); - let span_id = Id::from(Bytes::from( - hex::decode("fedcba9876543210").expect("decode"), - )); - let parent_id = Id::from(Bytes::from( - hex::decode("0123456789abcedf").expect("decode"), - )); + let trace_id = TraceId::from_hex("0123456789abcedffedcba9876543210").expect("trace id"); + let parent_id = SpanId::from_hex("fedcba9876543210").expect("parent id"); + let span_id = SpanId::from_hex("0123456789abcedf").expect("span id"); let span_name = "test".to_string(); let start = SystemTime::now(); @@ -209,13 +208,13 @@ mod tests { let span = ExportSpan { span: Span { - trace_id: trace_id.clone(), - span_id: span_id.clone(), - parent_id: parent_id.clone(), + trace_id, + span_id, + parent_id, span_name: span_name.clone(), start, end, - labels: HashMap::new(), + labels: Vec::new(), }, kind: SpanKind::Server, labels: Arc::new(Default::default()), @@ -261,18 +260,9 @@ mod tests { assert_eq!(scope_span.spans.len(), 1); let span = scope_span.spans.remove(0); - assert_eq!( - span.span_id, - span_id.into_bytes::<8>().expect("into_bytes").to_vec() - ); - assert_eq!( - span.parent_span_id, - parent_id.into_bytes::<8>().expect("into_bytes").to_vec() - ); - assert_eq!( - span.trace_id, - trace_id.into_bytes::<16>().expect("into_bytes").to_vec() - ); + assert_eq!(span.span_id, span_id.to_bytes().to_vec(),); + assert_eq!(span.parent_span_id, parent_id.to_bytes().to_vec(),); + assert_eq!(span.trace_id, trace_id.to_bytes().to_vec(),); assert_eq!(span.name, span_name); assert_eq!( span.start_time_unix_nano, @@ -306,7 +296,7 @@ mod tests { tokio::spawn(export_spans( inner, ReceiverStream::new(span_rx), - opentelemetry_sdk::Resource::builder().build(), + Resource::builder().build(), metrics, )); diff --git a/linkerd/opentelemetry/src/propagation.rs b/linkerd/opentelemetry/src/propagation.rs new file mode 100644 index 0000000000..c55466091e --- /dev/null +++ b/linkerd/opentelemetry/src/propagation.rs @@ -0,0 +1,79 @@ +use opentelemetry::{ + propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator}, + Context, +}; +use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator}; + +#[derive(Copy, Clone, Eq, PartialEq)] +enum PropagationFormat { + W3C, + B3, +} + +#[derive(Debug)] +pub struct OrderedPropagator { + w3c: TraceContextPropagator, + b3: opentelemetry_zipkin::Propagator, + baggage: BaggagePropagator, + fields: Vec, +} + +impl OrderedPropagator { + pub fn new() -> Self { + let w3c = TraceContextPropagator::new(); + let b3 = opentelemetry_zipkin::Propagator::new(); + let baggage = BaggagePropagator::new(); + + Self { + fields: w3c + .fields() + .chain(b3.fields()) + .chain(baggage.fields()) + .map(|s| s.to_string()) + .collect(), + w3c, + b3, + baggage, + } + } +} + +impl Default for OrderedPropagator { + fn default() -> Self { + Self::new() + } +} + +impl TextMapPropagator for OrderedPropagator { + fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) { + match cx.get::() { + None => {} + Some(PropagationFormat::W3C) => { + self.w3c.inject_context(cx, injector); + } + Some(PropagationFormat::B3) => { + self.b3.inject_context(cx, injector); + } + } + self.baggage.inject_context(cx, injector); + } + + fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context { + let cx = if self.w3c.fields().any(|f| extractor.get(f).is_some()) { + self.w3c + .extract_with_context(cx, extractor) + .with_value(PropagationFormat::W3C) + } else if self.b3.fields().any(|f| extractor.get(f).is_some()) { + self.b3 + .extract_with_context(cx, extractor) + .with_value(PropagationFormat::B3) + } else { + cx.clone() + }; + self.baggage.extract_with_context(&cx, extractor) + } + + fn fields(&self) -> FieldIter<'_> { + FieldIter::new(self.fields.as_slice()) + } +} diff --git a/linkerd/trace-context/Cargo.toml b/linkerd/trace-context/Cargo.toml index 0b28e15b9c..27b174e041 100644 --- a/linkerd/trace-context/Cargo.toml +++ b/linkerd/trace-context/Cargo.toml @@ -15,6 +15,9 @@ hex = "0.4" http = { workspace = true } linkerd-error = { path = "../error" } linkerd-stack = { path = "../stack" } +opentelemetry = { version = "0.31", default-features = false, features = ["trace"] } +opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"] } +opentelemetry-http = { version = "0.31", default-features = false } opentelemetry-semantic-conventions = { version = "0.31", default-features = false, features = ["semconv_experimental"] } rand = "0.9" thiserror = "2" @@ -22,7 +25,10 @@ tower = { workspace = true, default-features = false, features = ["util"] } tracing = { workspace = true } [dev-dependencies] +linkerd-app-core = { path = "../app/core" } linkerd-http-box = { path = "../http/box" } +linkerd-io = { path = "../io" } linkerd-tracing = { path = "../tracing" } +linkerd-opentelemetry = { path = "../opentelemetry" } tokio = { version = "1", features = ["test-util"] } tower-test = { workspace = true } diff --git a/linkerd/trace-context/src/export.rs b/linkerd/trace-context/src/export.rs index 262c98ee7f..31e57a0907 100644 --- a/linkerd/trace-context/src/export.rs +++ b/linkerd/trace-context/src/export.rs @@ -8,6 +8,15 @@ pub enum SpanKind { Client = 2, } +impl From for opentelemetry::trace::SpanKind { + fn from(value: SpanKind) -> Self { + match value { + SpanKind::Server => opentelemetry::trace::SpanKind::Server, + SpanKind::Client => opentelemetry::trace::SpanKind::Client, + } + } +} + pub type SpanLabels = Arc>; #[derive(Debug)] diff --git a/linkerd/trace-context/src/lib.rs b/linkerd/trace-context/src/lib.rs index 841344d58b..6d208ca380 100644 --- a/linkerd/trace-context/src/lib.rs +++ b/linkerd/trace-context/src/lib.rs @@ -2,20 +2,16 @@ #![forbid(unsafe_code)] pub mod export; -mod propagation; mod service; pub use self::service::TraceContext; use bytes::Bytes; use linkerd_error::Error; -use rand::Rng; -use std::collections::HashMap; +use opentelemetry::{KeyValue, SpanId, TraceId}; use std::fmt; use std::time::SystemTime; use thiserror::Error; -const SPAN_ID_LEN: usize = 8; - #[derive(Debug, Default, Clone, Eq, PartialEq)] pub struct Id(Vec); @@ -49,13 +45,13 @@ pub struct InsufficientBytes; #[derive(Debug)] pub struct Span { - pub trace_id: Id, - pub span_id: Id, - pub parent_id: Id, + pub trace_id: TraceId, + pub span_id: SpanId, + pub parent_id: SpanId, pub span_name: String, pub start: SystemTime, pub end: SystemTime, - pub labels: HashMap<&'static str, String>, + pub labels: Vec, } pub trait SpanSink { @@ -78,14 +74,6 @@ impl SpanSink for Option { // === impl Id === -impl Id { - fn new_span_id(rng: &mut R) -> Self { - let mut bytes = vec![0; SPAN_ID_LEN]; - rng.fill(bytes.as_mut_slice()); - Self(bytes) - } -} - impl From for Vec { fn from(Id(bytes): Id) -> Vec { bytes diff --git a/linkerd/trace-context/src/propagation.rs b/linkerd/trace-context/src/propagation.rs deleted file mode 100644 index ac79435962..0000000000 --- a/linkerd/trace-context/src/propagation.rs +++ /dev/null @@ -1,94 +0,0 @@ -use crate::{Flags, Id, InsufficientBytes}; -use bytes::Bytes; - -use thiserror::Error; -use tracing::debug; - -mod b3; -mod w3c; - -#[derive(Debug)] -pub enum Propagation { - B3Http, - B3Grpc, - W3CHttp, -} - -#[derive(Debug)] -pub struct TraceContext { - pub propagation: Propagation, - pub trace_id: Id, - pub parent_id: Id, - pub flags: Flags, -} - -#[derive(Debug, Error)] -#[error("unknown field ID {0}")] -struct UnknownFieldId(u8); - -// === impl TraceContext === - -impl TraceContext { - pub fn is_sampled(&self) -> bool { - self.flags.is_sampled() - } -} - -/// Given an http request, attempt to unpack a distributed tracing context from -/// the headers. Only w3c and b3 context propagation formats are supported. The -/// former is tried first, and if no headers are present, function will attempt -/// to unpack as b3. -pub fn unpack_trace_context(request: &http::Request) -> Option { - // Attempt to parse as w3c first since it's the newest interface in - // distributed tracing ecosystem - w3c::unpack_w3c_trace_context(request) - .or_else(|| b3::unpack_grpc_trace_context(request)) - .or_else(|| b3::unpack_http_trace_context(request)) -} - -// Generates a new span id, writes it to the request in the appropriate -// propagation format and returns the generated span id. -pub fn increment_span_id(request: &mut http::Request, context: &TraceContext) -> Id { - match context.propagation { - Propagation::B3Grpc => b3::increment_grpc_span_id(request, context), - Propagation::B3Http => b3::increment_http_span_id(request), - Propagation::W3CHttp => w3c::increment_http_span_id(request, context), - } -} - -// === Header parse utils === - -fn get_header_str<'a, B>( - request: &'a http::Request, - header: &http::header::HeaderName, -) -> Option<&'a str> { - let hv = request.headers().get(header)?; - hv.to_str() - .map_err(|_| debug!(header_value = %header, "Invalid non-ASCII or control character in header value")) - .ok() -} - -// Attempt to decode a hex value to an id, padding the buffer up to the -// specified argument. Used to decode header values from hex to binary. -fn decode_id_with_padding(value: &str, pad_to: usize) -> Result { - hex::decode(value).map(|mut data| { - if data.len() < pad_to { - let padding = pad_to - data.len(); - let mut padded = vec![0u8; padding]; - padded.append(&mut data); - Id(padded) - } else { - Id(data) - } - }) -} - -/// Attempt to split_to the given index. If there are not enough bytes then -/// Err is returned and the given Bytes is not modified. -fn try_split_to(buf: &mut Bytes, n: usize) -> Result { - if buf.len() >= n { - Ok(buf.split_to(n)) - } else { - Err(InsufficientBytes) - } -} diff --git a/linkerd/trace-context/src/propagation/b3.rs b/linkerd/trace-context/src/propagation/b3.rs deleted file mode 100644 index 60cdad41ff..0000000000 --- a/linkerd/trace-context/src/propagation/b3.rs +++ /dev/null @@ -1,163 +0,0 @@ -use crate::propagation::try_split_to; -use crate::{Flags, Id}; -use base64::Engine; -use bytes::Bytes; -use http::header::{HeaderName, HeaderValue}; -use linkerd_error::Error; - -use tracing::{debug, trace}; - -use super::{decode_id_with_padding, get_header_str, Propagation, TraceContext, UnknownFieldId}; - -static HTTP_TRACE_ID_HEADER: HeaderName = HeaderName::from_static("x-b3-traceid"); -static HTTP_SPAN_ID_HEADER: HeaderName = HeaderName::from_static("x-b3-spanid"); -static HTTP_SAMPLED_HEADER: HeaderName = HeaderName::from_static("x-b3-sampled"); - -static GRPC_TRACE_HEADER: HeaderName = HeaderName::from_static("grpc-trace-bin"); -const GRPC_TRACE_FIELD_TRACE_ID: u8 = 0; -const GRPC_TRACE_FIELD_SPAN_ID: u8 = 1; -const GRPC_TRACE_FIELD_TRACE_OPTIONS: u8 = 2; - -// This code looks significantly weirder if some of the elements are added using -// the `vec![]` macro, despite clippy's suggestions otherwise... -#[allow(clippy::vec_init_then_push)] -pub fn increment_grpc_span_id(request: &mut http::Request, context: &TraceContext) -> Id { - let span_id = Id::new_span_id(&mut rand::rng()); - - trace!(%span_id, "Incremented span id"); - - let mut bytes = Vec::::new(); - - // version - bytes.push(0); - - // trace id - bytes.push(GRPC_TRACE_FIELD_TRACE_ID); - bytes.extend(context.trace_id.0.iter()); - - // span id - bytes.push(GRPC_TRACE_FIELD_SPAN_ID); - bytes.extend(span_id.0.iter()); - - // trace options - bytes.push(GRPC_TRACE_FIELD_TRACE_OPTIONS); - bytes.push(context.flags.0); - - let bytes_b64 = base64::engine::general_purpose::STANDARD.encode(&bytes); - - if let Ok(hv) = HeaderValue::from_str(&bytes_b64) { - request.headers_mut().insert(&GRPC_TRACE_HEADER, hv); - } else { - debug!(header = %GRPC_TRACE_HEADER, header_value = %bytes_b64, "Invalid non-ASCII or control character in header value"); - } - span_id -} - -pub fn increment_http_span_id(request: &mut http::Request) -> Id { - let span_id = Id::new_span_id(&mut rand::rng()); - - trace!(%span_id, "Incremented span id"); - - let span_str = hex::encode(span_id.as_ref()); - - if let Ok(hv) = HeaderValue::from_str(&span_str) { - request.headers_mut().insert(&HTTP_SPAN_ID_HEADER, hv); - } else { - debug!(header = %HTTP_SPAN_ID_HEADER, header_value = %span_str, "Invalid non-ASCII or control character in header value"); - } - span_id -} - -pub fn unpack_grpc_trace_context(request: &http::Request) -> Option { - get_header_str(request, &GRPC_TRACE_HEADER) - .and_then(|header_str| { - base64::engine::general_purpose::STANDARD.decode(header_str) - .map_err(|error| debug!(header = %GRPC_TRACE_HEADER, header_value = %header_str, %error, "Failed to unpack trace context due to invalid base64 encoding")) - .ok() - }) - .and_then(|vec| { - let mut bytes = vec.into(); - parse_grpc_trace_context_fields(&mut bytes) - }) -} - -pub fn unpack_http_trace_context(request: &http::Request) -> Option { - let parent_id = parse_header_id(request, &HTTP_SPAN_ID_HEADER, 8)?; - let trace_id = parse_header_id(request, &HTTP_TRACE_ID_HEADER, 16)?; - let flags = match get_header_str(request, &HTTP_SAMPLED_HEADER) { - Some("1") => Flags(1), - _ => Flags(0), - }; - Some(TraceContext { - propagation: Propagation::B3Http, - trace_id, - parent_id, - flags, - }) -} - -fn parse_grpc_trace_context_fields(buf: &mut Bytes) -> Option { - trace!(?buf, "Reading binary trace context"); - - let _version = try_split_to(buf, 1).ok()?; - - let mut context = TraceContext { - propagation: Propagation::B3Grpc, - trace_id: Default::default(), - parent_id: Default::default(), - flags: Default::default(), - }; - - while !buf.is_empty() { - match parse_grpc_trace_context_field(buf, &mut context) { - Ok(()) => {} - Err(ref e) if e.is::() => break, - Err(error) => { - debug!(header = %GRPC_TRACE_HEADER, %error, "Failed to parse trace context header"); - return None; - } - }; - } - Some(context) -} - -fn parse_grpc_trace_context_field( - buf: &mut Bytes, - context: &mut TraceContext, -) -> Result<(), Error> { - let field_id = try_split_to(buf, 1)?[0]; - match field_id { - GRPC_TRACE_FIELD_SPAN_ID => { - let id = try_split_to(buf, 8)?; - trace!("Reading binary trace field {GRPC_TRACE_FIELD_SPAN_ID:?}: {id:?}"); - context.parent_id = id.into(); - } - GRPC_TRACE_FIELD_TRACE_ID => { - let id = try_split_to(buf, 16)?; - trace!("Reading binary trace field {GRPC_TRACE_FIELD_TRACE_ID:?}: {id:?}",); - context.trace_id = id.into(); - } - GRPC_TRACE_FIELD_TRACE_OPTIONS => { - let flags = try_split_to(buf, 1)?; - trace!("Reading binary trace field {GRPC_TRACE_FIELD_TRACE_OPTIONS:?}: {flags:?}",); - context.flags = flags.try_into()?; - } - id => { - return Err(UnknownFieldId(id).into()); - } - }; - Ok(()) -} - -fn parse_header_id( - request: &http::Request, - header: &HeaderName, - pad_to: usize, -) -> Option { - let header_value = get_header_str(request, header)?; - decode_id_with_padding(header_value, pad_to) - .map_err( - |error| debug!(%header, %header_value, %error, "Id in header value contains invalid hex"), - ) - .ok() -} diff --git a/linkerd/trace-context/src/propagation/w3c.rs b/linkerd/trace-context/src/propagation/w3c.rs deleted file mode 100644 index b2b223a0b2..0000000000 --- a/linkerd/trace-context/src/propagation/w3c.rs +++ /dev/null @@ -1,146 +0,0 @@ -use http::header::HeaderName; -use tracing::{debug, trace}; - -use super::{decode_id_with_padding, get_header_str, Propagation, TraceContext}; -use crate::{Flags, Id}; - -static HTTP_TRACEPARENT: HeaderName = HeaderName::from_static("traceparent"); -const VERSION_00: &str = "00"; - -pub fn unpack_w3c_trace_context(request: &http::Request) -> Option { - get_header_str(request, &HTTP_TRACEPARENT).and_then(parse_context) -} - -/// Given an http request and a w3c trace context, create a new Span ID and -/// assign it to the tracecontext header value, in order to propagate -/// the trace context downstream. -pub fn increment_http_span_id(request: &mut http::Request, context: &TraceContext) -> Id { - let span_id = Id::new_span_id(&mut rand::rng()); - - trace!(%span_id, "Incremented span id"); - - let new_header = { - let mut buf = String::with_capacity(60); - buf.push_str(VERSION_00); - buf.push('-'); - buf.push_str(&hex::encode(context.trace_id.as_ref())); - buf.push('-'); - buf.push_str(&hex::encode(span_id.as_ref())); - buf.push('-'); - buf.push_str(&hex::encode(vec![context.flags.0])); - buf - }; - - if let Ok(hv) = http::HeaderValue::from_str(&new_header) { - request.headers_mut().insert(&HTTP_TRACEPARENT, hv); - } else { - debug!(header = %HTTP_TRACEPARENT, header_value = %new_header, "Invalid non-ASCII or control character in header value"); - } - - span_id -} - -/// Parse a given header value as a w3c TraceContext value. -fn parse_context(header_value: &str) -> Option { - let rest = match header_value.split_once('-') { - Some((version, rest)) => { - if version != VERSION_00 { - debug!(header = %HTTP_TRACEPARENT, %header_value, %version, "Tracecontext header value contains invalid version",); - return None; - } - rest - } - None => { - debug!(header = %HTTP_TRACEPARENT, %header_value, "Tracecontext header value does not contain version"); - return None; - } - }; - - let (trace_id, rest) = parse_header_value(rest, 16)?; - let (parent_id, rest) = parse_header_value(rest, 8)?; - - let flags = match hex::decode(rest) { - // If valid hex, take final bit and AND with 1. W3C only uses one bit - // for flags in version 00, and the bit is used to control sampling - Ok(decoded) => Flags(decoded[0]), - // If invalid hex, invalidate trace - Err(error) => { - debug!(header = %HTTP_TRACEPARENT, flags = %rest, %error, "Failed to hex decode tracecontext flags"); - return None; - } - }; - - Some(TraceContext { - propagation: Propagation::W3CHttp, - trace_id, - parent_id, - flags, - }) -} - -// Parse header value as Id and return the rest after '-' separator. When an id -// is all 0 value it is considered invalid according to the spec. -// -fn parse_header_value(next_header_value: &str, pad_to: usize) -> Option<(Id, &str)> { - let next_parse_result = next_header_value - .split_once('-') - .filter(|(id, _)| !id.chars().all(|c| c == '0')); - if next_parse_result.is_none() { - debug!(header = %HTTP_TRACEPARENT, "Id in header value contains invalid all zeros value"); - } - next_parse_result - .and_then(|(id, rest)| decode_id_with_padding(id, pad_to) - .map_err(|error| debug!(header = %HTTP_TRACEPARENT, %error, %id, "Id in header value contains invalid hex")) - .ok() - .zip(Some(rest))) -} - -#[cfg(test)] -mod tests { - use super::parse_context; - - #[test] - fn w3c_context_parsed_successfully() { - let input = "00-94d7f6ec6b95f3e916179cb6cfd01390-55ccfce77f972614-01"; - let actual = parse_context(input); - - let expected_trace = hex::decode("94d7f6ec6b95f3e916179cb6cfd01390") - .expect("Failed to decode trace parent from hex"); - let expected_parent = - hex::decode("55ccfce77f972614").expect("Failed to decode span parent from hex"); - let expected_flags = 1; - - assert!(actual.is_some()); - let actual = actual.unwrap(); - assert_eq!(expected_trace, actual.trace_id.0); - assert_eq!(expected_parent, actual.parent_id.0); - assert_eq!(expected_flags, actual.flags.0); - } - - #[test] - fn w3c_context_invalid_flags() { - let input = "00-94d7f6ec6b95f3e916179cb6cfd01390-55ccfce77f972614-011"; - let actual = parse_context(input); - assert!(actual.is_none()); - - let input = "00-94d7f6ec6b95f3e916179cb6cfd01390-55ccfce77f972614"; - let actual = parse_context(input); - assert!(actual.is_none()); - } - - #[test] - fn w3c_context_invalid_version() { - let input = "22-94d7f6ec6b95f3e916179cb6cfd01390-55ccfce77f972614-01"; - assert!(parse_context(input).is_none()); - - let input = "94d7f6ec6b95f3e916179cb6cfd01390-55ccfce77f972614-01"; - assert!(parse_context(input).is_none()); - } - - #[test] - fn w3c_context_invalid_hex() { - // length of id 94d(...) is odd, results in invalid hex. - let input = "00-94d7f6ec6b95f3e916179cb6cfd013901-55ccfce77972614-01"; - assert!(parse_context(input).is_none()); - } -} diff --git a/linkerd/trace-context/src/service.rs b/linkerd/trace-context/src/service.rs index a272d5ae0c..950e8f9a7f 100644 --- a/linkerd/trace-context/src/service.rs +++ b/linkerd/trace-context/src/service.rs @@ -1,16 +1,19 @@ -use crate::{propagation, Span, SpanSink}; +use crate::{Span, SpanSink}; use futures::{future::Either, prelude::*}; use http::Uri; use linkerd_stack::layer; +use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider}; +use opentelemetry::KeyValue; +use opentelemetry_http::{HeaderExtractor, HeaderInjector}; +use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider}; use opentelemetry_semantic_conventions as semconv; use std::{ - collections::HashMap, fmt::{Display, Formatter}, pin::Pin, task::{Context, Poll}, time::SystemTime, }; -use tracing::{debug, info, trace}; +use tracing::{info, trace}; /// A layer that adds distributed tracing instrumentation. /// @@ -25,6 +28,7 @@ use tracing::{debug, info, trace}; pub struct TraceContext { inner: S, sink: K, + tracer: SdkTracer, } // === impl TraceContext === @@ -34,6 +38,7 @@ impl TraceContext { layer::mk(move |inner| TraceContext { inner, sink: sink.clone(), + tracer: SdkTracerProvider::builder().build().tracer(""), }) } @@ -42,26 +47,35 @@ impl TraceContext { /// The OpenTelemetry spec defines the semantic conventions that HTTP /// services should use for the labels included in traces: /// https://opentelemetry.io/docs/specs/semconv/http/http-spans/ - fn request_labels(req: &http::Request) -> HashMap<&'static str, String> { - let mut labels = HashMap::with_capacity(13); - labels.insert( + fn request_labels(&self, req: &http::Request) -> Vec { + let mut attributes = Vec::with_capacity(13); + attributes.push(KeyValue::new( semconv::trace::HTTP_REQUEST_METHOD, - format!("{}", req.method()), - ); + req.method().to_string(), + )); let url = req.uri(); if let Some(scheme) = url.scheme_str() { - labels.insert(semconv::trace::URL_SCHEME, scheme.to_string()); + attributes.push(KeyValue::new( + semconv::trace::URL_SCHEME, + scheme.to_string(), + )); } - labels.insert(semconv::trace::URL_PATH, url.path().to_string()); + attributes.push(KeyValue::new( + semconv::trace::URL_PATH, + url.path().to_string(), + )); if let Some(query) = url.query() { - labels.insert(semconv::trace::URL_QUERY, query.to_string()); + attributes.push(KeyValue::new(semconv::trace::URL_QUERY, query.to_string())); } - labels.insert(semconv::trace::URL_FULL, UrlLabel(url).to_string()); + attributes.push(KeyValue::new( + semconv::trace::URL_FULL, + UrlLabel(url).to_string(), + )); // linkerd currently only proxies tcp-based connections - labels.insert(semconv::trace::NETWORK_TRANSPORT, "tcp".to_string()); + attributes.push(KeyValue::new(semconv::trace::NETWORK_TRANSPORT, "tcp")); // This is the order of precedence for host headers, // see https://opentelemetry.io/docs/specs/semconv/http/http-spans/ @@ -75,18 +89,22 @@ impl TraceContext { if let Ok(host) = host.to_str() { if let Ok(uri) = host.parse::() { if let Some(host) = uri.host() { - labels.insert(semconv::trace::SERVER_ADDRESS, host.to_string()); + attributes.push(KeyValue::new( + semconv::trace::SERVER_ADDRESS, + host.to_string(), + )); } if let Some(port) = uri.port() { - labels.insert(semconv::trace::SERVER_PORT, port.to_string()); + attributes + .push(KeyValue::new(semconv::trace::SERVER_PORT, port.to_string())); } } } } - Self::populate_header_values(&mut labels, req); + Self::populate_header_values(&mut attributes, req); - labels + attributes } /// Populates labels for common header values from the request. @@ -95,10 +113,7 @@ impl TraceContext { /// `http.request.header.
` values, but we shouldn't unconditionally populate all headers /// as they often contain authorization tokens/secrets/etc. Instead, we populate a subset of /// common headers into their respective labels. - fn populate_header_values( - labels: &mut HashMap<&'static str, String>, - req: &http::Request, - ) { + fn populate_header_values(labels: &mut Vec, req: &http::Request) { static HEADER_LABELS: &[(&str, &str)] = &[ ("user-agent", semconv::trace::USER_AGENT_ORIGINAL), // http.request.body.size is available as a semantic convention, but is not stable. @@ -117,19 +132,19 @@ impl TraceContext { ]; for &(header, label) in HEADER_LABELS { if let Some(value) = req.headers().get(header) { - labels.insert(label, String::from_utf8_lossy(value.as_bytes()).to_string()); + labels.push(KeyValue::new( + label, + String::from_utf8_lossy(value.as_bytes()).to_string(), + )); } } } - fn add_response_labels( - mut labels: HashMap<&'static str, String>, - rsp: &http::Response, - ) -> HashMap<&'static str, String> { - labels.insert( + fn add_response_labels(mut labels: Vec, rsp: &http::Response) -> Vec { + labels.push(KeyValue::new( semconv::trace::HTTP_RESPONSE_STATUS_CODE, rsp.status().as_str().to_string(), - ); + )); labels } } @@ -187,38 +202,53 @@ where } fn call(&mut self, mut req: http::Request) -> Self::Future { - if self.sink.is_enabled() { - if let Some(context) = propagation::unpack_trace_context(&req) { - // Update the trace ID if the request set one and the proxy is configured to emit - // spans. - let span_id = propagation::increment_span_id(&mut req, &context); - debug!(?span_id, sampled = context.is_sampled()); - - if context.is_sampled() { - // If the request has been marked for sampling, record its metadata. - let start = SystemTime::now(); - let req_labels = Self::request_labels(&req); - let mut sink = self.sink.clone(); - let span_name = req.uri().path().to_owned(); - return Either::Right(Box::pin(self.inner.call(req).map_ok(move |rsp| { - // Emit the completed span with the response metadata. - let span = Span { - span_id, - trace_id: context.trace_id, - parent_id: context.parent_id, - span_name, - start, - end: SystemTime::now(), - labels: Self::add_response_labels(req_labels, &rsp), - }; - trace!(?span); - if let Err(error) = sink.try_send(span) { - info!(%error, "Span dropped"); - } - rsp - }))); - } + 'outer: { + if !self.sink.is_enabled() { + trace!("Sink not enabled, skipping"); + break 'outer; } + let parent_ctx = opentelemetry::global::get_text_map_propagator(|prop| { + prop.extract(&HeaderExtractor(req.headers())) + }); + if !parent_ctx.span().span_context().is_valid() + || !parent_ctx.span().span_context().is_sampled() + { + trace!(?parent_ctx, "Span not valid, skipping"); + break 'outer; + } + + let span = self + .tracer + .span_builder("".to_string()) + .start_with_context(&self.tracer, &parent_ctx); + let ctx = parent_ctx.with_span(span); + + opentelemetry::global::get_text_map_propagator(|prop| { + prop.inject_context(&ctx, &mut HeaderInjector(req.headers_mut())); + }); + + // If the request has been marked for sampling, record its metadata. + let start = SystemTime::now(); + let req_labels = self.request_labels(&req); + let mut sink = self.sink.clone(); + let span_name = req.uri().path().to_owned(); + return Either::Right(Box::pin(self.inner.call(req).map_ok(move |rsp| { + // Emit the completed span with the response metadata. + let span = Span { + span_id: ctx.span().span_context().span_id(), + trace_id: ctx.span().span_context().trace_id(), + parent_id: parent_ctx.span().span_context().span_id(), + span_name, + start, + end: SystemTime::now(), + labels: Self::add_response_labels(req_labels, &rsp), + }; + trace!(?span); + if let Err(error) = sink.try_send(span) { + info!(%error, "Span dropped"); + } + rsp + }))); } // If there's no tracing to be done, just pass on the request to the inner service. @@ -229,11 +259,10 @@ where #[cfg(test)] mod tests { use super::*; - use crate::Id; - use bytes::Bytes; use http::HeaderMap; use linkerd_error::Error; use linkerd_http_box::BoxBody; + use opentelemetry::{SpanId, TraceId}; use std::collections::BTreeMap; use tokio::sync::mpsc; use tower::{Layer, Service, ServiceExt}; @@ -246,6 +275,9 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn w3c_propagation() { let _trace = linkerd_tracing::test::trace_init(); + opentelemetry::global::set_text_map_propagator( + linkerd_opentelemetry::propagation::OrderedPropagator::new(), + ); let (req_headers, exported_span) = send_mock_request( http::Request::builder() @@ -263,29 +295,40 @@ mod tests { assert!(req_headers.get(B3_SPAN_ID_HEADER).is_none()); assert!(req_headers.get(B3_SAMPLED_HEADER).is_none()); + let sent_cx = opentelemetry::global::get_text_map_propagator(|prop| { + prop.extract(&HeaderExtractor(&req_headers)) + }); + let sent_span = sent_cx.span(); + let sent_span_cx = sent_span.span_context(); + assert!(sent_span_cx.is_sampled()); + assert!(sent_span_cx.is_valid()); + + assert_eq!( + sent_span_cx.trace_id(), + TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").expect("trace id") + ); + assert_ne!( + sent_span_cx.span_id(), + SpanId::from_hex("00f067aa0ba902b7").expect("span id") + ); + assert_eq!( exported_span.trace_id, - Id::from(Bytes::from( - hex::decode("4bf92f3577b34da6a3ce929d0e0e4736").expect("decode") - )), + TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").expect("trace id") ); assert_eq!( exported_span.parent_id, - Id::from(Bytes::from( - hex::decode("00f067aa0ba902b7").expect("decode") - )), - ); - assert_ne!( - exported_span.span_id, - Id::from(Bytes::from( - hex::decode("00f067aa0ba902b7").expect("decode") - )), + SpanId::from_hex("00f067aa0ba902b7").expect("span id") ); + assert_eq!(exported_span.span_id, sent_span_cx.span_id()); } #[tokio::test(flavor = "current_thread")] async fn b3_propagation() { let _trace = linkerd_tracing::test::trace_init(); + opentelemetry::global::set_text_map_propagator( + linkerd_opentelemetry::propagation::OrderedPropagator::new(), + ); let (req_headers, exported_span) = send_mock_request( http::Request::builder() @@ -302,29 +345,40 @@ mod tests { assert!(req_headers.get(B3_SPAN_ID_HEADER).is_some()); assert!(req_headers.get(B3_SAMPLED_HEADER).is_some()); + let sent_cx = opentelemetry::global::get_text_map_propagator(|prop| { + prop.extract(&HeaderExtractor(&req_headers)) + }); + let sent_span = sent_cx.span(); + let sent_span_cx = sent_span.span_context(); + assert!(sent_span_cx.is_sampled()); + assert!(sent_span_cx.is_valid()); + + assert_eq!( + sent_span_cx.trace_id(), + TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").expect("trace id") + ); + assert_ne!( + sent_span_cx.span_id(), + SpanId::from_hex("00f067aa0ba902b7").expect("span id") + ); + assert_eq!( exported_span.trace_id, - Id::from(Bytes::from( - hex::decode("4bf92f3577b34da6a3ce929d0e0e4736").expect("decode") - )), + TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").expect("trace id") ); assert_eq!( exported_span.parent_id, - Id::from(Bytes::from( - hex::decode("00f067aa0ba902b7").expect("decode") - )), - ); - assert_ne!( - exported_span.span_id, - Id::from(Bytes::from( - hex::decode("00f067aa0ba902b7").expect("decode") - )), + SpanId::from_hex("00f067aa0ba902b7").expect("span id") ); + assert_eq!(exported_span.span_id, sent_span_cx.span_id()); } #[tokio::test(flavor = "current_thread")] async fn trace_labels() { let _trace = linkerd_tracing::test::trace_init(); + opentelemetry::global::set_text_map_propagator( + linkerd_opentelemetry::propagation::OrderedPropagator::new(), + ); let (_, exported_span) = send_mock_request( http::Request::builder() @@ -342,21 +396,37 @@ mod tests { ) .await; - let labels = exported_span.labels.into_iter().collect::>(); + let labels = exported_span + .labels + .into_iter() + .map(|kv| (kv.key.to_string(), kv.value.to_string())) + .collect::>(); assert_eq!( labels, BTreeMap::from_iter([ - ("http.request.header.content-length", "0".to_string()), - ("http.request.header.content-type", "text/plain".to_string()), - ("http.request.header.l5d-orig-proto", "HTTP/1.1".to_string()), - ("http.request.method", "GET".to_string()), - ("http.response.status_code", "200".to_string()), - ("network.transport", "tcp".to_string()), - ("url.full", "http://example.com:80/foo?bar=baz".to_string()), - ("url.path", "/foo".to_string()), - ("url.query", "bar=baz".to_string()), - ("url.scheme", "http".to_string()), - ("user_agent.original", "tokio-test".to_string()) + ( + "http.request.header.content-length".to_string(), + "0".to_string() + ), + ( + "http.request.header.content-type".to_string(), + "text/plain".to_string() + ), + ( + "http.request.header.l5d-orig-proto".to_string(), + "HTTP/1.1".to_string() + ), + ("http.request.method".to_string(), "GET".to_string()), + ("http.response.status_code".to_string(), "200".to_string()), + ("network.transport".to_string(), "tcp".to_string()), + ( + "url.full".to_string(), + "http://example.com:80/foo?bar=baz".to_string() + ), + ("url.path".to_string(), "/foo".to_string()), + ("url.query".to_string(), "bar=baz".to_string()), + ("url.scheme".to_string(), "http".to_string()), + ("user_agent.original".to_string(), "tokio-test".to_string()) ]) ) }