Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions examples/metrics-advanced/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use opentelemetry::global;
use std::borrow::Cow;
use opentelemetry::{global, Context};
use opentelemetry::Key;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::{Aggregation, Instrument, SdkMeterProvider, Stream, Temporality};
use opentelemetry_sdk::metrics::{Aggregation, Instrument, MeasurementProcessor, SdkMeterProvider, Stream, Temporality};
use opentelemetry_sdk::Resource;
use std::error::Error;

Expand Down Expand Up @@ -57,6 +58,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
.with_view(my_view_rename_and_unit)
.with_view(my_view_drop_attributes)
.with_view(my_view_change_aggregation)
.with_measurement_processor(UserTypeMeasurementProcessor)
.build();
global::set_meter_provider(provider.clone());
provider
Expand Down Expand Up @@ -128,6 +130,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
],
);

// Enrich the next measurement with user type
let guard = Context::current_with_value(UserType::Admin).attach();
histogram2.record(
1.2,
&[
Expand All @@ -137,6 +141,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
KeyValue::new("mykey4", "myvalue4"),
],
);
drop(guard);

histogram2.record(
1.23,
Expand All @@ -154,3 +159,36 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
meter_provider.shutdown()?;
Ok(())
}



enum UserType {
User,
Admin,
}

impl UserType {
fn as_str(&self) -> &'static str {
match self {
UserType::User => "user",
UserType::Admin => "admin",
}
}
}

struct UserTypeMeasurementProcessor;

impl MeasurementProcessor for UserTypeMeasurementProcessor {
fn process<'a>(&self, attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could the trait be modified to accept slice of attributes, and return Option ? If processor has no interest in updating, it can return None, at which point the caller uses the original slice itself.
If processor wants to change, it'll create a new Vec! and return it.
Cost is paid only when one wants to enrich..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the trait with this suggestion. Looks nicer indeed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: One big advantage of using Cow is the re-use of owned Vec in case multiple processors are modifying the attributes:

impl MeasurementProcessor for UserTypeMeasurementProcessor {
    fn process<'a>(&self, attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]> {
        match Context::current().get::<UserType>() {
            Some(user_type) => {
                let mut attrs = attributes.into_owned();
                attrs.push(KeyValue::new("user_type", user_type.as_str()));
                Cow::Owned(attrs)
            }

            // No changes to the attributes
            None => attributes,
        }
    }
}

The into_owned will either:

  • Return already allocated vector: Cow::Owned
  • Create a new vector in case the Cow contained the borrowed data.

match Context::current().get::<UserType>() {
Some(user_type) => {
let mut attrs = attributes.to_vec();
attrs.push(KeyValue::new("user_type", user_type.as_str()));
Cow::Owned(attrs)
}

// No changes to the attributes
None => attributes,
}
}
}
81 changes: 63 additions & 18 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::{
sync::{Arc, Mutex},
time::SystemTime,
};

use std::borrow::Cow;
use crate::metrics::{data::Aggregation, Temporality};
use opentelemetry::time::now;
use opentelemetry::KeyValue;

use crate::metrics::measurement_processor::MeasurementProcessor;
use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
Expand Down Expand Up @@ -106,21 +106,41 @@ type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
#[derive(Clone)]
pub(crate) struct AttributeSetFilter {
filter: Option<Filter>,
processor: Option<AggregateProcessor>
}

impl AttributeSetFilter {
pub(crate) fn new(filter: Option<Filter>) -> Self {
Self { filter }
pub(crate) fn new(filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
Self { filter, processor: AggregateProcessor::try_create(processors) }
}

pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
if let Some(filter) = &self.filter {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
run(&filtered_attrs);
} else {
run(attrs);
};
match (&self.filter, &self.processor) {
(None, None) => {
run(attrs);
},
(Some(filter), None) => {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();

run(&filtered_attrs);
},
(None, Some(processor)) => {
let attributes = Cow::Borrowed(attrs);
let attributes = processor.process(attributes);

run(attributes.as_ref());
},
(Some(filter), Some(processor)) => {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();

let attributes = Cow::Owned(filtered_attrs);
let attributes = processor.process(attributes);

run(attributes.as_ref());
}
}
}
}

Expand All @@ -137,10 +157,10 @@ pub(crate) struct AggregateBuilder<T> {
}

impl<T: Number> AggregateBuilder<T> {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
AggregateBuilder {
temporality,
filter: AttributeSetFilter::new(filter),
filter: AttributeSetFilter::new(filter, processors),
_marker: marker::PhantomData,
}
}
Expand Down Expand Up @@ -201,6 +221,31 @@ impl<T: Number> AggregateBuilder<T> {
}
}


#[derive(Clone)]
struct AggregateProcessor(Arc<Vec<Arc<dyn MeasurementProcessor>>>);

impl AggregateProcessor {
pub fn try_create(
processors: Vec<Arc<dyn MeasurementProcessor>>,
) -> Option<Self> {
if processors.is_empty() {
return None;
}
Some(Self(Arc::new(processors)))
}
}

impl MeasurementProcessor for AggregateProcessor {
fn process<'a>(&self, mut attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]> {
for processor in self.0.iter() {
attributes = processor.process(attributes);
}

attributes
}
}

#[cfg(test)]
mod tests {
use crate::metrics::data::{
Expand All @@ -214,7 +259,7 @@ mod tests {
#[test]
fn last_value_aggregation() {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
AggregateBuilder::<u64>::new(Temporality::Cumulative, None, vec![]).last_value(None);
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
Expand All @@ -240,7 +285,7 @@ mod tests {
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
AggregateBuilder::<u64>::new(temporality, None, vec![]).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
Expand Down Expand Up @@ -282,7 +327,7 @@ mod tests {
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).sum(true);
AggregateBuilder::<u64>::new(temporality, None, vec![]).sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
Expand Down Expand Up @@ -323,7 +368,7 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
Expand Down Expand Up @@ -366,7 +411,7 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
Expand Down
7 changes: 7 additions & 0 deletions opentelemetry-sdk/src/metrics/measurement_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use opentelemetry::KeyValue;
use std::borrow::Cow;

pub trait MeasurementProcessor: Send + Sync + 'static {
fn process<'a>(&self, attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]>;
}

12 changes: 8 additions & 4 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ use opentelemetry::{
use crate::error::OTelSdkResult;
use crate::Resource;

use super::{
exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
reader::MetricReader, view::View, PeriodicReader,
};
use super::{exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View, MeasurementProcessor, PeriodicReader};

/// Handles the creation and coordination of [Meter]s.
///
Expand Down Expand Up @@ -223,6 +220,7 @@ pub struct MeterProviderBuilder {
resource: Option<Resource>,
readers: Vec<Box<dyn MetricReader>>,
views: Vec<Arc<dyn View>>,
measurement_processors: Vec<Arc<dyn MeasurementProcessor>>,
}

impl MeterProviderBuilder {
Expand Down Expand Up @@ -291,6 +289,11 @@ impl MeterProviderBuilder {
self
}

pub fn with_measurement_processor<T: MeasurementProcessor>(mut self, processor: T) -> Self {
self.measurement_processors.push(Arc::new(processor));
self
}

/// Construct a new [MeterProvider] with this configuration.
pub fn build(self) -> SdkMeterProvider {
otel_debug!(
Expand All @@ -304,6 +307,7 @@ impl MeterProviderBuilder {
self.resource.unwrap_or(Resource::builder().build()),
self.readers,
self.views,
self.measurement_processors,
)),
meters: Default::default(),
shutdown_invoked: AtomicBool::new(false),
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ pub(crate) mod view;
#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
pub mod in_memory_exporter;

mod measurement_processor;
pub use measurement_processor::MeasurementProcessor;

#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
Expand Down
7 changes: 5 additions & 2 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{

use self::internal::AggregateFns;

use super::{Aggregation, Temporality};
use super::{Aggregation, MeasurementProcessor, Temporality};

/// Connects all of the instruments created by a meter provider to a [MetricReader].
///
Expand All @@ -39,6 +39,7 @@ pub struct Pipeline {
reader: Box<dyn MetricReader>,
views: Vec<Arc<dyn View>>,
inner: Mutex<PipelineInner>,
processors: Vec<Arc<dyn MeasurementProcessor>>,
}

impl fmt::Debug for Pipeline {
Expand Down Expand Up @@ -385,7 +386,7 @@ where
.clone()
.map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);

let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter, self.pipeline.processors.clone());
let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
Ok(Some(inst)) => inst,
other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
Expand Down Expand Up @@ -621,6 +622,7 @@ impl Pipelines {
res: Resource,
readers: Vec<Box<dyn MetricReader>>,
views: Vec<Arc<dyn View>>,
processors: Vec<Arc<dyn MeasurementProcessor>>,
) -> Self {
let mut pipes = Vec::with_capacity(readers.len());
for r in readers {
Expand All @@ -629,6 +631,7 @@ impl Pipelines {
reader: r,
views: views.clone(),
inner: Default::default(),
processors: processors.clone(),
});
p.reader.register_pipeline(Arc::downgrade(&p));
pipes.push(p);
Expand Down
Loading