diff --git a/Cargo.lock b/Cargo.lock index 02679485a4..e7480fc1b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3018,6 +3018,8 @@ dependencies = [ "percent-encoding", "rand 0.9.2", "thiserror", + "tokio", + "tokio-stream", ] [[package]] diff --git a/linkerd/app/core/src/http_tracing.rs b/linkerd/app/core/src/http_tracing.rs index 960899dc64..984bf0b0ec 100644 --- a/linkerd/app/core/src/http_tracing.rs +++ b/linkerd/app/core/src/http_tracing.rs @@ -11,25 +11,17 @@ use tokio::sync::mpsc; pub type SpanSink = mpsc::Sender; pub fn server( - sink: Option, + _sink: Option, labels: impl Into, -) -> impl layer::Layer, S>> + Clone { - TraceContext::layer(sink.map(move |sink| SpanConverter { - kind: SpanKind::Server, - sink, - labels: labels.into(), - })) +) -> impl layer::Layer> + Clone { + TraceContext::layer(SpanKind::Server, labels.into()) } pub fn client( - sink: Option, + _sink: Option, labels: impl Into, -) -> impl layer::Layer, S>> + Clone { - TraceContext::layer(sink.map(move |sink| SpanConverter { - kind: SpanKind::Client, - sink, - labels: labels.into(), - })) +) -> impl layer::Layer> + Clone { + TraceContext::layer(SpanKind::Client, labels.into()) } #[derive(Clone)] diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index cf607e95c1..7908b57034 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -399,7 +399,6 @@ impl App { admin, drain, identity, - trace_collector: collector, start_proxy, tap, .. @@ -464,10 +463,6 @@ impl App { tokio::spawn(serve.instrument(info_span!("tap").or_current())); } - if let trace_collector::TraceCollector::Enabled(collector) = collector { - tokio::spawn(collector.task.instrument(info_span!("tracing"))); - } - // we don't care if the admin shutdown channel is // dropped or actually triggered. let _ = admin_shutdown_rx.await; diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs index 96e1aaf19a..1fd07baa51 100644 --- a/linkerd/app/src/trace_collector.rs +++ b/linkerd/app/src/trace_collector.rs @@ -33,7 +33,6 @@ pub enum TraceCollector { pub struct EnabledCollector { pub addr: control::ControlAddr, pub span_sink: SpanSink, - pub task: Task, } impl TraceCollector { diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs index 5c5a147a41..824fc05c70 100644 --- a/linkerd/app/src/trace_collector/otel_collector.rs +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -6,9 +6,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; use tonic::{body::Body as TonicBody, client::GrpcService}; -use tracing::Instrument; pub(super) struct OtelCollectorAttributes { pub hostname: Option, @@ -28,8 +26,7 @@ where S::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { - let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); - let spans_rx = ReceiverStream::new(spans_rx); + let (span_sink, _) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); let resource = sdk::Resource::builder() .with_attribute(KeyValue::new( @@ -56,14 +53,7 @@ where .build(); let addr = addr.clone(); - let task = Box::pin( - opentelemetry::export_spans(svc, spans_rx, resource, legacy_metrics) - .instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()), - ); + opentelemetry::install_opentelemetry_providers(svc, resource, legacy_metrics); - EnabledCollector { - addr, - task, - span_sink, - } + EnabledCollector { addr, span_sink } } diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 2491b06051..2123068f01 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -6,15 +6,9 @@ pub mod propagation; use self::metrics::Registry; use crate::propagation::OrderedPropagator; -use futures::stream::{Stream, StreamExt}; use http_body::Body; use linkerd_error::Error; -use linkerd_trace_context::{self as trace_context, export::ExportSpan}; pub use opentelemetry as otel; -use opentelemetry::{ - trace::{SpanContext, SpanKind, Status, TraceFlags, TraceState}, - KeyValue, -}; pub use opentelemetry_proto as proto; use opentelemetry_proto::{ tonic::collector::trace::v1::{ @@ -25,42 +19,44 @@ use opentelemetry_proto::{ pub use opentelemetry_sdk as sdk; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::trace::{ - BatchConfigBuilder, BatchSpanProcessor, SpanData, SpanLinks, SpanProcessor, + BatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider, SpanData, }; use opentelemetry_sdk::Resource; pub use opentelemetry_semantic_conventions as semconv; use std::fmt::{Debug, Formatter}; use std::time::Duration; use tonic::{body::Body as TonicBody, client::GrpcService}; -use tracing::{debug, info}; +use tracing::debug; -pub async fn export_spans(client: T, spans: S, resource: Resource, metrics: Registry) +pub fn install_opentelemetry_providers(client: T, resource: Resource, metrics: Registry) where T: GrpcService + Clone + Send + Sync + 'static, T::Error: Into, T::Future: Send, T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, - S: Stream + Unpin, { debug!("Span exporter running"); - let processor = BatchSpanProcessor::builder(SpanExporter { - client: TraceServiceClient::new(client), - resource, - metrics, - }) - .with_batch_config( - BatchConfigBuilder::default() - .with_max_queue_size(1000) - .with_scheduled_delay(Duration::from_secs(5)) + let provider = SdkTracerProvider::builder() + .with_span_processor( + BatchSpanProcessor::builder(SpanExporter { + client: TraceServiceClient::new(client), + resource, + metrics, + }) + .with_batch_config( + BatchConfigBuilder::default() + .with_max_queue_size(1000) + .with_scheduled_delay(Duration::from_secs(5)) + .build(), + ) .build(), - ) - .build(); + ) + .build(); + opentelemetry::global::set_tracer_provider(provider); opentelemetry::global::set_text_map_propagator(OrderedPropagator::new()); - - SpanExportTask::new(spans, processor).run().await; } /// SpanExporter sends a Stream of spans to the given TraceService gRPC service. @@ -120,113 +116,62 @@ where } } -struct SpanExportTask { - spans: S, - processor: BatchSpanProcessor, -} - -impl SpanExportTask -where - S: Stream + Unpin, -{ - fn new(spans: S, processor: BatchSpanProcessor) -> Self { - Self { spans, processor } - } - - async fn run(mut self) { - while let Some(span) = self.spans.next().await { - let s = match convert_span(span) { - Ok(s) => s, - Err(error) => { - info!(%error, "Span dropped"); - continue; - } - }; - - self.processor.on_end(s); - } - } -} - -fn convert_span(span: ExportSpan) -> Result { - let ExportSpan { span, kind, labels } = span; - - let mut attributes = Vec::::new(); - for (k, v) in labels.iter() { - attributes.push(KeyValue::new(k.clone(), v.clone())); - } - for kv in span.labels.into_iter() { - attributes.push(kv); - } - let is_remote = kind != trace_context::export::SpanKind::Client; - Ok(SpanData { - parent_span_id: span.parent_id, - parent_span_is_remote: true, // we do not originate any spans locally - span_kind: match kind { - trace_context::export::SpanKind::Server => SpanKind::Server, - trace_context::export::SpanKind::Client => SpanKind::Client, - }, - name: span.span_name.into(), - start_time: span.start, - end_time: span.end, - attributes, - dropped_attributes_count: 0, - links: SpanLinks::default(), - status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this - span_context: SpanContext::new( - span.trace_id, - span.span_id, - TraceFlags::default(), - is_remote, - TraceState::NONE, - ), - events: Default::default(), - instrumentation_scope: Default::default(), - }) -} - #[cfg(test)] mod tests { use super::*; - use linkerd_trace_context::{export::SpanKind, Span}; - use opentelemetry::{SpanId, TraceId}; - use opentelemetry_proto::tonic::common::v1::InstrumentationScope; - use std::{sync::Arc, time::SystemTime}; - use tokio::sync::mpsc; - use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt}; + use opentelemetry::trace::{SpanContext, TraceContextExt, TraceState, Tracer}; + use opentelemetry::{Context, SpanId, TraceFlags, TraceId}; + use opentelemetry_proto::tonic::common::v1::any_value::Value; + use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope}; + use std::collections::HashMap; + use tonic::codegen::tokio_stream::StreamExt; use tonic_prost::ProstDecoder; #[tokio::test(flavor = "current_thread")] async fn send_span() { + let _trace = linkerd_tracing::test::trace_init(); + let trace_id = TraceId::from_hex("0123456789abcedffedcba9876543210").expect("trace id"); let parent_id = SpanId::from_hex("fedcba9876543210").expect("parent id"); let span_id = SpanId::from_hex("0123456789abcedf").expect("span id"); - let span_name = "test".to_string(); - - let start = SystemTime::now(); - let end = SystemTime::now(); - - let span = ExportSpan { - span: Span { - trace_id, - span_id, - parent_id, - span_name: span_name.clone(), - start, - end, - labels: Vec::new(), - }, - kind: SpanKind::Server, - labels: Arc::new(Default::default()), - }; + let span_name = "test-span".to_string(); - let mut req = send_mock_request(span).await; + let mut req = send_mock_request(trace_id, parent_id, span_id, span_name.clone()).await; assert_eq!(req.resource_spans.len(), 1); let mut resource_span = req.resource_spans.remove(0); let resource_span_resource = resource_span.resource.expect("must have resource"); assert_eq!(resource_span_resource.dropped_attributes_count, 0); assert_eq!(resource_span_resource.entity_refs, vec![]); + let expected: HashMap<&str, Option<&str>> = HashMap::from_iter([ + ("telemetry.sdk.name", Some("opentelemetry")), + ("telemetry.sdk.version", None), + ("telemetry.sdk.language", Some("rust")), + ("service.name", Some("unknown_service")), + ]); + + assert_eq!(expected.len(), resource_span_resource.attributes.len()); + for (expected_key, expected_value) in expected { + let Some(actual) = resource_span_resource + .attributes + .iter() + .find(|kv| kv.key == expected_key) + else { + panic!("{expected_key} not found in resource attributes"); + }; + + let Some(AnyValue { + value: Some(Value::StringValue(actual_value)), + }) = &actual.value + else { + panic!("Unexpected value type: {:?}", actual.value); + }; + + if let Some(expected_value) = expected_value { + assert_eq!(actual_value, expected_value); + } + } + resource_span_resource .attributes .iter() @@ -251,55 +196,53 @@ mod tests { assert_eq!( scope_span.scope, Some(InstrumentationScope { - name: "".to_string(), + name: "test".to_string(), version: "".to_string(), attributes: vec![], dropped_attributes_count: 0, }) ); - assert_eq!(scope_span.spans.len(), 1); + assert_eq!(scope_span.spans.len(), 1); let span = scope_span.spans.remove(0); + assert_eq!(span.span_id, span_id.to_bytes().to_vec(),); assert_eq!(span.parent_span_id, parent_id.to_bytes().to_vec(),); assert_eq!(span.trace_id, trace_id.to_bytes().to_vec(),); assert_eq!(span.name, span_name); - assert_eq!( - span.start_time_unix_nano, - start - .duration_since(SystemTime::UNIX_EPOCH) - .expect("duration") - .as_nanos() as u64 - ); - assert_eq!( - span.end_time_unix_nano, - end.duration_since(SystemTime::UNIX_EPOCH) - .expect("duration") - .as_nanos() as u64 - ); - assert_eq!(span.flags, 0b11_0000_0000); + assert_eq!(span.flags, 0b11_0000_0001); } - async fn send_mock_request(span: ExportSpan) -> ExportTraceServiceRequest { - let _trace = linkerd_tracing::test::trace_init(); - let (span_tx, span_rx) = mpsc::channel(1); - + async fn send_mock_request( + trace_id: TraceId, + parent_id: SpanId, + span_id: SpanId, + span_name: String, + ) -> ExportTraceServiceRequest { let (inner, mut handle) = tower_test::mock::pair::< http::Request, http::Response, >(); handle.allow(1); - span_tx.try_send(span).expect("Must have space"); - let (metrics, _) = metrics::new(); - tokio::spawn(export_spans( - inner, - ReceiverStream::new(span_rx), - Resource::builder().build(), - metrics, + install_opentelemetry_providers(inner, Resource::builder().build(), metrics); + + let parent_cx = Context::new().with_remote_span_context(SpanContext::new( + trace_id, + parent_id, + TraceFlags::SAMPLED, + true, + TraceState::NONE, )); + let tracer = opentelemetry::global::tracer("test"); + let span = tracer + .span_builder(span_name) + .with_span_id(span_id) + .start_with_context(&tracer, &parent_cx); + drop(span); + let (req, _tx) = handle.next_request().await.expect("next request"); let req = tonic::Request::from_http(req); let mut req = tonic::codec::Streaming::::new_request( diff --git a/linkerd/trace-context/Cargo.toml b/linkerd/trace-context/Cargo.toml index 27b174e041..781a52a9dc 100644 --- a/linkerd/trace-context/Cargo.toml +++ b/linkerd/trace-context/Cargo.toml @@ -30,5 +30,6 @@ linkerd-http-box = { path = "../http/box" } linkerd-io = { path = "../io" } linkerd-tracing = { path = "../tracing" } linkerd-opentelemetry = { path = "../opentelemetry" } +opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace", "testing"] } tokio = { version = "1", features = ["test-util"] } tower-test = { workspace = true } diff --git a/linkerd/trace-context/src/service.rs b/linkerd/trace-context/src/service.rs index 950e8f9a7f..0fba0e21a1 100644 --- a/linkerd/trace-context/src/service.rs +++ b/linkerd/trace-context/src/service.rs @@ -1,19 +1,18 @@ -use crate::{Span, SpanSink}; +use crate::export::{SpanKind, SpanLabels}; use futures::{future::Either, prelude::*}; use http::Uri; use linkerd_stack::layer; -use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider}; +use opentelemetry::context::FutureExt; +use opentelemetry::trace::{Span, SpanRef, TraceContextExt, Tracer}; use opentelemetry::KeyValue; use opentelemetry_http::{HeaderExtractor, HeaderInjector}; -use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider}; use opentelemetry_semantic_conventions as semconv; use std::{ fmt::{Display, Formatter}, pin::Pin, task::{Context, Poll}, - time::SystemTime, }; -use tracing::{info, trace}; +use tracing::trace; /// A layer that adds distributed tracing instrumentation. /// @@ -25,20 +24,23 @@ use tracing::{info, trace}; /// about the span to the given SpanSink when the span is complete, i.e. when /// we receive the response. #[derive(Clone, Debug)] -pub struct TraceContext { +pub struct TraceContext { inner: S, - sink: K, - tracer: SdkTracer, + kind: SpanKind, + labels: SpanLabels, } // === impl TraceContext === -impl TraceContext { - pub fn layer(sink: K) -> impl layer::Layer> + Clone { +impl TraceContext { + pub fn layer( + kind: SpanKind, + labels: SpanLabels, + ) -> impl layer::Layer> + Clone { layer::mk(move |inner| TraceContext { inner, - sink: sink.clone(), - tracer: SdkTracerProvider::builder().build().tracer(""), + kind, + labels: labels.clone(), }) } @@ -47,35 +49,34 @@ impl TraceContext { /// The OpenTelemetry spec defines the semantic conventions that HTTP /// services should use for the labels included in traces: /// https://opentelemetry.io/docs/specs/semconv/http/http-spans/ - fn request_labels(&self, req: &http::Request) -> Vec { - let mut attributes = Vec::with_capacity(13); - attributes.push(KeyValue::new( + fn add_request_labels(&self, span: &mut Sp, req: &http::Request) { + span.set_attribute(KeyValue::new( semconv::trace::HTTP_REQUEST_METHOD, req.method().to_string(), )); let url = req.uri(); if let Some(scheme) = url.scheme_str() { - attributes.push(KeyValue::new( + span.set_attribute(KeyValue::new( semconv::trace::URL_SCHEME, scheme.to_string(), )); } - attributes.push(KeyValue::new( + span.set_attribute(KeyValue::new( semconv::trace::URL_PATH, url.path().to_string(), )); if let Some(query) = url.query() { - attributes.push(KeyValue::new(semconv::trace::URL_QUERY, query.to_string())); + span.set_attribute(KeyValue::new(semconv::trace::URL_QUERY, query.to_string())); } - attributes.push(KeyValue::new( + span.set_attribute(KeyValue::new( semconv::trace::URL_FULL, UrlLabel(url).to_string(), )); // linkerd currently only proxies tcp-based connections - attributes.push(KeyValue::new(semconv::trace::NETWORK_TRANSPORT, "tcp")); + span.set_attribute(KeyValue::new(semconv::trace::NETWORK_TRANSPORT, "tcp")); // This is the order of precedence for host headers, // see https://opentelemetry.io/docs/specs/semconv/http/http-spans/ @@ -89,22 +90,28 @@ impl TraceContext { if let Ok(host) = host.to_str() { if let Ok(uri) = host.parse::() { if let Some(host) = uri.host() { - attributes.push(KeyValue::new( + span.set_attribute(KeyValue::new( semconv::trace::SERVER_ADDRESS, host.to_string(), )); } if let Some(port) = uri.port() { - attributes - .push(KeyValue::new(semconv::trace::SERVER_PORT, port.to_string())); + span.set_attribute(KeyValue::new( + semconv::trace::SERVER_PORT, + port.to_string(), + )); } } } } - Self::populate_header_values(&mut attributes, req); + Self::populate_header_values(span, req); - attributes + span.set_attributes( + self.labels + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ); } /// Populates labels for common header values from the request. @@ -113,7 +120,7 @@ impl TraceContext { /// `http.request.header.
` values, but we shouldn't unconditionally populate all headers /// as they often contain authorization tokens/secrets/etc. Instead, we populate a subset of /// common headers into their respective labels. - fn populate_header_values(labels: &mut Vec, req: &http::Request) { + fn populate_header_values(span: &mut Sp, req: &http::Request) { static HEADER_LABELS: &[(&str, &str)] = &[ ("user-agent", semconv::trace::USER_AGENT_ORIGINAL), // http.request.body.size is available as a semantic convention, but is not stable. @@ -132,7 +139,7 @@ impl TraceContext { ]; for &(header, label) in HEADER_LABELS { if let Some(value) = req.headers().get(header) { - labels.push(KeyValue::new( + span.set_attribute(KeyValue::new( label, String::from_utf8_lossy(value.as_bytes()).to_string(), )); @@ -140,12 +147,11 @@ impl TraceContext { } } - fn add_response_labels(mut labels: Vec, rsp: &http::Response) -> Vec { - labels.push(KeyValue::new( + fn add_response_labels(span: SpanRef<'_>, rsp: &http::Response) { + span.set_attribute(KeyValue::new( semconv::trace::HTTP_RESPONSE_STATUS_CODE, rsp.status().as_str().to_string(), )); - labels } } @@ -182,9 +188,8 @@ impl Display for UrlLabel<'_> { } } -impl tower::Service> for TraceContext +impl tower::Service> for TraceContext where - K: Clone + SpanSink + Send + 'static, S: tower::Service, Response = http::Response>, S::Error: Send, S::Future: Send + 'static, @@ -203,12 +208,11 @@ where fn call(&mut self, mut req: http::Request) -> Self::Future { 'outer: { - if !self.sink.is_enabled() { - trace!("Sink not enabled, skipping"); - break 'outer; - } let parent_ctx = opentelemetry::global::get_text_map_propagator(|prop| { - prop.extract(&HeaderExtractor(req.headers())) + prop.extract_with_context( + &opentelemetry::Context::new(), + &HeaderExtractor(req.headers()), + ) }); if !parent_ctx.span().span_context().is_valid() || !parent_ctx.span().span_context().is_sampled() @@ -217,10 +221,14 @@ where break 'outer; } - let span = self - .tracer - .span_builder("".to_string()) - .start_with_context(&self.tracer, &parent_ctx); + let tracer = opentelemetry::global::tracer(""); + let span_name = req.uri().path().to_owned(); + let mut span = tracer + .span_builder(span_name) + .with_kind(self.kind.into()) + .start_with_context(&tracer, &parent_ctx); + self.add_request_labels(&mut span, &req); + let ctx = parent_ctx.with_span(span); opentelemetry::global::get_text_map_propagator(|prop| { @@ -228,27 +236,18 @@ where }); // If the request has been marked for sampling, record its metadata. - let start = SystemTime::now(); - let req_labels = self.request_labels(&req); - let mut sink = self.sink.clone(); - let span_name = req.uri().path().to_owned(); - return Either::Right(Box::pin(self.inner.call(req).map_ok(move |rsp| { - // Emit the completed span with the response metadata. - let span = Span { - span_id: ctx.span().span_context().span_id(), - trace_id: ctx.span().span_context().trace_id(), - parent_id: parent_ctx.span().span_context().span_id(), - span_name, - start, - end: SystemTime::now(), - labels: Self::add_response_labels(req_labels, &rsp), - }; - trace!(?span); - if let Err(error) = sink.try_send(span) { - info!(%error, "Span dropped"); - } - rsp - }))); + return Either::Right(Box::pin( + self.inner + .call(req) + .map_ok(move |rsp| { + let cx = opentelemetry::Context::current(); + let span = cx.span(); + trace!(?span); + Self::add_response_labels(span, &rsp); + rsp + }) + .with_context(ctx), + )); } // If there's no tracing to be done, just pass on the request to the inner service. @@ -260,11 +259,10 @@ where mod tests { use super::*; use http::HeaderMap; - use linkerd_error::Error; use linkerd_http_box::BoxBody; use opentelemetry::{SpanId, TraceId}; - use std::collections::BTreeMap; - use tokio::sync::mpsc; + use opentelemetry_sdk::trace::{InMemorySpanExporter, SdkTracerProvider, SpanData}; + use std::collections::{BTreeMap, HashMap}; use tower::{Layer, Service, ServiceExt}; const W3C_TRACEPARENT_HEADER: &str = "traceparent"; @@ -313,14 +311,14 @@ mod tests { ); assert_eq!( - exported_span.trace_id, + exported_span.span_context.trace_id(), TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").expect("trace id") ); assert_eq!( - exported_span.parent_id, + exported_span.parent_span_id, SpanId::from_hex("00f067aa0ba902b7").expect("span id") ); - assert_eq!(exported_span.span_id, sent_span_cx.span_id()); + assert_eq!(exported_span.span_context.span_id(), sent_span_cx.span_id()); } #[tokio::test(flavor = "current_thread")] @@ -363,14 +361,14 @@ mod tests { ); assert_eq!( - exported_span.trace_id, + exported_span.span_context.trace_id(), TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").expect("trace id") ); assert_eq!( - exported_span.parent_id, + exported_span.parent_span_id, SpanId::from_hex("00f067aa0ba902b7").expect("span id") ); - assert_eq!(exported_span.span_id, sent_span_cx.span_id()); + assert_eq!(exported_span.span_context.span_id(), sent_span_cx.span_id()); } #[tokio::test(flavor = "current_thread")] @@ -397,7 +395,7 @@ mod tests { .await; let labels = exported_span - .labels + .attributes .into_iter() .map(|kv| (kv.key.to_string(), kv.value.to_string())) .collect::>(); @@ -431,12 +429,18 @@ mod tests { ) } - async fn send_mock_request(req: http::Request) -> (HeaderMap, Span) { - let (span_tx, mut span_rx) = mpsc::channel(1); + async fn send_mock_request(req: http::Request) -> (HeaderMap, SpanData) { + let exporter = InMemorySpanExporter::default(); + opentelemetry::global::set_tracer_provider( + SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(), + ); let (inner, mut handle) = tower_test::mock::pair::, http::Response>(); - let mut stack = TraceContext::::layer(TestSink(span_tx)).layer(inner); + let mut stack = + TraceContext::layer(SpanKind::Server, SpanLabels::new(HashMap::default())).layer(inner); handle.allow(1); let stack = stack.ready().await.expect("ready"); @@ -450,23 +454,9 @@ mod tests { }), }; - ( - req_headers, - span_rx.try_recv().expect("must have exported span"), - ) - } - - #[derive(Clone)] - struct TestSink(mpsc::Sender); - - impl SpanSink for TestSink { - fn is_enabled(&self) -> bool { - true - } + let mut spans = exporter.get_finished_spans().expect("get finished spans"); + assert_eq!(spans.len(), 1); - fn try_send(&mut self, span: Span) -> Result<(), Error> { - self.0.try_send(span)?; - Ok(()) - } + (req_headers, spans.remove(0)) } }