From a76c4c6db9bd8cb0c0de5ce6c3d963d588330ebc Mon Sep 17 00:00:00 2001 From: Braden Steffaniak Date: Tue, 11 Feb 2025 15:01:16 -0500 Subject: [PATCH 1/4] Upgrade opentelemetry-prometheus to 0.28 (#2650) --- opentelemetry-otlp/src/exporter/http/mod.rs | 2 +- opentelemetry-otlp/src/exporter/mod.rs | 4 +- opentelemetry-otlp/src/exporter/tonic/mod.rs | 2 +- opentelemetry-prometheus/CHANGELOG.md | 8 +- opentelemetry-prometheus/Cargo.toml | 8 +- opentelemetry-prometheus/src/lib.rs | 7 +- .../tests/integration_test.rs | 88 +++++++------------ 7 files changed, 51 insertions(+), 68 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 650358533e..10342a858a 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -120,7 +120,7 @@ impl HttpExporterBuilder { .or(env::var(OTEL_EXPORTER_OTLP_TIMEOUT).ok()) { Some(val) => match val.parse() { - Ok(seconds) => Duration::from_secs(seconds), + Ok(seconds) => Duration::from_millis(seconds), Err(_) => self.exporter_config.timeout, }, None => self.exporter_config.timeout, diff --git a/opentelemetry-otlp/src/exporter/mod.rs b/opentelemetry-otlp/src/exporter/mod.rs index bec1c809bb..219ea76387 100644 --- a/opentelemetry-otlp/src/exporter/mod.rs +++ b/opentelemetry-otlp/src/exporter/mod.rs @@ -52,7 +52,7 @@ const OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON: &str = "http/json"; /// Max waiting time for the backend to process each signal batch, defaults to 10 seconds. pub const OTEL_EXPORTER_OTLP_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TIMEOUT"; /// Default max waiting time for the backend to process each signal batch. -pub const OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT: u64 = 10; +pub const OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT: u64 = 10000; // Endpoints per protocol https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md #[cfg(feature = "grpc-tonic")] @@ -87,7 +87,7 @@ impl Default for ExportConfig { // don't use default_endpoint(protocol) here otherwise we // won't know if user provided a value protocol, - timeout: Duration::from_secs(OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT), + timeout: Duration::from_millis(OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT), } } } diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 9e2b54c631..3b4bf03d42 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -200,7 +200,7 @@ impl TonicExporterBuilder { .or(env::var(OTEL_EXPORTER_OTLP_TIMEOUT).ok()) { Some(val) => match val.parse() { - Ok(seconds) => Duration::from_secs(seconds), + Ok(seconds) => Duration::from_millis(seconds), Err(_) => config.timeout, }, None => config.timeout, diff --git a/opentelemetry-prometheus/CHANGELOG.md b/opentelemetry-prometheus/CHANGELOG.md index b45f2b51dc..676cb24cff 100644 --- a/opentelemetry-prometheus/CHANGELOG.md +++ b/opentelemetry-prometheus/CHANGELOG.md @@ -2,8 +2,12 @@ ## vNext -- Bump msrv to 1.75.0. +## v0.28.0 +- Update `opentelemetry` dependency version to 0.28 +- Update `opentelemetry_sdk` dependency version to 0.28 +- Update `opentelemetry-semantic-conventions` dependency version to 0.28 +- Bump msrv to 1.75.0. ## v0.27.0 @@ -11,7 +15,6 @@ - Update `opentelemetry_sdk` dependency version to 0.27 - Update `opentelemetry-semantic-conventions` dependency version to 0.27 - ## v0.17.0 ### Changed @@ -66,6 +69,7 @@ ## v0.12.0 ### Changed + - [Breaking] Add `_total` suffix for all counters [#952](https://github.com/open-telemetry/opentelemetry-rust/pull/952). - Update to `opentelemetry` v0.19. - Bump MSRV to 1.57 [#953](https://github.com/open-telemetry/opentelemetry-rust/pull/953). diff --git a/opentelemetry-prometheus/Cargo.toml b/opentelemetry-prometheus/Cargo.toml index 4ace0c2a84..938fe3535d 100644 --- a/opentelemetry-prometheus/Cargo.toml +++ b/opentelemetry-prometheus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opentelemetry-prometheus" -version = "0.27.0" +version = "0.28.0" description = "Prometheus exporter for OpenTelemetry" homepage = "https://github.com/open-telemetry/opentelemetry-rust" repository = "https://github.com/open-telemetry/opentelemetry-rust" @@ -21,14 +21,14 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] once_cell = { version = "1.13" } -opentelemetry = { version = "0.27", default-features = false, features = ["metrics"] } -opentelemetry_sdk = { version = "0.27", default-features = false, features = ["metrics"] } +opentelemetry = { version = "0.28", default-features = false, features = ["metrics"] } +opentelemetry_sdk = { version = "0.28", default-features = false, features = ["metrics"] } prometheus = "0.13" protobuf = "2.14" tracing = {version = ">=0.1.40", default-features = false, optional = true} # optional for opentelemetry internal logging [dev-dependencies] -opentelemetry-semantic-conventions = { version = "0.27" } +opentelemetry-semantic-conventions = { version = "0.28" } http-body-util = { version = "0.1" } hyper = { version = "1.3", features = ["full"] } hyper-util = { version = "0.1", features = ["full"] } diff --git a/opentelemetry-prometheus/src/lib.rs b/opentelemetry-prometheus/src/lib.rs index 968635a715..e68243fe2c 100644 --- a/opentelemetry-prometheus/src/lib.rs +++ b/opentelemetry-prometheus/src/lib.rs @@ -99,6 +99,7 @@ use once_cell::sync::{Lazy, OnceCell}; use opentelemetry::{otel_error, otel_warn, InstrumentationScope, Key, Value}; use opentelemetry_sdk::{ + error::OTelSdkResult, metrics::{ data::{self, ResourceMetrics}, reader::MetricReader, @@ -157,11 +158,11 @@ impl MetricReader for PrometheusExporter { self.reader.collect(rm) } - fn force_flush(&self) -> MetricResult<()> { + fn force_flush(&self) -> OTelSdkResult { self.reader.force_flush() } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> OTelSdkResult { self.reader.shutdown() } @@ -284,7 +285,7 @@ impl prometheus::core::Collector for Collector { }; let mut metrics = ResourceMetrics { - resource: Resource::empty(), + resource: Resource::builder_empty().build(), scope_metrics: vec![], }; if let Err(err) = self.reader.collect(&mut metrics) { diff --git a/opentelemetry-prometheus/tests/integration_test.rs b/opentelemetry-prometheus/tests/integration_test.rs index 286f8e0392..ccc3b1edc7 100644 --- a/opentelemetry-prometheus/tests/integration_test.rs +++ b/opentelemetry-prometheus/tests/integration_test.rs @@ -1,16 +1,12 @@ use std::collections::HashSet; use std::fs; use std::path::Path; -use std::time::Duration; use opentelemetry::metrics::{Meter, MeterProvider as _}; use opentelemetry::KeyValue; use opentelemetry::{InstrumentationScope, Key}; use opentelemetry_prometheus::{ExporterBuilder, ResourceSelector}; use opentelemetry_sdk::metrics::SdkMeterProvider; -use opentelemetry_sdk::resource::{ - EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector, -}; use opentelemetry_sdk::Resource; use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, TELEMETRY_SDK_VERSION}; use prometheus::{Encoder, TextEncoder}; @@ -360,26 +356,20 @@ fn prometheus_exporter_integration() { let exporter = tc.builder.with_registry(registry.clone()).build().unwrap(); let res = if tc.empty_resource { - Resource::empty() + Resource::builder_empty().build() } else { - Resource::from_detectors( - Duration::from_secs(0), - vec![ - Box::new(SdkProvidedResourceDetector), - Box::new(EnvResourceDetector::new()), - Box::new(TelemetryResourceDetector), - ], - ) - .merge(&mut Resource::new( - vec![ - // always specify service.name because the default depends on the running OS - KeyValue::new(SERVICE_NAME, "prometheus_test"), - // Overwrite the semconv.TelemetrySDKVersionKey value so we don't need to update every version - KeyValue::new(TELEMETRY_SDK_VERSION, "latest"), - ] - .into_iter() - .chain(tc.custom_resource_attrs.into_iter()), - )) + Resource::builder() + .with_attributes( + vec![ + // always specify service.name because the default depends on the running OS + KeyValue::new(SERVICE_NAME, "prometheus_test"), + // Overwrite the semconv.TelemetrySDKVersionKey value so we don't need to update every version + KeyValue::new(TELEMETRY_SDK_VERSION, "latest"), + ] + .into_iter() + .chain(tc.custom_resource_attrs.into_iter()), + ) + .build() }; let provider = SdkMeterProvider::builder() @@ -431,20 +421,14 @@ fn multiple_scopes() { .build() .unwrap(); - let resource = Resource::from_detectors( - Duration::from_secs(0), - vec![ - Box::new(SdkProvidedResourceDetector), - Box::new(EnvResourceDetector::new()), - Box::new(TelemetryResourceDetector), - ], - ) - .merge(&mut Resource::new(vec![ - // always specify service.name because the default depends on the running OS - KeyValue::new(SERVICE_NAME, "prometheus_test"), - // Overwrite the semconv.TelemetrySDKVersionKey value so we don't need to update every version - KeyValue::new(TELEMETRY_SDK_VERSION, "latest"), - ])); + let resource = Resource::builder() + .with_attributes([ + // always specify service.name because the default depends on the running OS + KeyValue::new(SERVICE_NAME, "prometheus_test"), + // Overwrite the semconv.TelemetrySDKVersionKey value so we don't need to update every version + KeyValue::new(TELEMETRY_SDK_VERSION, "latest"), + ]) + .build(); let provider = SdkMeterProvider::builder() .with_reader(exporter) @@ -781,24 +765,18 @@ fn duplicate_metrics() { let registry = prometheus::Registry::new(); let exporter = tc.builder.with_registry(registry.clone()).build().unwrap(); - let resource = Resource::from_detectors( - Duration::from_secs(0), - vec![ - Box::new(SdkProvidedResourceDetector), - Box::new(EnvResourceDetector::new()), - Box::new(TelemetryResourceDetector), - ], - ) - .merge(&mut Resource::new( - vec![ - // always specify service.name because the default depends on the running OS - KeyValue::new(SERVICE_NAME, "prometheus_test"), - // Overwrite the semconv.TelemetrySDKVersionKey value so we don't need to update every version - KeyValue::new(TELEMETRY_SDK_VERSION, "latest"), - ] - .into_iter() - .chain(tc.custom_resource_attrs.into_iter()), - )); + let resource = Resource::builder() + .with_attributes( + vec![ + // always specify service.name because the default depends on the running OS + KeyValue::new(SERVICE_NAME, "prometheus_test"), + // Overwrite the semconv.TelemetrySDKVersionKey value so we don't need to update every version + KeyValue::new(TELEMETRY_SDK_VERSION, "latest"), + ] + .into_iter() + .chain(tc.custom_resource_attrs.into_iter()), + ) + .build(); let provider = SdkMeterProvider::builder() .with_resource(resource) From 015252a93a12207f67e821c49a1c67a9e432fbf3 Mon Sep 17 00:00:00 2001 From: mohammadVatandoost Date: Thu, 6 Mar 2025 22:10:43 +0100 Subject: [PATCH 2/4] add timeout duration to shutdown --- opentelemetry-otlp/src/exporter/http/logs.rs | 4 +++- opentelemetry-otlp/src/exporter/tonic/logs.rs | 3 ++- opentelemetry-sdk/benches/log.rs | 3 ++- opentelemetry-sdk/benches/log_exporter.rs | 5 +++-- opentelemetry-sdk/benches/log_processor.rs | 8 +++---- .../src/logs/batch_log_processor.rs | 20 ++++++++--------- opentelemetry-sdk/src/logs/export.rs | 3 ++- .../src/logs/in_memory_exporter.rs | 3 ++- opentelemetry-sdk/src/logs/log_processor.rs | 10 +++++---- opentelemetry-sdk/src/logs/logger_provider.rs | 22 ++++++++++++------- .../src/logs/simple_log_processor.rs | 7 +++--- opentelemetry-stdout/src/logs/exporter.rs | 3 ++- stress/src/logs.rs | 4 +++- 13 files changed, 56 insertions(+), 39 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index d108e59c5c..9ef51e2d6d 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use super::OtlpHttpClient; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; @@ -46,7 +48,7 @@ impl LogExporter for OtlpHttpClient { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { let mut client_guard = self.client.lock().map_err(|e| { OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e)) })?; diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index cdbed23be2..56629f4fce 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,4 +1,5 @@ use core::fmt; +use std::time::Duration; use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, @@ -86,7 +87,7 @@ impl LogExporter for TonicLogsClient { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { match self.inner.take() { Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown. None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down. diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index fe78faabdd..39cf35c6db 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -17,6 +17,7 @@ RAM: 64.0 GB use opentelemetry::time::now; use std::collections::HashMap; +use std::time::Duration; use criterion::{criterion_group, criterion_main, Criterion}; @@ -38,7 +39,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 5fd4ac40dc..520b0f21c0 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -13,6 +13,7 @@ use opentelemetry::time::now; use opentelemetry_sdk::error::OTelSdkResult; use std::sync::Mutex; +use std::time::Duration; use criterion::{criterion_group, criterion_main, Criterion}; @@ -73,7 +74,7 @@ impl LogProcessor for ExportingProcessorWithFuture { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -104,7 +105,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index b2b53aa013..2c7a2c7716 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -14,7 +14,7 @@ use opentelemetry::time::now; use std::{ sync::{Arc, Mutex}, - thread::sleep, + thread::sleep, time::Duration, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -54,7 +54,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -71,7 +71,7 @@ impl LogProcessor for CloningProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -117,7 +117,7 @@ impl LogProcessor for SendToChannelProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index cb475a7574..3d33ca1563 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -132,7 +132,6 @@ pub struct BatchLogProcessor { message_sender: SyncSender, // Control channel to store control messages for the worker thread handle: Mutex>>, forceflush_timeout: Duration, - shutdown_timeout: Duration, export_log_message_sent: Arc, current_batch_size: Arc, max_export_batch_size: usize, @@ -256,7 +255,7 @@ impl LogProcessor for BatchLogProcessor { } } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_logs > 0 { @@ -272,7 +271,7 @@ impl LogProcessor for BatchLogProcessor { match self.message_sender.try_send(BatchMessage::Shutdown(sender)) { Ok(_) => { receiver - .recv_timeout(self.shutdown_timeout) + .recv_timeout(timeout) .map(|_| { // join the background thread after receiving back the // shutdown signal @@ -287,7 +286,7 @@ impl LogProcessor for BatchLogProcessor { name: "BatchLogProcessor.Shutdown.Timeout", message = "BatchLogProcessor shutdown timing out." ); - OTelSdkError::Timeout(self.shutdown_timeout) + OTelSdkError::Timeout(timeout) } _ => { otel_error!( @@ -436,7 +435,7 @@ impl BatchLogProcessor { ¤t_batch_size, &config, ); - let _ = exporter.shutdown(); + let _ = exporter.shutdown(Duration::from_secs(5)); let _ = sender.send(result); otel_debug!( @@ -488,7 +487,6 @@ impl BatchLogProcessor { message_sender, handle: Mutex::new(Some(handle)), forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable - shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable dropped_logs_count: AtomicUsize::new(0), max_queue_size, export_log_message_sent: Arc::new(AtomicBool::new(false)), @@ -922,7 +920,7 @@ mod tests { processor.emit(&mut record, &instrumentation); processor.force_flush().unwrap(); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()); @@ -934,27 +932,27 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } } diff --git a/opentelemetry-sdk/src/logs/export.rs b/opentelemetry-sdk/src/logs/export.rs index 4074c6edd8..d0b3a719b2 100644 --- a/opentelemetry-sdk/src/logs/export.rs +++ b/opentelemetry-sdk/src/logs/export.rs @@ -6,6 +6,7 @@ use crate::Resource; use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; use std::fmt::Debug; +use std::time::Duration; /// A batch of log records to be exported by a `LogExporter`. /// @@ -136,7 +137,7 @@ pub trait LogExporter: Send + Sync + Debug { ) -> impl std::future::Future + Send; /// Shuts down the exporter. - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { Ok(()) } #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/logs/in_memory_exporter.rs index d066aa4654..d89fb77261 100644 --- a/opentelemetry-sdk/src/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/logs/in_memory_exporter.rs @@ -6,6 +6,7 @@ use opentelemetry::InstrumentationScope; use std::borrow::Cow; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; +use std::time::Duration; type LogResult = Result; @@ -211,7 +212,7 @@ impl LogExporter for InMemoryLogExporter { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { self.shutdown_called .store(true, std::sync::atomic::Ordering::Relaxed); if self.should_reset_on_shutdown { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6054aa3415..04835a5242 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -34,6 +34,7 @@ use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; use std::fmt::Debug; +use std::time::Duration; /// The interface for plugging into a [`SdkLogger`]. /// @@ -56,7 +57,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// Shuts down the processor. /// After shutdown returns the log processor should stop processing any logs. /// It's up to the implementation on when to drop the LogProcessor. - fn shutdown(&self) -> OTelSdkResult; + fn shutdown(&self, timeout: Duration) -> OTelSdkResult; #[cfg(feature = "spec_unstable_logs_enabled")] /// Check if logging is enabled fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { @@ -81,6 +82,7 @@ pub(crate) mod tests { use opentelemetry::logs::{Logger, LoggerProvider}; use opentelemetry::{InstrumentationScope, Key}; use std::sync::{Arc, Mutex}; + use std::time::Duration; #[derive(Debug, Clone)] pub(crate) struct MockLogExporter { @@ -92,7 +94,7 @@ pub(crate) mod tests { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { Ok(()) } @@ -138,7 +140,7 @@ pub(crate) mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -168,7 +170,7 @@ pub(crate) mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs index 612412f36e..47b355685c 100644 --- a/opentelemetry-sdk/src/logs/logger_provider.rs +++ b/opentelemetry-sdk/src/logs/logger_provider.rs @@ -3,6 +3,7 @@ use crate::error::{OTelSdkError, OTelSdkResult}; use crate::logs::LogExporter; use crate::Resource; use opentelemetry::{otel_debug, otel_info, InstrumentationScope}; +use std::time::Duration; use std::{ borrow::Cow, sync::{ @@ -100,8 +101,13 @@ impl SdkLoggerProvider { } } - /// Shuts down this `LoggerProvider` + /// Shuts down this `LoggerProvider` with default timeout. pub fn shutdown(&self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_secs(5)) // TODO: make this configurable + } + + /// Shuts down this `LoggerProvider` with a timeout. + pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { otel_debug!( name: "LoggerProvider.ShutdownInvokedByUser", ); @@ -112,7 +118,7 @@ impl SdkLoggerProvider { .is_ok() { // propagate the shutdown signal to processors - let result = self.inner.shutdown(); + let result = self.inner.shutdown(timeout); if result.iter().all(|res| res.is_ok()) { Ok(()) } else { @@ -139,10 +145,10 @@ struct LoggerProviderInner { impl LoggerProviderInner { /// Shuts down the `LoggerProviderInner` and returns any errors. - pub(crate) fn shutdown(&self) -> Vec { + pub(crate) fn shutdown(&self, timeout: Duration) -> Vec { let mut results = vec![]; for processor in &self.processors { - let result = processor.shutdown(); + let result = processor.shutdown(timeout); if let Err(err) = &result { // Log at debug level because: // - The error is also returned to the user for handling (if applicable) @@ -164,7 +170,7 @@ impl Drop for LoggerProviderInner { name: "LoggerProvider.Drop", message = "Last reference of LoggerProvider dropped, initiating shutdown." ); - let _ = self.shutdown(); // errors are handled within shutdown + let _ = self.shutdown(Duration::from_secs(5)); // errors are handled within shutdown } else { otel_debug!( name: "LoggerProvider.Drop.AlreadyShutdown", @@ -331,7 +337,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { self.is_shutdown .lock() .map(|mut is_shutdown| *is_shutdown = true) @@ -776,7 +782,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { *self.shutdown_called.lock().unwrap() = true; Ok(()) } @@ -807,7 +813,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let mut count = self.shutdown_count.lock().unwrap(); *count += 1; Ok(()) diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index 9a2fada3dc..3651cc9270 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -28,6 +28,7 @@ use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; use std::fmt::Debug; use std::sync::atomic::AtomicBool; use std::sync::Mutex; +use std::time::Duration; /// A [`LogProcessor`] designed for testing and debugging purpose, that immediately /// exports log records as they are emitted. Log records are exported synchronously @@ -114,11 +115,11 @@ impl LogProcessor for SimpleLogProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { self.is_shutdown .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown() + exporter.shutdown(timeout) } else { Err(OTelSdkError::InternalFailure( "SimpleLogProcessor mutex poison at shutdown".into(), @@ -214,7 +215,7 @@ mod tests { processor.emit(&mut record, &instrumentation); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); let is_shutdown = processor .is_shutdown diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 518dbfe2a3..6958070861 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,5 +1,6 @@ use chrono::{DateTime, Utc}; use core::fmt; +use std::time::Duration; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::logs::LogBatch; use opentelemetry_sdk::Resource; @@ -57,7 +58,7 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter { } } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { self.is_shutdown.store(true, atomic::Ordering::SeqCst); Ok(()) } diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 65c0943b35..96e067b798 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -9,6 +9,8 @@ ~40 M /sec */ +use std::time::Duration; + use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::error::OTelSdkResult; @@ -48,7 +50,7 @@ impl LogProcessor for MockLogProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } From 41b620d6ca461e1c36dcbe9f840e925b5eb5b295 Mon Sep 17 00:00:00 2001 From: mohammadVatandoost Date: Mon, 10 Mar 2025 22:27:07 +0100 Subject: [PATCH 3/4] add shutdown with timeout for metric provider --- .../benches/logs.rs | 4 +-- opentelemetry-otlp/src/exporter/http/logs.rs | 4 +-- opentelemetry-otlp/src/exporter/tonic/logs.rs | 3 +- opentelemetry-proto/src/transform/logs.rs | 3 +- opentelemetry-sdk/benches/log_processor.rs | 3 +- opentelemetry-sdk/benches/metric.rs | 10 +++---- .../src/logs/batch_log_processor.rs | 2 +- opentelemetry-sdk/src/logs/export.rs | 3 +- .../src/logs/in_memory_exporter.rs | 3 +- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- .../logs/log_processor_with_async_runtime.rs | 28 +++++++++---------- opentelemetry-sdk/src/logs/logger_provider.rs | 2 +- .../src/logs/simple_log_processor.rs | 4 +-- .../src/metrics/manual_reader.rs | 6 ++-- .../src/metrics/meter_provider.rs | 24 +++++++++------- .../src/metrics/periodic_reader.rs | 12 ++++---- .../periodic_reader_with_async_runtime.rs | 2 +- opentelemetry-sdk/src/metrics/pipeline.rs | 12 ++++---- opentelemetry-sdk/src/metrics/reader.rs | 6 ++-- .../src/testing/metrics/metric_reader.rs | 6 ++-- opentelemetry-stdout/src/logs/exporter.rs | 3 +- 21 files changed, 70 insertions(+), 72 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 0ec47c1863..d78bbf4166 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -12,7 +12,6 @@ | ot_layer_disabled | 19 ns | | ot_layer_enabled | 196 ns | */ - use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer as tracing_layer; @@ -21,6 +20,7 @@ use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}; use opentelemetry_sdk::Resource; #[cfg(not(target_os = "windows"))] use pprof::criterion::{Output, PProfProfiler}; +use std::time::Duration; use tracing::error; use tracing_subscriber::prelude::*; use tracing_subscriber::Layer; @@ -44,7 +44,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 9ef51e2d6d..d108e59c5c 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use super::OtlpHttpClient; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; @@ -48,7 +46,7 @@ impl LogExporter for OtlpHttpClient { Ok(()) } - fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { + fn shutdown(&mut self) -> OTelSdkResult { let mut client_guard = self.client.lock().map_err(|e| { OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e)) })?; diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 56629f4fce..cdbed23be2 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,5 +1,4 @@ use core::fmt; -use std::time::Duration; use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, @@ -87,7 +86,7 @@ impl LogExporter for TonicLogsClient { Ok(()) } - fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { + fn shutdown(&mut self) -> OTelSdkResult { match self.inner.take() { Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown. None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down. diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index f1a992fc9a..a206c18cb2 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -229,6 +229,7 @@ mod tests { use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource}; + use std::time; #[derive(Debug)] struct MockProcessor; @@ -240,7 +241,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: time::Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index 2c7a2c7716..242e4911d0 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -14,7 +14,8 @@ use opentelemetry::time::now; use std::{ sync::{Arc, Mutex}, - thread::sleep, time::Duration, + thread::sleep, + time::Duration, }; use criterion::{criterion_group, criterion_main, Criterion}; diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index 3706998e2c..867ca095bb 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -1,6 +1,3 @@ -use rand::Rng; -use std::sync::{Arc, Weak}; - use criterion::{criterion_group, criterion_main, Bencher, Criterion}; use opentelemetry::{ metrics::{Counter, Histogram, MeterProvider as _}, @@ -15,6 +12,9 @@ use opentelemetry_sdk::{ }, Resource, }; +use rand::Rng; +use std::sync::{Arc, Weak}; +use std::time::Duration; #[derive(Clone, Debug)] struct SharedReader(Arc); @@ -32,8 +32,8 @@ impl MetricReader for SharedReader { self.0.force_flush() } - fn shutdown(&self) -> OTelSdkResult { - self.0.shutdown() + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { + self.0.shutdown(timeout) } fn temporality(&self, kind: InstrumentKind) -> Temporality { diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index 3d33ca1563..679329b518 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -435,7 +435,7 @@ impl BatchLogProcessor { ¤t_batch_size, &config, ); - let _ = exporter.shutdown(Duration::from_secs(5)); + let _ = exporter.shutdown(); let _ = sender.send(result); otel_debug!( diff --git a/opentelemetry-sdk/src/logs/export.rs b/opentelemetry-sdk/src/logs/export.rs index d0b3a719b2..4074c6edd8 100644 --- a/opentelemetry-sdk/src/logs/export.rs +++ b/opentelemetry-sdk/src/logs/export.rs @@ -6,7 +6,6 @@ use crate::Resource; use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; use std::fmt::Debug; -use std::time::Duration; /// A batch of log records to be exported by a `LogExporter`. /// @@ -137,7 +136,7 @@ pub trait LogExporter: Send + Sync + Debug { ) -> impl std::future::Future + Send; /// Shuts down the exporter. - fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { + fn shutdown(&mut self) -> OTelSdkResult { Ok(()) } #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/logs/in_memory_exporter.rs index d89fb77261..d066aa4654 100644 --- a/opentelemetry-sdk/src/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/logs/in_memory_exporter.rs @@ -6,7 +6,6 @@ use opentelemetry::InstrumentationScope; use std::borrow::Cow; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; -use std::time::Duration; type LogResult = Result; @@ -212,7 +211,7 @@ impl LogExporter for InMemoryLogExporter { Ok(()) } - fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { + fn shutdown(&mut self) -> OTelSdkResult { self.shutdown_called .store(true, std::sync::atomic::Ordering::Relaxed); if self.should_reset_on_shutdown { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 04835a5242..fec758f518 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -94,7 +94,7 @@ pub(crate) mod tests { Ok(()) } - fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { + fn shutdown(&mut self) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index bb7f7fa2ea..130ece994b 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -87,7 +87,7 @@ impl LogProcessor for BatchLogProcessor { .and_then(std::convert::identity) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_logs > 0 { @@ -546,7 +546,7 @@ mod tests { processor.emit(&mut record, &instrumentation); processor.force_flush().unwrap(); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) @@ -561,7 +561,7 @@ mod tests { runtime::TokioCurrentThread, ); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "current_thread")] @@ -578,7 +578,7 @@ mod tests { // // deadlock happens in shutdown with tokio current_thread runtime // - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "current_thread")] @@ -589,7 +589,7 @@ mod tests { BatchConfig::default(), runtime::TokioCurrentThread, ); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -597,7 +597,7 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -605,7 +605,7 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[derive(Debug)] @@ -633,7 +633,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -663,7 +663,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -808,7 +808,7 @@ mod tests { processor.emit(&mut record, &instrumentation); processor.force_flush().unwrap(); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) @@ -824,7 +824,7 @@ mod tests { // // deadlock happens in shutdown with tokio current_thread runtime // - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "current_thread")] @@ -837,7 +837,7 @@ mod tests { runtime::TokioCurrentThread, ); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -846,7 +846,7 @@ mod tests { let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -858,6 +858,6 @@ mod tests { runtime::TokioCurrentThread, ); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } } diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs index 47b355685c..e6ea180bec 100644 --- a/opentelemetry-sdk/src/logs/logger_provider.rs +++ b/opentelemetry-sdk/src/logs/logger_provider.rs @@ -107,7 +107,7 @@ impl SdkLoggerProvider { } /// Shuts down this `LoggerProvider` with a timeout. - pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { otel_debug!( name: "LoggerProvider.ShutdownInvokedByUser", ); diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index 3651cc9270..8b74d786d7 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -115,11 +115,11 @@ impl LogProcessor for SimpleLogProcessor { Ok(()) } - fn shutdown(&self, timeout: Duration) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { self.is_shutdown .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown(timeout) + exporter.shutdown() } else { Err(OTelSdkError::InternalFailure( "SimpleLogProcessor mutex poison at shutdown".into(), diff --git a/opentelemetry-sdk/src/metrics/manual_reader.rs b/opentelemetry-sdk/src/metrics/manual_reader.rs index 9a9f8915ae..ecb1585d0c 100644 --- a/opentelemetry-sdk/src/metrics/manual_reader.rs +++ b/opentelemetry-sdk/src/metrics/manual_reader.rs @@ -1,10 +1,10 @@ +use opentelemetry::otel_debug; +use std::time::Duration; use std::{ fmt, sync::{Mutex, Weak}, }; -use opentelemetry::otel_debug; - use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{MetricError, MetricResult, Temporality}, @@ -110,7 +110,7 @@ impl MetricReader for ManualReader { } /// Closes any connections and frees any resources used by the reader. - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let mut inner = self .inner .lock() diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index 1e38f627ce..fb113e02ea 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -1,4 +1,9 @@ use core::fmt; +use opentelemetry::{ + metrics::{Meter, MeterProvider}, + otel_debug, otel_error, otel_info, InstrumentationScope, +}; +use std::time::Duration; use std::{ collections::HashMap, sync::{ @@ -7,11 +12,6 @@ use std::{ }, }; -use opentelemetry::{ - metrics::{Meter, MeterProvider}, - otel_debug, otel_error, otel_info, InstrumentationScope, -}; - use crate::error::OTelSdkResult; use crate::Resource; @@ -109,12 +109,16 @@ impl SdkMeterProvider { /// /// There is no guaranteed that all telemetry be flushed or all resources have /// been released on error. - pub fn shutdown(&self) -> OTelSdkResult { + pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { otel_debug!( name: "MeterProvider.Shutdown", message = "User initiated shutdown of MeterProvider." ); - self.inner.shutdown() + self.inner.shutdown(timeout) + } + /// Shuts down with Default timeout of 5 seconds. + pub fn shutdown(&self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_secs(5)) } } @@ -130,7 +134,7 @@ impl SdkMeterProviderInner { } } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { if self .shutdown_invoked .swap(true, std::sync::atomic::Ordering::SeqCst) @@ -138,7 +142,7 @@ impl SdkMeterProviderInner { // If the previous value was true, shutdown was already invoked. Err(crate::error::OTelSdkError::AlreadyShutdown) } else { - self.pipes.shutdown() + self.pipes.shutdown(timeout) } } } @@ -157,7 +161,7 @@ impl Drop for SdkMeterProviderInner { name: "MeterProvider.Drop", message = "Last reference of MeterProvider dropped, initiating shutdown." ); - if let Err(err) = self.shutdown() { + if let Err(err) = self.shutdown(Duration::from_secs(5)) { otel_error!( name: "MeterProvider.Drop.ShutdownFailed", message = "Shutdown attempt failed during drop of MeterProvider.", diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 1e9f5bd16f..b50737b603 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -441,7 +441,7 @@ impl PeriodicReaderInner { } } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { // TODO: See if this is better to be created upfront. let (response_tx, response_rx) = mpsc::channel(); self.message_sender @@ -449,7 +449,7 @@ impl PeriodicReaderInner { .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; // TODO: Make this timeout configurable. - match response_rx.recv_timeout(Duration::from_secs(5)) { + match response_rx.recv_timeout(timeout) { Ok(response) => { if response { Ok(()) @@ -457,9 +457,7 @@ impl PeriodicReaderInner { Err(OTelSdkError::InternalFailure("Failed to shutdown".into())) } } - Err(mpsc::RecvTimeoutError::Timeout) => { - Err(OTelSdkError::Timeout(Duration::from_secs(5))) - } + Err(mpsc::RecvTimeoutError::Timeout) => Err(OTelSdkError::Timeout(timeout)), Err(mpsc::RecvTimeoutError::Disconnected) => { Err(OTelSdkError::InternalFailure("Failed to shutdown".into())) } @@ -490,8 +488,8 @@ impl MetricReader for PeriodicReader { // completion, and avoid blocking the thread. The default shutdown on drop // can still use blocking call. If user already explicitly called shutdown, // drop won't call shutdown again. - fn shutdown(&self) -> OTelSdkResult { - self.inner.shutdown() + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { + self.inner.shutdown(_timeout) } /// To construct a [MetricReader][metric-reader] when setting up an SDK, diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 5ba1de731f..9f1b46bc0f 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -391,7 +391,7 @@ impl MetricReader for PeriodicReader { .and_then(|res| res) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let mut inner = self .inner .lock() diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index d8c9429c51..f3f28cb708 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -1,12 +1,12 @@ use core::fmt; +use opentelemetry::{otel_debug, InstrumentationScope, KeyValue}; +use std::time::Duration; use std::{ borrow::Cow, collections::{HashMap, HashSet}, sync::{Arc, Mutex}, }; -use opentelemetry::{otel_debug, InstrumentationScope, KeyValue}; - use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{ @@ -95,8 +95,8 @@ impl Pipeline { } /// Shut down pipeline - fn shutdown(&self) -> OTelSdkResult { - self.reader.shutdown() + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { + self.reader.shutdown(timeout) } } @@ -664,10 +664,10 @@ impl Pipelines { } /// Shut down all pipelines - pub(crate) fn shutdown(&self) -> OTelSdkResult { + pub(crate) fn shutdown(&self, timeout: Duration) -> OTelSdkResult { let mut errs = vec![]; for pipeline in &self.0 { - if let Err(err) = pipeline.shutdown() { + if let Err(err) = pipeline.shutdown(timeout) { errs.push(err); } } diff --git a/opentelemetry-sdk/src/metrics/reader.rs b/opentelemetry-sdk/src/metrics/reader.rs index 04710bdd41..ba4f6056f9 100644 --- a/opentelemetry-sdk/src/metrics/reader.rs +++ b/opentelemetry-sdk/src/metrics/reader.rs @@ -1,7 +1,7 @@ //! Interfaces for reading and producing metrics -use std::{fmt, sync::Weak}; - use crate::{error::OTelSdkResult, metrics::MetricResult}; +use std::time::Duration; +use std::{fmt, sync::Weak}; use super::{data::ResourceMetrics, pipeline::Pipeline, InstrumentKind, Temporality}; @@ -46,7 +46,7 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static { /// /// After `shutdown` is called, calls to `collect` will perform no operation and /// instead will return an error indicating the shutdown state. - fn shutdown(&self) -> OTelSdkResult; + fn shutdown(&self, timeout: Duration) -> OTelSdkResult; /// The output temporality, a function of instrument kind. /// This SHOULD be obtained from the exporter. diff --git a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs index 041eebfddb..ef62e41f03 100644 --- a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs +++ b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs @@ -1,10 +1,10 @@ -use std::sync::{Arc, Mutex, Weak}; - use crate::error::{OTelSdkError, OTelSdkResult}; use crate::metrics::{ data::ResourceMetrics, pipeline::Pipeline, reader::MetricReader, InstrumentKind, }; use crate::metrics::{MetricResult, Temporality}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Duration; #[derive(Debug, Clone)] pub struct TestMetricReader { @@ -42,7 +42,7 @@ impl MetricReader for TestMetricReader { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let result = self.force_flush(); { let mut is_shutdown = self.is_shutdown.lock().unwrap(); diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 6958070861..518dbfe2a3 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,6 +1,5 @@ use chrono::{DateTime, Utc}; use core::fmt; -use std::time::Duration; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::logs::LogBatch; use opentelemetry_sdk::Resource; @@ -58,7 +57,7 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter { } } - fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult { + fn shutdown(&mut self) -> OTelSdkResult { self.is_shutdown.store(true, atomic::Ordering::SeqCst); Ok(()) } From 85b53d0aa7019fda6f68b6631402aa7dcd67eb3d Mon Sep 17 00:00:00 2001 From: mohammadVatandoost Date: Tue, 11 Mar 2025 23:04:36 +0100 Subject: [PATCH 4/4] fix lint --- opentelemetry-appender-tracing/benches/log-attributes.rs | 4 ++-- opentelemetry-appender-tracing/src/layer.rs | 3 ++- opentelemetry-sdk/src/logs/mod.rs | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/log-attributes.rs b/opentelemetry-appender-tracing/benches/log-attributes.rs index 397fca0443..ec344d2104 100644 --- a/opentelemetry-appender-tracing/benches/log-attributes.rs +++ b/opentelemetry-appender-tracing/benches/log-attributes.rs @@ -21,7 +21,6 @@ | otel_11_attributes | 625 ns | +106 ns | // vec! initial capacity is 5. 11th attribute causes vec! to be reallocated | otel_12_attributes | 676 ns | +51 ns | */ - use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer as tracing_layer; @@ -30,6 +29,7 @@ use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}; use opentelemetry_sdk::Resource; #[cfg(not(target_os = "windows"))] use pprof::criterion::{Output, PProfProfiler}; +use std::time::Duration; use tracing::error; use tracing_subscriber::prelude::*; use tracing_subscriber::Registry; @@ -44,7 +44,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 6a15bc9dba..b61d86fb75 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -232,6 +232,7 @@ mod tests { use opentelemetry_sdk::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{SdkLogRecord, SdkLoggerProvider}; use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider}; + use std::time::Duration; use tracing::{error, warn}; use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -821,7 +822,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 0da96bb730..6ff036aa64 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -42,6 +42,7 @@ mod tests { use opentelemetry::{Context, InstrumentationScope}; use std::borrow::Borrow; use std::collections::HashMap; + use std::time::Duration; #[test] fn logging_sdk_test() { @@ -167,7 +168,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> crate::error::OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> crate::error::OTelSdkResult { Ok(()) } }