From 3f9a2f9a263f4713436e60f5a78c40207847c089 Mon Sep 17 00:00:00 2001 From: Scott Fleener Date: Fri, 14 Nov 2025 10:37:37 -0500 Subject: [PATCH] chore(tracing): Use upstream OpenTelemetry for all tracing This removes all remaining bespoke tracing infrastructure. The main change here is registering a global tracer provider that the trace propagation service can use, replacing the manually wired span data channel. This leaves in some remnants of the old infrastructure, specifically around the span sink types. These types are fairly viral, so a future PR will clean these up. Signed-off-by: Scott Fleener --- Cargo.lock | 2 + linkerd/app/core/src/http_tracing.rs | 20 +- linkerd/app/src/lib.rs | 5 - linkerd/app/src/trace_collector.rs | 1 - .../app/src/trace_collector/otel_collector.rs | 16 +- linkerd/opentelemetry/src/lib.rs | 223 +++++++----------- linkerd/trace-context/Cargo.toml | 1 + linkerd/trace-context/src/service.rs | 174 +++++++------- 8 files changed, 177 insertions(+), 265 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 02679485a4..e7480fc1b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3018,6 +3018,8 @@ dependencies = [ "percent-encoding", "rand 0.9.2", "thiserror", + "tokio", + "tokio-stream", ] [[package]] diff --git a/linkerd/app/core/src/http_tracing.rs b/linkerd/app/core/src/http_tracing.rs index 960899dc64..984bf0b0ec 100644 --- a/linkerd/app/core/src/http_tracing.rs +++ b/linkerd/app/core/src/http_tracing.rs @@ -11,25 +11,17 @@ use tokio::sync::mpsc; pub type SpanSink = mpsc::Sender; pub fn server( - sink: Option, + _sink: Option, labels: impl Into, -) -> impl layer::Layer, S>> + Clone { - TraceContext::layer(sink.map(move |sink| SpanConverter { - kind: SpanKind::Server, - sink, - labels: labels.into(), - })) +) -> impl layer::Layer> + Clone { + TraceContext::layer(SpanKind::Server, labels.into()) } pub fn client( - sink: Option, + _sink: Option, labels: impl Into, -) -> impl layer::Layer, S>> + Clone { - TraceContext::layer(sink.map(move |sink| SpanConverter { - kind: SpanKind::Client, - sink, - labels: labels.into(), - })) +) -> impl layer::Layer> + Clone { + TraceContext::layer(SpanKind::Client, labels.into()) } #[derive(Clone)] diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index cf607e95c1..7908b57034 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -399,7 +399,6 @@ impl App { admin, drain, identity, - trace_collector: collector, start_proxy, tap, .. @@ -464,10 +463,6 @@ impl App { tokio::spawn(serve.instrument(info_span!("tap").or_current())); } - if let trace_collector::TraceCollector::Enabled(collector) = collector { - tokio::spawn(collector.task.instrument(info_span!("tracing"))); - } - // we don't care if the admin shutdown channel is // dropped or actually triggered. let _ = admin_shutdown_rx.await; diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs index 96e1aaf19a..1fd07baa51 100644 --- a/linkerd/app/src/trace_collector.rs +++ b/linkerd/app/src/trace_collector.rs @@ -33,7 +33,6 @@ pub enum TraceCollector { pub struct EnabledCollector { pub addr: control::ControlAddr, pub span_sink: SpanSink, - pub task: Task, } impl TraceCollector { diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs index 5c5a147a41..824fc05c70 100644 --- a/linkerd/app/src/trace_collector/otel_collector.rs +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -6,9 +6,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; use tonic::{body::Body as TonicBody, client::GrpcService}; -use tracing::Instrument; pub(super) struct OtelCollectorAttributes { pub hostname: Option, @@ -28,8 +26,7 @@ where S::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { - let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); - let spans_rx = ReceiverStream::new(spans_rx); + let (span_sink, _) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); let resource = sdk::Resource::builder() .with_attribute(KeyValue::new( @@ -56,14 +53,7 @@ where .build(); let addr = addr.clone(); - let task = Box::pin( - opentelemetry::export_spans(svc, spans_rx, resource, legacy_metrics) - .instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()), - ); + opentelemetry::install_opentelemetry_providers(svc, resource, legacy_metrics); - EnabledCollector { - addr, - task, - span_sink, - } + EnabledCollector { addr, span_sink } } diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 2491b06051..2123068f01 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -6,15 +6,9 @@ pub mod propagation; use self::metrics::Registry; use crate::propagation::OrderedPropagator; -use futures::stream::{Stream, StreamExt}; use http_body::Body; use linkerd_error::Error; -use linkerd_trace_context::{self as trace_context, export::ExportSpan}; pub use opentelemetry as otel; -use opentelemetry::{ - trace::{SpanContext, SpanKind, Status, TraceFlags, TraceState}, - KeyValue, -}; pub use opentelemetry_proto as proto; use opentelemetry_proto::{ tonic::collector::trace::v1::{ @@ -25,42 +19,44 @@ use opentelemetry_proto::{ pub use opentelemetry_sdk as sdk; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::trace::{ - BatchConfigBuilder, BatchSpanProcessor, SpanData, SpanLinks, SpanProcessor, + BatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider, SpanData, }; use opentelemetry_sdk::Resource; pub use opentelemetry_semantic_conventions as semconv; use std::fmt::{Debug, Formatter}; use std::time::Duration; use tonic::{body::Body as TonicBody, client::GrpcService}; -use tracing::{debug, info}; +use tracing::debug; -pub async fn export_spans(client: T, spans: S, resource: Resource, metrics: Registry) +pub fn install_opentelemetry_providers(client: T, resource: Resource, metrics: Registry) where T: GrpcService + Clone + Send + Sync + 'static, T::Error: Into, T::Future: Send, T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, - S: Stream + Unpin, { debug!("Span exporter running"); - let processor = BatchSpanProcessor::builder(SpanExporter { - client: TraceServiceClient::new(client), - resource, - metrics, - }) - .with_batch_config( - BatchConfigBuilder::default() - .with_max_queue_size(1000) - .with_scheduled_delay(Duration::from_secs(5)) + let provider = SdkTracerProvider::builder() + .with_span_processor( + BatchSpanProcessor::builder(SpanExporter { + client: TraceServiceClient::new(client), + resource, + metrics, + }) + .with_batch_config( + BatchConfigBuilder::default() + .with_max_queue_size(1000) + .with_scheduled_delay(Duration::from_secs(5)) + .build(), + ) .build(), - ) - .build(); + ) + .build(); + opentelemetry::global::set_tracer_provider(provider); opentelemetry::global::set_text_map_propagator(OrderedPropagator::new()); - - SpanExportTask::new(spans, processor).run().await; } /// SpanExporter sends a Stream of spans to the given TraceService gRPC service. @@ -120,113 +116,62 @@ where } } -struct SpanExportTask { - spans: S, - processor: BatchSpanProcessor, -} - -impl SpanExportTask -where - S: Stream + Unpin, -{ - fn new(spans: S, processor: BatchSpanProcessor) -> Self { - Self { spans, processor } - } - - async fn run(mut self) { - while let Some(span) = self.spans.next().await { - let s = match convert_span(span) { - Ok(s) => s, - Err(error) => { - info!(%error, "Span dropped"); - continue; - } - }; - - self.processor.on_end(s); - } - } -} - -fn convert_span(span: ExportSpan) -> Result { - let ExportSpan { span, kind, labels } = span; - - let mut attributes = Vec::::new(); - for (k, v) in labels.iter() { - attributes.push(KeyValue::new(k.clone(), v.clone())); - } - for kv in span.labels.into_iter() { - attributes.push(kv); - } - let is_remote = kind != trace_context::export::SpanKind::Client; - Ok(SpanData { - parent_span_id: span.parent_id, - parent_span_is_remote: true, // we do not originate any spans locally - span_kind: match kind { - trace_context::export::SpanKind::Server => SpanKind::Server, - trace_context::export::SpanKind::Client => SpanKind::Client, - }, - name: span.span_name.into(), - start_time: span.start, - end_time: span.end, - attributes, - dropped_attributes_count: 0, - links: SpanLinks::default(), - status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this - span_context: SpanContext::new( - span.trace_id, - span.span_id, - TraceFlags::default(), - is_remote, - TraceState::NONE, - ), - events: Default::default(), - instrumentation_scope: Default::default(), - }) -} - #[cfg(test)] mod tests { use super::*; - use linkerd_trace_context::{export::SpanKind, Span}; - use opentelemetry::{SpanId, TraceId}; - use opentelemetry_proto::tonic::common::v1::InstrumentationScope; - use std::{sync::Arc, time::SystemTime}; - use tokio::sync::mpsc; - use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt}; + use opentelemetry::trace::{SpanContext, TraceContextExt, TraceState, Tracer}; + use opentelemetry::{Context, SpanId, TraceFlags, TraceId}; + use opentelemetry_proto::tonic::common::v1::any_value::Value; + use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope}; + use std::collections::HashMap; + use tonic::codegen::tokio_stream::StreamExt; use tonic_prost::ProstDecoder; #[tokio::test(flavor = "current_thread")] async fn send_span() { + let _trace = linkerd_tracing::test::trace_init(); + let trace_id = TraceId::from_hex("0123456789abcedffedcba9876543210").expect("trace id"); let parent_id = SpanId::from_hex("fedcba9876543210").expect("parent id"); let span_id = SpanId::from_hex("0123456789abcedf").expect("span id"); - let span_name = "test".to_string(); - - let start = SystemTime::now(); - let end = SystemTime::now(); - - let span = ExportSpan { - span: Span { - trace_id, - span_id, - parent_id, - span_name: span_name.clone(), - start, - end, - labels: Vec::new(), - }, - kind: SpanKind::Server, - labels: Arc::new(Default::default()), - }; + let span_name = "test-span".to_string(); - let mut req = send_mock_request(span).await; + let mut req = send_mock_request(trace_id, parent_id, span_id, span_name.clone()).await; assert_eq!(req.resource_spans.len(), 1); let mut resource_span = req.resource_spans.remove(0); let resource_span_resource = resource_span.resource.expect("must have resource"); assert_eq!(resource_span_resource.dropped_attributes_count, 0); assert_eq!(resource_span_resource.entity_refs, vec![]); + let expected: HashMap<&str, Option<&str>> = HashMap::from_iter([ + ("telemetry.sdk.name", Some("opentelemetry")), + ("telemetry.sdk.version", None), + ("telemetry.sdk.language", Some("rust")), + ("service.name", Some("unknown_service")), + ]); + + assert_eq!(expected.len(), resource_span_resource.attributes.len()); + for (expected_key, expected_value) in expected { + let Some(actual) = resource_span_resource + .attributes + .iter() + .find(|kv| kv.key == expected_key) + else { + panic!("{expected_key} not found in resource attributes"); + }; + + let Some(AnyValue { + value: Some(Value::StringValue(actual_value)), + }) = &actual.value + else { + panic!("Unexpected value type: {:?}", actual.value); + }; + + if let Some(expected_value) = expected_value { + assert_eq!(actual_value, expected_value); + } + } + resource_span_resource .attributes .iter() @@ -251,55 +196,53 @@ mod tests { assert_eq!( scope_span.scope, Some(InstrumentationScope { - name: "".to_string(), + name: "test".to_string(), version: "".to_string(), attributes: vec![], dropped_attributes_count: 0, }) ); - assert_eq!(scope_span.spans.len(), 1); + assert_eq!(scope_span.spans.len(), 1); let span = scope_span.spans.remove(0); + assert_eq!(span.span_id, span_id.to_bytes().to_vec(),); assert_eq!(span.parent_span_id, parent_id.to_bytes().to_vec(),); assert_eq!(span.trace_id, trace_id.to_bytes().to_vec(),); assert_eq!(span.name, span_name); - assert_eq!( - span.start_time_unix_nano, - start - .duration_since(SystemTime::UNIX_EPOCH) - .expect("duration") - .as_nanos() as u64 - ); - assert_eq!( - span.end_time_unix_nano, - end.duration_since(SystemTime::UNIX_EPOCH) - .expect("duration") - .as_nanos() as u64 - ); - assert_eq!(span.flags, 0b11_0000_0000); + assert_eq!(span.flags, 0b11_0000_0001); } - async fn send_mock_request(span: ExportSpan) -> ExportTraceServiceRequest { - let _trace = linkerd_tracing::test::trace_init(); - let (span_tx, span_rx) = mpsc::channel(1); - + async fn send_mock_request( + trace_id: TraceId, + parent_id: SpanId, + span_id: SpanId, + span_name: String, + ) -> ExportTraceServiceRequest { let (inner, mut handle) = tower_test::mock::pair::< http::Request, http::Response, >(); handle.allow(1); - span_tx.try_send(span).expect("Must have space"); - let (metrics, _) = metrics::new(); - tokio::spawn(export_spans( - inner, - ReceiverStream::new(span_rx), - Resource::builder().build(), - metrics, + install_opentelemetry_providers(inner, Resource::builder().build(), metrics); + + let parent_cx = Context::new().with_remote_span_context(SpanContext::new( + trace_id, + parent_id, + TraceFlags::SAMPLED, + true, + TraceState::NONE, )); + let tracer = opentelemetry::global::tracer("test"); + let span = tracer + .span_builder(span_name) + .with_span_id(span_id) + .start_with_context(&tracer, &parent_cx); + drop(span); + let (req, _tx) = handle.next_request().await.expect("next request"); let req = tonic::Request::from_http(req); let mut req = tonic::codec::Streaming::::new_request( diff --git a/linkerd/trace-context/Cargo.toml b/linkerd/trace-context/Cargo.toml index 27b174e041..781a52a9dc 100644 --- a/linkerd/trace-context/Cargo.toml +++ b/linkerd/trace-context/Cargo.toml @@ -30,5 +30,6 @@ linkerd-http-box = { path = "../http/box" } linkerd-io = { path = "../io" } linkerd-tracing = { path = "../tracing" } linkerd-opentelemetry = { path = "../opentelemetry" } +opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace", "testing"] } tokio = { version = "1", features = ["test-util"] } tower-test = { workspace = true } diff --git a/linkerd/trace-context/src/service.rs b/linkerd/trace-context/src/service.rs index 950e8f9a7f..0fba0e21a1 100644 --- a/linkerd/trace-context/src/service.rs +++ b/linkerd/trace-context/src/service.rs @@ -1,19 +1,18 @@ -use crate::{Span, SpanSink}; +use crate::export::{SpanKind, SpanLabels}; use futures::{future::Either, prelude::*}; use http::Uri; use linkerd_stack::layer; -use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider}; +use opentelemetry::context::FutureExt; +use opentelemetry::trace::{Span, SpanRef, TraceContextExt, Tracer}; use opentelemetry::KeyValue; use opentelemetry_http::{HeaderExtractor, HeaderInjector}; -use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider}; use opentelemetry_semantic_conventions as semconv; use std::{ fmt::{Display, Formatter}, pin::Pin, task::{Context, Poll}, - time::SystemTime, }; -use tracing::{info, trace}; +use tracing::trace; /// A layer that adds distributed tracing instrumentation. /// @@ -25,20 +24,23 @@ use tracing::{info, trace}; /// about the span to the given SpanSink when the span is complete, i.e. when /// we receive the response. #[derive(Clone, Debug)] -pub struct TraceContext { +pub struct TraceContext { inner: S, - sink: K, - tracer: SdkTracer, + kind: SpanKind, + labels: SpanLabels, } // === impl TraceContext === -impl TraceContext { - pub fn layer(sink: K) -> impl layer::Layer> + Clone { +impl TraceContext { + pub fn layer( + kind: SpanKind, + labels: SpanLabels, + ) -> impl layer::Layer> + Clone { layer::mk(move |inner| TraceContext { inner, - sink: sink.clone(), - tracer: SdkTracerProvider::builder().build().tracer(""), + kind, + labels: labels.clone(), }) } @@ -47,35 +49,34 @@ impl TraceContext { /// The OpenTelemetry spec defines the semantic conventions that HTTP /// services should use for the labels included in traces: /// https://opentelemetry.io/docs/specs/semconv/http/http-spans/ - fn request_labels(&self, req: &http::Request) -> Vec { - let mut attributes = Vec::with_capacity(13); - attributes.push(KeyValue::new( + fn add_request_labels(&self, span: &mut Sp, req: &http::Request) { + span.set_attribute(KeyValue::new( semconv::trace::HTTP_REQUEST_METHOD, req.method().to_string(), )); let url = req.uri(); if let Some(scheme) = url.scheme_str() { - attributes.push(KeyValue::new( + span.set_attribute(KeyValue::new( semconv::trace::URL_SCHEME, scheme.to_string(), )); } - attributes.push(KeyValue::new( + span.set_attribute(KeyValue::new( semconv::trace::URL_PATH, url.path().to_string(), )); if let Some(query) = url.query() { - attributes.push(KeyValue::new(semconv::trace::URL_QUERY, query.to_string())); + span.set_attribute(KeyValue::new(semconv::trace::URL_QUERY, query.to_string())); } - attributes.push(KeyValue::new( + span.set_attribute(KeyValue::new( semconv::trace::URL_FULL, UrlLabel(url).to_string(), )); // linkerd currently only proxies tcp-based connections - attributes.push(KeyValue::new(semconv::trace::NETWORK_TRANSPORT, "tcp")); + span.set_attribute(KeyValue::new(semconv::trace::NETWORK_TRANSPORT, "tcp")); // This is the order of precedence for host headers, // see https://opentelemetry.io/docs/specs/semconv/http/http-spans/ @@ -89,22 +90,28 @@ impl TraceContext { if let Ok(host) = host.to_str() { if let Ok(uri) = host.parse::() { if let Some(host) = uri.host() { - attributes.push(KeyValue::new( + span.set_attribute(KeyValue::new( semconv::trace::SERVER_ADDRESS, host.to_string(), )); } if let Some(port) = uri.port() { - attributes - .push(KeyValue::new(semconv::trace::SERVER_PORT, port.to_string())); + span.set_attribute(KeyValue::new( + semconv::trace::SERVER_PORT, + port.to_string(), + )); } } } } - Self::populate_header_values(&mut attributes, req); + Self::populate_header_values(span, req); - attributes + span.set_attributes( + self.labels + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ); } /// Populates labels for common header values from the request. @@ -113,7 +120,7 @@ impl TraceContext { /// `http.request.header.
` values, but we shouldn't unconditionally populate all headers /// as they often contain authorization tokens/secrets/etc. Instead, we populate a subset of /// common headers into their respective labels. - fn populate_header_values(labels: &mut Vec, req: &http::Request) { + fn populate_header_values(span: &mut Sp, req: &http::Request) { static HEADER_LABELS: &[(&str, &str)] = &[ ("user-agent", semconv::trace::USER_AGENT_ORIGINAL), // http.request.body.size is available as a semantic convention, but is not stable. @@ -132,7 +139,7 @@ impl TraceContext { ]; for &(header, label) in HEADER_LABELS { if let Some(value) = req.headers().get(header) { - labels.push(KeyValue::new( + span.set_attribute(KeyValue::new( label, String::from_utf8_lossy(value.as_bytes()).to_string(), )); @@ -140,12 +147,11 @@ impl TraceContext { } } - fn add_response_labels(mut labels: Vec, rsp: &http::Response) -> Vec { - labels.push(KeyValue::new( + fn add_response_labels(span: SpanRef<'_>, rsp: &http::Response) { + span.set_attribute(KeyValue::new( semconv::trace::HTTP_RESPONSE_STATUS_CODE, rsp.status().as_str().to_string(), )); - labels } } @@ -182,9 +188,8 @@ impl Display for UrlLabel<'_> { } } -impl tower::Service> for TraceContext +impl tower::Service> for TraceContext where - K: Clone + SpanSink + Send + 'static, S: tower::Service, Response = http::Response>, S::Error: Send, S::Future: Send + 'static, @@ -203,12 +208,11 @@ where fn call(&mut self, mut req: http::Request) -> Self::Future { 'outer: { - if !self.sink.is_enabled() { - trace!("Sink not enabled, skipping"); - break 'outer; - } let parent_ctx = opentelemetry::global::get_text_map_propagator(|prop| { - prop.extract(&HeaderExtractor(req.headers())) + prop.extract_with_context( + &opentelemetry::Context::new(), + &HeaderExtractor(req.headers()), + ) }); if !parent_ctx.span().span_context().is_valid() || !parent_ctx.span().span_context().is_sampled() @@ -217,10 +221,14 @@ where break 'outer; } - let span = self - .tracer - .span_builder("".to_string()) - .start_with_context(&self.tracer, &parent_ctx); + let tracer = opentelemetry::global::tracer(""); + let span_name = req.uri().path().to_owned(); + let mut span = tracer + .span_builder(span_name) + .with_kind(self.kind.into()) + .start_with_context(&tracer, &parent_ctx); + self.add_request_labels(&mut span, &req); + let ctx = parent_ctx.with_span(span); opentelemetry::global::get_text_map_propagator(|prop| { @@ -228,27 +236,18 @@ where }); // If the request has been marked for sampling, record its metadata. - let start = SystemTime::now(); - let req_labels = self.request_labels(&req); - let mut sink = self.sink.clone(); - let span_name = req.uri().path().to_owned(); - return Either::Right(Box::pin(self.inner.call(req).map_ok(move |rsp| { - // Emit the completed span with the response metadata. - let span = Span { - span_id: ctx.span().span_context().span_id(), - trace_id: ctx.span().span_context().trace_id(), - parent_id: parent_ctx.span().span_context().span_id(), - span_name, - start, - end: SystemTime::now(), - labels: Self::add_response_labels(req_labels, &rsp), - }; - trace!(?span); - if let Err(error) = sink.try_send(span) { - info!(%error, "Span dropped"); - } - rsp - }))); + return Either::Right(Box::pin( + self.inner + .call(req) + .map_ok(move |rsp| { + let cx = opentelemetry::Context::current(); + let span = cx.span(); + trace!(?span); + Self::add_response_labels(span, &rsp); + rsp + }) + .with_context(ctx), + )); } // If there's no tracing to be done, just pass on the request to the inner service. @@ -260,11 +259,10 @@ where mod tests { use super::*; use http::HeaderMap; - use linkerd_error::Error; use linkerd_http_box::BoxBody; use opentelemetry::{SpanId, TraceId}; - use std::collections::BTreeMap; - use tokio::sync::mpsc; + use opentelemetry_sdk::trace::{InMemorySpanExporter, SdkTracerProvider, SpanData}; + use std::collections::{BTreeMap, HashMap}; use tower::{Layer, Service, ServiceExt}; const W3C_TRACEPARENT_HEADER: &str = "traceparent"; @@ -313,14 +311,14 @@ mod tests { ); assert_eq!( - exported_span.trace_id, + exported_span.span_context.trace_id(), TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").expect("trace id") ); assert_eq!( - exported_span.parent_id, + exported_span.parent_span_id, SpanId::from_hex("00f067aa0ba902b7").expect("span id") ); - assert_eq!(exported_span.span_id, sent_span_cx.span_id()); + assert_eq!(exported_span.span_context.span_id(), sent_span_cx.span_id()); } #[tokio::test(flavor = "current_thread")] @@ -363,14 +361,14 @@ mod tests { ); assert_eq!( - exported_span.trace_id, + exported_span.span_context.trace_id(), TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").expect("trace id") ); assert_eq!( - exported_span.parent_id, + exported_span.parent_span_id, SpanId::from_hex("00f067aa0ba902b7").expect("span id") ); - assert_eq!(exported_span.span_id, sent_span_cx.span_id()); + assert_eq!(exported_span.span_context.span_id(), sent_span_cx.span_id()); } #[tokio::test(flavor = "current_thread")] @@ -397,7 +395,7 @@ mod tests { .await; let labels = exported_span - .labels + .attributes .into_iter() .map(|kv| (kv.key.to_string(), kv.value.to_string())) .collect::>(); @@ -431,12 +429,18 @@ mod tests { ) } - async fn send_mock_request(req: http::Request) -> (HeaderMap, Span) { - let (span_tx, mut span_rx) = mpsc::channel(1); + async fn send_mock_request(req: http::Request) -> (HeaderMap, SpanData) { + let exporter = InMemorySpanExporter::default(); + opentelemetry::global::set_tracer_provider( + SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(), + ); let (inner, mut handle) = tower_test::mock::pair::, http::Response>(); - let mut stack = TraceContext::::layer(TestSink(span_tx)).layer(inner); + let mut stack = + TraceContext::layer(SpanKind::Server, SpanLabels::new(HashMap::default())).layer(inner); handle.allow(1); let stack = stack.ready().await.expect("ready"); @@ -450,23 +454,9 @@ mod tests { }), }; - ( - req_headers, - span_rx.try_recv().expect("must have exported span"), - ) - } - - #[derive(Clone)] - struct TestSink(mpsc::Sender); - - impl SpanSink for TestSink { - fn is_enabled(&self) -> bool { - true - } + let mut spans = exporter.get_finished_spans().expect("get finished spans"); + assert_eq!(spans.len(), 1); - fn try_send(&mut self, span: Span) -> Result<(), Error> { - self.0.try_send(span)?; - Ok(()) - } + (req_headers, spans.remove(0)) } }