Skip to content

Commit ca7582f

Browse files
committed
Change the signature of MeasurementProcessor to return Option
1 parent 09568f7 commit ca7582f

File tree

4 files changed

+52
-29
lines changed

4 files changed

+52
-29
lines changed

examples/metrics-advanced/src/main.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::borrow::Cow;
21
use opentelemetry::{global, Context};
32
use opentelemetry::Key;
43
use opentelemetry::KeyValue;
@@ -179,16 +178,11 @@ impl UserType {
179178
struct UserTypeMeasurementProcessor;
180179

181180
impl MeasurementProcessor for UserTypeMeasurementProcessor {
182-
fn process<'a>(&self, attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]> {
183-
match Context::current().get::<UserType>() {
184-
Some(user_type) => {
185-
let mut attrs = attributes.to_vec();
186-
attrs.push(KeyValue::new("user_type", user_type.as_str()));
187-
Cow::Owned(attrs)
188-
}
189-
190-
// No changes to the attributes
191-
None => attributes,
192-
}
181+
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
182+
Context::current().get::<UserType>().map(|user_type| {
183+
let mut attrs = attributes.to_vec();
184+
attrs.push(KeyValue::new("user_type", user_type.as_str()));
185+
attrs
186+
})
193187
}
194188
}

opentelemetry-sdk/src/metrics/internal/aggregate.rs

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
106106
#[derive(Clone)]
107107
pub(crate) struct AttributeSetFilter {
108108
filter: Option<Filter>,
109-
processor: Option<AggregateProcessor>
109+
processor: Option<Arc<dyn MeasurementProcessor>>
110110
}
111111

112112
impl AttributeSetFilter {
@@ -127,18 +127,28 @@ impl AttributeSetFilter {
127127
},
128128
(None, Some(processor)) => {
129129
let attributes = Cow::Borrowed(attrs);
130-
let attributes = processor.process(attributes);
131130

132-
run(attributes.as_ref());
131+
match processor.process(&attributes) {
132+
Some(attributes) => {
133+
run(&attributes);
134+
}
135+
None => {
136+
run(attrs);
137+
}
138+
}
133139
},
134140
(Some(filter), Some(processor)) => {
135141
let filtered_attrs: Vec<KeyValue> =
136142
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
137143

138-
let attributes = Cow::Owned(filtered_attrs);
139-
let attributes = processor.process(attributes);
140-
141-
run(attributes.as_ref());
144+
match processor.process(&filtered_attrs) {
145+
Some(attributes) => {
146+
run(&attributes);
147+
}
148+
None => {
149+
run(attrs);
150+
}
151+
}
142152
}
143153
}
144154
}
@@ -226,23 +236,35 @@ impl<T: Number> AggregateBuilder<T> {
226236
struct AggregateProcessor(Arc<Vec<Arc<dyn MeasurementProcessor>>>);
227237

228238
impl AggregateProcessor {
229-
pub fn try_create(
239+
fn try_create(
230240
processors: Vec<Arc<dyn MeasurementProcessor>>,
231-
) -> Option<Self> {
232-
if processors.is_empty() {
233-
return None;
241+
) -> Option<Arc<dyn MeasurementProcessor>> {
242+
243+
match processors.len() {
244+
0 => return None,
245+
1 => Some(processors[0].clone()),
246+
_ => Some(Arc::new(AggregateProcessor(Arc::new(processors)))),
234247
}
235-
Some(Self(Arc::new(processors)))
236248
}
237249
}
238250

239251
impl MeasurementProcessor for AggregateProcessor {
240-
fn process<'a>(&self, mut attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]> {
252+
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
253+
// Do not allocate if not necessary.
254+
let mut new_attributes: Option<Vec<KeyValue>> = None;
255+
241256
for processor in self.0.iter() {
242-
attributes = processor.process(attributes);
257+
let existing_or_new = match &new_attributes {
258+
Some(new) => new,
259+
None => attributes
260+
};
261+
262+
if let Some(new) = processor.process(existing_or_new) {
263+
new_attributes = Some(new);
264+
}
243265
}
244266

245-
attributes
267+
new_attributes
246268
}
247269
}
248270

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
use opentelemetry::KeyValue;
2-
use std::borrow::Cow;
32

3+
/// A trait for processing measurement attributes.
44
pub trait MeasurementProcessor: Send + Sync + 'static {
5-
fn process<'a>(&self, attributes: Cow<'a, [KeyValue]>) -> Cow<'a, [KeyValue]>;
5+
6+
/// Processes the attributes of a measurement.
7+
///
8+
/// The processor might decide to modify the attributes. In that case, it returns
9+
/// `Some` with the modified attributes. If no attribute modification is needed,
10+
/// it returns `None`.
11+
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>>;
612
}
713

opentelemetry-sdk/src/metrics/meter_provider.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ impl MeterProviderBuilder {
289289
self
290290
}
291291

292+
/// Associates a [MeasurementProcessor] with a [MeterProvider].
292293
pub fn with_measurement_processor<T: MeasurementProcessor>(mut self, processor: T) -> Self {
293294
self.measurement_processors.push(Arc::new(processor));
294295
self

0 commit comments

Comments
 (0)