diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 6ba2f705d4..6796105b85 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -4,6 +4,8 @@ - TODO: Placeholder for Span processor related things - *Fix* SpanProcessor::on_start is no longer called on non recording spans +- Revert removal of `MetricProducer` which allowed metrics from + external sources to be sent through OpenTelemetry. ## 0.30.0 diff --git a/opentelemetry-sdk/src/metrics/manual_reader.rs b/opentelemetry-sdk/src/metrics/manual_reader.rs index 3293238109..7adb15176d 100644 --- a/opentelemetry-sdk/src/metrics/manual_reader.rs +++ b/opentelemetry-sdk/src/metrics/manual_reader.rs @@ -13,7 +13,7 @@ use crate::{ use super::{ data::ResourceMetrics, pipeline::Pipeline, - reader::{MetricReader, SdkProducer}, + reader::{MetricProducer, MetricReader, SdkProducer}, }; /// A simple [MetricReader] that allows an application to read metrics on demand. @@ -50,6 +50,7 @@ impl fmt::Debug for ManualReader { struct ManualReaderInner { sdk_producer: Option>, is_shutdown: bool, + external_producers: Vec>, } impl ManualReader { @@ -59,11 +60,15 @@ impl ManualReader { } /// A [MetricReader] which is directly called to collect metrics. - pub(crate) fn new(temporality: Temporality) -> Self { + pub(crate) fn new( + temporality: Temporality, + external_producers: Vec>, + ) -> Self { ManualReader { inner: Mutex::new(ManualReaderInner { sdk_producer: None, is_shutdown: false, + external_producers, }), temporality, } @@ -86,7 +91,7 @@ impl MetricReader for ManualReader { }); } - /// Gathers all metrics from the SDK, calling any + /// Gathers all metrics from the SDK and other [MetricProducer]s, calling any /// callbacks necessary and returning the results. /// /// Returns an error if called after shutdown. @@ -105,7 +110,19 @@ impl MetricReader for ManualReader { } }; - Ok(()) + let mut errs = vec![]; + for producer in &inner.external_producers { + match producer.produce() { + Ok(metrics) => rm.scope_metrics.push(metrics), + Err(err) => errs.push(err), + } + } + + if errs.is_empty() { + Ok(()) + } else { + Err(OTelSdkError::InternalFailure(format!("{errs:?}"))) + } } /// ForceFlush is a no-op, it always returns nil. @@ -123,6 +140,7 @@ impl MetricReader for ManualReader { // Any future call to collect will now return an error. inner.sdk_producer = None; inner.is_shutdown = true; + inner.external_producers = Vec::new(); Ok(()) } @@ -136,6 +154,7 @@ impl MetricReader for ManualReader { #[derive(Default)] pub struct ManualReaderBuilder { temporality: Temporality, + producers: Vec>, } impl fmt::Debug for ManualReaderBuilder { @@ -156,8 +175,17 @@ impl ManualReaderBuilder { self } + /// Registers a an external [MetricProducer] with this reader. + /// + /// The producer is used as a source of aggregated metric data which is + /// incorporated into metrics collected from the SDK. + pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self { + self.producers.push(Box::new(producer)); + self + } + /// Create a new [ManualReader] from this configuration. pub fn build(self) -> ManualReader { - ManualReader::new(self.temporality) + ManualReader::new(self.temporality, self.producers) } } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 70e78f9253..85d0505535 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -12,7 +12,7 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context}; use crate::{ error::{OTelSdkError, OTelSdkResult}, - metrics::{exporter::PushMetricExporter, reader::SdkProducer}, + metrics::{exporter::PushMetricExporter, reader::MetricProducer, reader::SdkProducer}, Resource, }; @@ -30,6 +30,7 @@ const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL"; pub struct PeriodicReaderBuilder { interval: Duration, exporter: E, + producers: Vec>, } impl PeriodicReaderBuilder @@ -42,7 +43,11 @@ where .and_then(|v| v.parse().map(Duration::from_millis).ok()) .unwrap_or(DEFAULT_INTERVAL); - PeriodicReaderBuilder { interval, exporter } + PeriodicReaderBuilder { + interval, + exporter, + producers: Vec::new(), + } } /// Configures the intervening time between exports for a [PeriodicReader]. @@ -59,6 +64,15 @@ where self } + /// Registers a an external [MetricProducer] with this reader. + /// + /// The producer is used as a source of aggregated metric data which is + /// incorporated into metrics collected from the SDK. + pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self { + self.producers.push(Box::new(producer)); + self + } + /// Create a [PeriodicReader] with the given config. pub fn build(self) -> PeriodicReader { PeriodicReader::new(self.exporter, self.interval) @@ -152,6 +166,7 @@ impl PeriodicReader { message_sender, producer: Mutex::new(None), exporter: exporter_arc.clone(), + external_producers: Vec::new(), }), }; let cloned_reader = reader.clone(); @@ -351,6 +366,7 @@ struct PeriodicReaderInner { exporter: Arc, message_sender: mpsc::Sender, producer: Mutex>>, + external_producers: Vec>, } impl PeriodicReaderInner { @@ -364,12 +380,12 @@ impl PeriodicReaderInner { } fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { + let mut errs = vec![]; let producer = self.producer.lock().expect("lock poisoned"); if let Some(p) = producer.as_ref() { p.upgrade() .ok_or(OTelSdkError::AlreadyShutdown)? .produce(rm)?; - Ok(()) } else { otel_warn!( name: "PeriodReader.MeterProviderNotRegistered", @@ -377,10 +393,23 @@ impl PeriodicReaderInner { This occurs when a periodic reader is created but not associated with a MeterProvider \ by calling `.with_reader(reader)` on MeterProviderBuilder." ); - Err(OTelSdkError::InternalFailure( + errs.push(OTelSdkError::InternalFailure( "MeterProvider is not registered".into(), )) } + + for producer in &self.external_producers { + match producer.produce() { + Ok(metrics) => rm.scope_metrics.push(metrics), + Err(err) => errs.push(err), + } + } + + if errs.is_empty() { + Ok(()) + } else { + Err(OTelSdkError::InternalFailure(format!("{errs:?}"))) + } } fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { diff --git a/opentelemetry-sdk/src/metrics/reader.rs b/opentelemetry-sdk/src/metrics/reader.rs index ae19841155..c492b3a0e5 100644 --- a/opentelemetry-sdk/src/metrics/reader.rs +++ b/opentelemetry-sdk/src/metrics/reader.rs @@ -1,9 +1,14 @@ //! Interfaces for reading and producing metrics -use crate::error::OTelSdkResult; +use crate::error::{OTelSdkError, OTelSdkResult}; use std::time::Duration; use std::{fmt, sync::Weak}; -use super::{data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, Temporality}; +use super::{ + data::{ResourceMetrics, ScopeMetrics}, + instrument::InstrumentKind, + pipeline::Pipeline, + Temporality, +}; /// The interface used between the SDK and an exporter. /// @@ -65,3 +70,9 @@ pub(crate) trait SdkProducer: fmt::Debug + Send + Sync { /// Returns aggregated metrics from a single collection. fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult; } + +/// Produces metrics for a [MetricReader] from an external source. +pub trait MetricProducer: fmt::Debug + Send + Sync { + /// Returns aggregated metrics from an external source. + fn produce(&self) -> Result; +}