Skip to content

Commit 800c660

Browse files
committed
Prototype measurement processor
1 parent ad88615 commit 800c660

File tree

6 files changed

+127
-26
lines changed

6 files changed

+127
-26
lines changed

examples/metrics-advanced/src/main.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use opentelemetry::global;
1+
use std::borrow::Cow;
2+
use opentelemetry::{global, Context};
23
use opentelemetry::Key;
34
use opentelemetry::KeyValue;
4-
use opentelemetry_sdk::metrics::{Aggregation, Instrument, SdkMeterProvider, Stream, Temporality};
5+
use opentelemetry_sdk::metrics::{Aggregation, Instrument, MeasurementProcessor, SdkMeterProvider, Stream, Temporality};
56
use opentelemetry_sdk::Resource;
67
use std::error::Error;
78

@@ -57,6 +58,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
5758
.with_view(my_view_rename_and_unit)
5859
.with_view(my_view_drop_attributes)
5960
.with_view(my_view_change_aggregation)
61+
.with_measurement_processor(UserTypeMeasurementProcessor)
6062
.build();
6163
global::set_meter_provider(provider.clone());
6264
provider
@@ -128,6 +130,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
128130
],
129131
);
130132

133+
// Enrich the next measurement with user type
134+
let guard = Context::current_with_value(UserType::Admin).attach();
131135
histogram2.record(
132136
1.2,
133137
&[
@@ -137,6 +141,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
137141
KeyValue::new("mykey4", "myvalue4"),
138142
],
139143
);
144+
drop(guard);
140145

141146
histogram2.record(
142147
1.23,
@@ -154,3 +159,36 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
154159
meter_provider.shutdown()?;
155160
Ok(())
156161
}
162+
163+
164+
165+
enum UserType {
166+
User,
167+
Admin,
168+
}
169+
170+
impl UserType {
171+
fn as_str(&self) -> &'static str {
172+
match self {
173+
UserType::User => "user",
174+
UserType::Admin => "admin",
175+
}
176+
}
177+
}
178+
179+
struct UserTypeMeasurementProcessor;
180+
181+
impl MeasurementProcessor for UserTypeMeasurementProcessor {
182+
fn process<'a>(&self, attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]> {
183+
match Context::current().get::<UserType>() {
184+
Some(user_type) => {
185+
let mut attrs = attributes.to_vec();
186+
attrs.push(KeyValue::new("user_type", user_type.as_str()));
187+
Cow::Owned(attrs)
188+
}
189+
190+
// No changes to the attributes
191+
None => attributes,
192+
}
193+
}
194+
}

opentelemetry-sdk/src/metrics/internal/aggregate.rs

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use std::{
55
sync::{Arc, Mutex},
66
time::SystemTime,
77
};
8-
8+
use std::borrow::Cow;
99
use crate::metrics::{data::Aggregation, Temporality};
1010
use opentelemetry::time::now;
1111
use opentelemetry::KeyValue;
12-
12+
use crate::metrics::measurement_processor::MeasurementProcessor;
1313
use super::{
1414
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
1515
precomputed_sum::PrecomputedSum, sum::Sum, Number,
@@ -106,21 +106,41 @@ type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
106106
#[derive(Clone)]
107107
pub(crate) struct AttributeSetFilter {
108108
filter: Option<Filter>,
109+
processor: Option<AggregateProcessor>
109110
}
110111

111112
impl AttributeSetFilter {
112-
pub(crate) fn new(filter: Option<Filter>) -> Self {
113-
Self { filter }
113+
pub(crate) fn new(filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
114+
Self { filter, processor: AggregateProcessor::try_create(processors) }
114115
}
115116

116117
pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
117-
if let Some(filter) = &self.filter {
118-
let filtered_attrs: Vec<KeyValue> =
119-
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
120-
run(&filtered_attrs);
121-
} else {
122-
run(attrs);
123-
};
118+
match (&self.filter, &self.processor) {
119+
(None, None) => {
120+
run(attrs);
121+
},
122+
(Some(filter), None) => {
123+
let filtered_attrs: Vec<KeyValue> =
124+
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
125+
126+
run(&filtered_attrs);
127+
},
128+
(None, Some(processor)) => {
129+
let attributes = Cow::Borrowed(attrs);
130+
let attributes = processor.process(attributes);
131+
132+
run(attributes.as_ref());
133+
},
134+
(Some(filter), Some(processor)) => {
135+
let filtered_attrs: Vec<KeyValue> =
136+
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
137+
138+
let attributes = Cow::Owned(filtered_attrs);
139+
let attributes = processor.process(attributes);
140+
141+
run(attributes.as_ref());
142+
}
143+
}
124144
}
125145
}
126146

@@ -137,10 +157,10 @@ pub(crate) struct AggregateBuilder<T> {
137157
}
138158

139159
impl<T: Number> AggregateBuilder<T> {
140-
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
160+
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
141161
AggregateBuilder {
142162
temporality,
143-
filter: AttributeSetFilter::new(filter),
163+
filter: AttributeSetFilter::new(filter, processors),
144164
_marker: marker::PhantomData,
145165
}
146166
}
@@ -201,6 +221,31 @@ impl<T: Number> AggregateBuilder<T> {
201221
}
202222
}
203223

224+
225+
#[derive(Clone)]
226+
struct AggregateProcessor(Arc<Vec<Arc<dyn MeasurementProcessor>>>);
227+
228+
impl AggregateProcessor {
229+
pub fn try_create(
230+
processors: Vec<Arc<dyn MeasurementProcessor>>,
231+
) -> Option<Self> {
232+
if processors.is_empty() {
233+
return None;
234+
}
235+
Some(Self(Arc::new(processors)))
236+
}
237+
}
238+
239+
impl MeasurementProcessor for AggregateProcessor {
240+
fn process<'a>(&self, mut attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]> {
241+
for processor in self.0.iter() {
242+
attributes = processor.process(attributes);
243+
}
244+
245+
attributes
246+
}
247+
}
248+
204249
#[cfg(test)]
205250
mod tests {
206251
use crate::metrics::data::{
@@ -214,7 +259,7 @@ mod tests {
214259
#[test]
215260
fn last_value_aggregation() {
216261
let AggregateFns { measure, collect } =
217-
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
262+
AggregateBuilder::<u64>::new(Temporality::Cumulative, None, vec![]).last_value(None);
218263
let mut a = Gauge {
219264
data_points: vec![GaugeDataPoint {
220265
attributes: vec![KeyValue::new("a", 1)],
@@ -240,7 +285,7 @@ mod tests {
240285
fn precomputed_sum_aggregation() {
241286
for temporality in [Temporality::Delta, Temporality::Cumulative] {
242287
let AggregateFns { measure, collect } =
243-
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
288+
AggregateBuilder::<u64>::new(temporality, None, vec![]).precomputed_sum(true);
244289
let mut a = Sum {
245290
data_points: vec![
246291
SumDataPoint {
@@ -282,7 +327,7 @@ mod tests {
282327
fn sum_aggregation() {
283328
for temporality in [Temporality::Delta, Temporality::Cumulative] {
284329
let AggregateFns { measure, collect } =
285-
AggregateBuilder::<u64>::new(temporality, None).sum(true);
330+
AggregateBuilder::<u64>::new(temporality, None, vec![]).sum(true);
286331
let mut a = Sum {
287332
data_points: vec![
288333
SumDataPoint {
@@ -323,7 +368,7 @@ mod tests {
323368
#[test]
324369
fn explicit_bucket_histogram_aggregation() {
325370
for temporality in [Temporality::Delta, Temporality::Cumulative] {
326-
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
371+
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
327372
.explicit_bucket_histogram(vec![1.0], true, true);
328373
let mut a = Histogram {
329374
data_points: vec![HistogramDataPoint {
@@ -366,7 +411,7 @@ mod tests {
366411
#[test]
367412
fn exponential_histogram_aggregation() {
368413
for temporality in [Temporality::Delta, Temporality::Cumulative] {
369-
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
414+
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
370415
.exponential_bucket_histogram(4, 20, true, true);
371416
let mut a = ExponentialHistogram {
372417
data_points: vec![ExponentialHistogramDataPoint {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use opentelemetry::KeyValue;
2+
use std::borrow::Cow;
3+
4+
pub trait MeasurementProcessor: Send + Sync + 'static {
5+
fn process<'a>(&self, attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]>;
6+
}
7+

opentelemetry-sdk/src/metrics/meter_provider.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@ use opentelemetry::{
1515
use crate::error::OTelSdkResult;
1616
use crate::Resource;
1717

18-
use super::{
19-
exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
20-
reader::MetricReader, view::View, PeriodicReader,
21-
};
18+
use super::{exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View, MeasurementProcessor, PeriodicReader};
2219

2320
/// Handles the creation and coordination of [Meter]s.
2421
///
@@ -223,6 +220,7 @@ pub struct MeterProviderBuilder {
223220
resource: Option<Resource>,
224221
readers: Vec<Box<dyn MetricReader>>,
225222
views: Vec<Arc<dyn View>>,
223+
measurement_processors: Vec<Arc<dyn MeasurementProcessor>>,
226224
}
227225

228226
impl MeterProviderBuilder {
@@ -291,6 +289,11 @@ impl MeterProviderBuilder {
291289
self
292290
}
293291

292+
pub fn with_measurement_processor<T: MeasurementProcessor>(mut self, processor: T) -> Self {
293+
self.measurement_processors.push(Arc::new(processor));
294+
self
295+
}
296+
294297
/// Construct a new [MeterProvider] with this configuration.
295298
pub fn build(self) -> SdkMeterProvider {
296299
otel_debug!(
@@ -304,6 +307,7 @@ impl MeterProviderBuilder {
304307
self.resource.unwrap_or(Resource::builder().build()),
305308
self.readers,
306309
self.views,
310+
self.measurement_processors,
307311
)),
308312
meters: Default::default(),
309313
shutdown_invoked: AtomicBool::new(false),

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ pub(crate) mod view;
6161
#[cfg(any(feature = "testing", test))]
6262
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
6363
pub mod in_memory_exporter;
64+
65+
mod measurement_processor;
66+
pub use measurement_processor::MeasurementProcessor;
67+
6468
#[cfg(any(feature = "testing", test))]
6569
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
6670
pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323

2424
use self::internal::AggregateFns;
2525

26-
use super::{Aggregation, Temporality};
26+
use super::{Aggregation, MeasurementProcessor, Temporality};
2727

2828
/// Connects all of the instruments created by a meter provider to a [MetricReader].
2929
///
@@ -39,6 +39,7 @@ pub struct Pipeline {
3939
reader: Box<dyn MetricReader>,
4040
views: Vec<Arc<dyn View>>,
4141
inner: Mutex<PipelineInner>,
42+
processors: Vec<Arc<dyn MeasurementProcessor>>,
4243
}
4344

4445
impl fmt::Debug for Pipeline {
@@ -385,7 +386,7 @@ where
385386
.clone()
386387
.map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);
387388

388-
let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
389+
let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter, self.pipeline.processors.clone());
389390
let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
390391
Ok(Some(inst)) => inst,
391392
other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
@@ -621,6 +622,7 @@ impl Pipelines {
621622
res: Resource,
622623
readers: Vec<Box<dyn MetricReader>>,
623624
views: Vec<Arc<dyn View>>,
625+
processors: Vec<Arc<dyn MeasurementProcessor>>,
624626
) -> Self {
625627
let mut pipes = Vec::with_capacity(readers.len());
626628
for r in readers {
@@ -629,6 +631,7 @@ impl Pipelines {
629631
reader: r,
630632
views: views.clone(),
631633
inner: Default::default(),
634+
processors: processors.clone(),
632635
});
633636
p.reader.register_pipeline(Arc::downgrade(&p));
634637
pipes.push(p);

0 commit comments

Comments
 (0)