diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 0f6023f19b..3a3887cc67 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -34,7 +34,7 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export(&mut self, _: LogBatch<'_>) -> LogResult<()> { + async fn export(&self, _: LogBatch<'_>) -> LogResult<()> { LogResult::Ok(()) } diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 3d1b334d66..0287516c34 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -247,7 +247,7 @@ mod tests { #[async_trait] impl LogExporter for ReentrantLogExporter { - async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> { + async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> { // This will cause a deadlock as the export itself creates a log // while still within the lock of the SimpleLogProcessor. warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io"); diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 985edc96ee..24bb667129 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -9,7 +9,7 @@ use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { + async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { let client = self .client .lock() diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 149c4b51f0..f0b52abf1b 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -10,6 +10,7 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; +use tokio::sync::Mutex; pub(crate) struct TonicLogsClient { inner: Option, @@ -20,7 +21,7 @@ pub(crate) struct TonicLogsClient { struct ClientInner { client: LogsServiceClient, - interceptor: BoxInterceptor, + interceptor: Mutex, } impl fmt::Debug for TonicLogsClient { @@ -45,7 +46,7 @@ impl TonicLogsClient { TonicLogsClient { inner: Some(ClientInner { client, - interceptor, + interceptor: Mutex::new(interceptor), }), resource: Default::default(), } @@ -54,11 +55,13 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { - let (mut client, metadata, extensions) = match &mut self.inner { + async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { + let (mut client, metadata, extensions) = match &self.inner { Some(inner) => { let (m, e, _) = inner .interceptor + .lock() + .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here .call(Request::new(())) .map_err(|e| LogError::Other(Box::new(e)))? .into_parts(); diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index a0c09ab28d..8ae10afb06 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -124,7 +124,7 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { + async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index bec13eb9dc..33ba3b0faa 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -11,6 +11,13 @@ - `ResourceDetector.detect()` no longer supports timeout option. - `opentelemetry::global::shutdown_tracer_provider()` Removed from the API, should now use `tracer_provider.shutdown()` see [#2369](https://github.com/open-telemetry/opentelemetry-rust/pull/2369) for a migration example. "Tracer provider" is cheaply cloneable, so users are encouraged to set a clone of it as the global (ex: `global::set_tracer_provider(provider.clone()))`, so that instrumentations and other components can obtain tracers from `global::tracer()`. The tracer_provider must be kept around to call shutdown on it at the end of application (ex: `tracer_provider.shutdown()`) +- *Breaking* The LogExporter::export() method no longer requires a mutable reference to self.: + Before: + async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> + After: + async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> + Custom exporters will need to internally synchronize any mutable state, if applicable. + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 3340836a57..5e2168a7ce 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -81,7 +81,7 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>; + async fn export(&self, batch: LogBatch<'_>) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index b3fa80fd01..6d0bca8040 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -106,7 +106,7 @@ impl LogProcessor for SimpleLogProcessor { .exporter .lock() .map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into())) - .and_then(|mut exporter| { + .and_then(|exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); @@ -586,7 +586,7 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> { + async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> { Ok(()) } @@ -1093,7 +1093,7 @@ mod tests { #[async_trait::async_trait] impl LogExporter for LogExporterThatRequiresTokio { - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { + async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 1b309f2701..5ed0a3721c 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -183,7 +183,7 @@ impl InMemoryLogExporter { #[async_trait] impl LogExporter for InMemoryLogExporter { - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { + async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for (log_record, instrumentation) in batch.iter() { let owned_log = OwnedLogData { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index c5edb880bf..d06723c840 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -5,12 +5,13 @@ use opentelemetry_sdk::export::logs::LogBatch; use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::Resource; use std::sync::atomic; +use std::sync::atomic::Ordering; /// An OpenTelemetry exporter that writes Logs to stdout on export. pub struct LogExporter { resource: Resource, is_shutdown: atomic::AtomicBool, - resource_emitted: bool, + resource_emitted: atomic::AtomicBool, } impl Default for LogExporter { @@ -18,7 +19,7 @@ impl Default for LogExporter { LogExporter { resource: Resource::default(), is_shutdown: atomic::AtomicBool::new(false), - resource_emitted: false, + resource_emitted: atomic::AtomicBool::new(false), } } } @@ -32,15 +33,18 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { + async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { if self.is_shutdown.load(atomic::Ordering::SeqCst) { return Err("exporter is shut down".into()); } else { println!("Logs"); - if self.resource_emitted { + if self + .resource_emitted + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { print_logs(batch); } else { - self.resource_emitted = true; println!("Resource"); if let Some(schema_url) = self.resource.schema_url() { println!("\t Resource SchemaUrl: {:?}", schema_url); diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 6d3dd4dd1f..a4acbcec11 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -52,6 +52,8 @@ tracing-subscriber = { workspace = true, features = ["registry", "std"] } num-format = "0.4.4" sysinfo = { version = "0.32", optional = true } libc = "=0.2.164" # https://github.com/GuillaumeGomez/sysinfo/issues/1392 +async-trait = "0.1.51" +futures-executor = { workspace = true } [features] stats = ["sysinfo"] diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 1ede37b1ee..87d5e2c6ed 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -6,26 +6,38 @@ ~31 M/sec Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, - ~44 M /sec + ~40 M /sec */ use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer; -use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider}; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, LoggerProvider}; use tracing::error; use tracing_subscriber::prelude::*; mod throughput; +use async_trait::async_trait; + +#[derive(Debug, Clone)] +struct MockLogExporter; + +#[async_trait] +impl LogExporter for MockLogExporter { + async fn export(&self, _: LogBatch<'_>) -> LogResult<()> { + LogResult::Ok(()) + } +} #[derive(Debug)] -pub struct NoOpLogProcessor; - -impl LogProcessor for NoOpLogProcessor { - fn emit( - &self, - _record: &mut opentelemetry_sdk::logs::LogRecord, - _scope: &InstrumentationScope, - ) { +pub struct MockLogProcessor { + exporter: MockLogExporter, +} + +impl LogProcessor for MockLogProcessor { + fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) { + let log_tuple = &[(record as &LogRecord, scope)]; + let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); } fn force_flush(&self) -> opentelemetry_sdk::logs::LogResult<()> { @@ -40,7 +52,9 @@ impl LogProcessor for NoOpLogProcessor { fn main() { // LoggerProvider with a no-op processor. let provider: LoggerProvider = LoggerProvider::builder() - .with_log_processor(NoOpLogProcessor {}) + .with_log_processor(MockLogProcessor { + exporter: MockLogExporter {}, + }) .build(); // Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing.