diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index ec7e8c9b6d..129a0da7d8 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -9,7 +9,7 @@ use opentelemetry_sdk::{ }; impl SpanExporter for OtlpHttpClient { - async fn export(&mut self, batch: Vec) -> OTelSdkResult { + async fn export(&self, batch: Vec) -> OTelSdkResult { let client = match self .client .lock() diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index 45e62baf7e..4ddad37b50 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -1,4 +1,5 @@ use core::fmt; +use tokio::sync::Mutex; use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::trace::v1::{ @@ -23,7 +24,7 @@ pub(crate) struct TonicTracesClient { struct ClientInner { client: TraceServiceClient, - interceptor: BoxInterceptor, + interceptor: Mutex, } impl fmt::Debug for TonicTracesClient { @@ -50,7 +51,7 @@ impl TonicTracesClient { TonicTracesClient { inner: Some(ClientInner { client, - interceptor, + interceptor: Mutex::new(interceptor), }), resource: Default::default(), } @@ -58,18 +59,19 @@ impl TonicTracesClient { } impl SpanExporter for TonicTracesClient { - async fn export(&mut self, batch: Vec) -> OTelSdkResult { - let (mut client, metadata, extensions) = match &mut self.inner { + async fn export(&self, batch: Vec) -> OTelSdkResult { + let (mut client, metadata, extensions) = match &self.inner { Some(inner) => { - let (m, e, _) = match inner.interceptor.call(Request::new(())) { - Ok(res) => res.into_parts(), - Err(e) => return Err(OTelSdkError::InternalFailure(e.to_string())), - }; + let (m, e, _) = inner + .interceptor + .lock() + .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here + .call(Request::new(())) + .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))? + .into_parts(); (inner.client.clone(), m, e) } - None => { - return Err(OTelSdkError::AlreadyShutdown); - } + None => return Err(OTelSdkError::AlreadyShutdown), }; let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource); diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 2de7265282..dd1405a795 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -141,8 +141,8 @@ impl SpanExporter { } impl opentelemetry_sdk::trace::SpanExporter for SpanExporter { - async fn export(&mut self, batch: Vec) -> OTelSdkResult { - match &mut self.client { + async fn export(&self, batch: Vec) -> OTelSdkResult { + match &self.client { #[cfg(feature = "grpc-tonic")] SupportedTransportClient::Tonic(client) => client.export(batch).await, #[cfg(any(feature = "http-proto", feature = "http-json"))] diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 2838a84874..bb2cb9a20c 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -33,7 +33,17 @@ } } ``` - +- **Breaking** The SpanExporter::export() method no longer requires a mutable reference to self. + Before: + ```rust + async fn export(&mut self, batch: Vec) -> OTelSdkResult + ``` + After: + ```rust + async fn export(&self, batch: Vec) -> OTelSdkResult + ``` + Custom exporters will need to internally synchronize any mutable state, if applicable. + ## 0.28.0 Released 2025-Feb-10 diff --git a/opentelemetry-sdk/benches/context.rs b/opentelemetry-sdk/benches/context.rs index 3a355c96f0..fc552aa5f2 100644 --- a/opentelemetry-sdk/benches/context.rs +++ b/opentelemetry-sdk/benches/context.rs @@ -136,7 +136,7 @@ fn parent_sampled_tracer(inner_sampler: Sampler) -> (SdkTracerProvider, BoxedTra struct NoopExporter; impl SpanExporter for NoopExporter { - async fn export(&mut self, _spans: Vec) -> OTelSdkResult { + async fn export(&self, _spans: Vec) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/span_builder.rs b/opentelemetry-sdk/benches/span_builder.rs index 0fa91b8d69..1c42042d88 100644 --- a/opentelemetry-sdk/benches/span_builder.rs +++ b/opentelemetry-sdk/benches/span_builder.rs @@ -65,7 +65,7 @@ fn not_sampled_provider() -> (sdktrace::SdkTracerProvider, sdktrace::SdkTracer) struct NoopExporter; impl SpanExporter for NoopExporter { - async fn export(&mut self, _spans: Vec) -> OTelSdkResult { + async fn export(&self, _spans: Vec) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/trace.rs b/opentelemetry-sdk/benches/trace.rs index 7ba6e56964..2dc2dcad8c 100644 --- a/opentelemetry-sdk/benches/trace.rs +++ b/opentelemetry-sdk/benches/trace.rs @@ -59,7 +59,7 @@ fn criterion_benchmark(c: &mut Criterion) { struct VoidExporter; impl SpanExporter for VoidExporter { - async fn export(&mut self, _spans: Vec) -> OTelSdkResult { + async fn export(&self, _spans: Vec) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index 7ca070f70c..0144d9b212 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -41,7 +41,7 @@ pub struct TokioSpanExporter { } impl SpanExporter for TokioSpanExporter { - async fn export(&mut self, batch: Vec) -> OTelSdkResult { + async fn export(&self, batch: Vec) -> OTelSdkResult { batch.into_iter().try_for_each(|span_data| { self.tx_export .send(span_data) @@ -110,7 +110,7 @@ impl NoopSpanExporter { } impl SpanExporter for NoopSpanExporter { - async fn export(&mut self, _: Vec) -> OTelSdkResult { + async fn export(&self, _: Vec) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/trace/export.rs b/opentelemetry-sdk/src/trace/export.rs index 41772d0686..15db1bdaf4 100644 --- a/opentelemetry-sdk/src/trace/export.rs +++ b/opentelemetry-sdk/src/trace/export.rs @@ -28,7 +28,7 @@ pub trait SpanExporter: Send + Sync + Debug { /// Any retry logic that is required by the exporter is the responsibility /// of the exporter. fn export( - &mut self, + &self, batch: Vec, ) -> impl std::future::Future + Send; diff --git a/opentelemetry-sdk/src/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/trace/in_memory_exporter.rs index 22daca1a95..fc22bd0038 100644 --- a/opentelemetry-sdk/src/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/trace/in_memory_exporter.rs @@ -130,7 +130,7 @@ impl InMemorySpanExporter { } impl SpanExporter for InMemorySpanExporter { - async fn export(&mut self, batch: Vec) -> OTelSdkResult { + async fn export(&self, batch: Vec) -> OTelSdkResult { let result = self .spans .lock() diff --git a/opentelemetry-sdk/src/trace/runtime_tests.rs b/opentelemetry-sdk/src/trace/runtime_tests.rs index 739239c6d7..f21284a90a 100644 --- a/opentelemetry-sdk/src/trace/runtime_tests.rs +++ b/opentelemetry-sdk/src/trace/runtime_tests.rs @@ -25,7 +25,7 @@ struct SpanCountExporter { #[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] impl SpanExporter for SpanCountExporter { - async fn export(&mut self, batch: Vec) -> OTelSdkResult { + async fn export(&self, batch: Vec) -> OTelSdkResult { self.span_count.fetch_add(batch.len(), Ordering::SeqCst); Ok(()) } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index edeeff87e6..bb7b59d3f4 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -139,7 +139,7 @@ impl SpanProcessor for SimpleSpanProcessor { .exporter .lock() .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into())) - .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span]))); + .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span]))); if let Err(err) = result { // TODO: check error type, and log `error` only if the error is user-actionable, else log `debug` @@ -1022,7 +1022,7 @@ mod tests { } impl SpanExporter for MockSpanExporter { - async fn export(&mut self, batch: Vec) -> OTelSdkResult { + async fn export(&self, batch: Vec) -> OTelSdkResult { let exported_spans = self.exported_spans.clone(); exported_spans.lock().unwrap().extend(batch); Ok(()) diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index a730a06958..63e8d0498b 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -457,7 +457,7 @@ mod tests { D: Fn(Duration) -> DS + 'static + Send + Sync, DS: Future + Send + Sync + 'static, { - async fn export(&mut self, _batch: Vec) -> OTelSdkResult { + async fn export(&self, _batch: Vec) -> OTelSdkResult { (self.delay_fn)(self.delay_for).await; Ok(()) } diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index 917e4d91bf..ea88ca1f95 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -2,15 +2,15 @@ use chrono::{DateTime, Utc}; use core::fmt; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::trace::SpanData; -use std::sync::atomic; +use std::sync::atomic::{AtomicBool, Ordering}; use opentelemetry_sdk::resource::Resource; /// An OpenTelemetry exporter that writes Spans to stdout on export. pub struct SpanExporter { resource: Resource, - is_shutdown: atomic::AtomicBool, - resource_emitted: bool, + is_shutdown: AtomicBool, + resource_emitted: AtomicBool, } impl fmt::Debug for SpanExporter { @@ -23,23 +23,26 @@ impl Default for SpanExporter { fn default() -> Self { SpanExporter { resource: Resource::builder().build(), - is_shutdown: atomic::AtomicBool::new(false), - resource_emitted: false, + is_shutdown: AtomicBool::new(false), + resource_emitted: AtomicBool::new(false), } } } impl opentelemetry_sdk::trace::SpanExporter for SpanExporter { /// Write Spans to stdout - async fn export(&mut self, batch: Vec) -> OTelSdkResult { - if self.is_shutdown.load(atomic::Ordering::SeqCst) { + async fn export(&self, batch: Vec) -> OTelSdkResult { + if self.is_shutdown.load(Ordering::SeqCst) { Err(OTelSdkError::AlreadyShutdown) } else { println!("Spans"); - if self.resource_emitted { + if self + .resource_emitted + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { print_spans(batch); } else { - self.resource_emitted = true; println!("Resource"); if let Some(schema_url) = self.resource.schema_url() { println!("\tResource SchemaUrl: {:?}", schema_url); @@ -57,7 +60,7 @@ impl opentelemetry_sdk::trace::SpanExporter for SpanExporter { } fn shutdown(&mut self) -> OTelSdkResult { - self.is_shutdown.store(true, atomic::Ordering::SeqCst); + self.is_shutdown.store(true, Ordering::SeqCst); Ok(()) } diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index 755ea8ced4..24fe931c0e 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -126,7 +126,7 @@ async fn zipkin_export( impl trace::SpanExporter for ZipkinExporter { /// Export spans to Zipkin collector. - async fn export(&mut self, batch: Vec) -> OTelSdkResult { + async fn export(&self, batch: Vec) -> OTelSdkResult { zipkin_export(batch, self.uploader.clone(), self.local_endpoint.clone()).await } }