Skip to content

Commit 68f4806

Browse files
committed
Update metrics
1 parent 82ed8e0 commit 68f4806

File tree

4 files changed

+39
-8
lines changed

4 files changed

+39
-8
lines changed

opentelemetry-sdk/src/metrics/instrument.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,20 @@ impl InstrumentId {
278278
}
279279
}
280280

281+
pub(crate) trait SyncInstrumentUpdate: Send + Sync {}
282+
283+
impl<T> SyncInstrumentUpdate for ResolvedMeasures<T> {}
284+
281285
pub(crate) struct ResolvedMeasures<T> {
282286
pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
283287
}
284288

289+
impl<T: 'static> ResolvedMeasures<T> {
290+
pub(crate) fn as_sync_instrument_update(self: Arc<Self>) -> Arc<dyn SyncInstrumentUpdate> {
291+
self
292+
}
293+
}
294+
285295
impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
286296
fn measure(&self, val: T, attrs: &[KeyValue]) {
287297
for measure in &self.measures {

opentelemetry-sdk/src/metrics/meter.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use core::fmt;
2-
use std::{borrow::Cow, sync::Arc};
2+
use std::{
3+
borrow::Cow,
4+
sync::{Arc, Mutex},
5+
};
36

47
use opentelemetry::{
58
metrics::{
@@ -11,7 +14,7 @@ use opentelemetry::{
1114
};
1215

1316
use crate::metrics::{
14-
instrument::{Instrument, InstrumentKind, Observable, ResolvedMeasures},
17+
instrument::{Instrument, InstrumentKind, Observable, ResolvedMeasures, SyncInstrumentUpdate},
1518
internal::{self, Number},
1619
pipeline::{Pipelines, Resolver},
1720
MetricError, MetricResult,
@@ -50,6 +53,7 @@ pub(crate) struct SdkMeter {
5053
u64_resolver: Resolver<u64>,
5154
i64_resolver: Resolver<i64>,
5255
f64_resolver: Resolver<f64>,
56+
pub(crate) sync_instruments: Mutex<Vec<Arc<dyn SyncInstrumentUpdate>>>,
5357
}
5458

5559
impl SdkMeter {
@@ -62,6 +66,7 @@ impl SdkMeter {
6266
u64_resolver: Resolver::new(Arc::clone(&pipes), Arc::clone(&view_cache)),
6367
i64_resolver: Resolver::new(Arc::clone(&pipes), Arc::clone(&view_cache)),
6468
f64_resolver: Resolver::new(pipes, view_cache),
69+
sync_instruments: Mutex::new(Vec::new()),
6570
}
6671
}
6772

@@ -93,8 +98,16 @@ impl SdkMeter {
9398
builder.unit,
9499
None,
95100
)
96-
.map(|i| Counter::new(Arc::new(i)))
97-
{
101+
.map(|i| {
102+
let resolved_measures = Arc::new(i);
103+
let sync_instrument_update = resolved_measures.clone().as_sync_instrument_update();
104+
105+
self.sync_instruments
106+
.lock()
107+
.unwrap()
108+
.push(sync_instrument_update);
109+
Counter::new(resolved_measures)
110+
}) {
98111
Ok(counter) => counter,
99112
Err(err) => {
100113
otel_error!(

opentelemetry-sdk/src/metrics/meter_provider.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ impl SdkMeterProviderInner {
141141
"MeterProvider shutdown already invoked.".into(),
142142
))
143143
} else {
144+
let mut meters = self.meters.lock().unwrap();
145+
for (_, meter) in meters.drain() {
146+
meter.sync_instruments.lock().unwrap().clear(); // Clear all `SyncInstrument` trait objects to avoid any further updates from instruments.
147+
}
144148
self.pipes.shutdown()
145149
}
146150
}

opentelemetry/src/metrics/instruments/counter.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::KeyValue;
22
use core::fmt;
3-
use std::sync::Arc;
3+
use std::sync::{Arc, Weak};
44

55
use super::SyncInstrument;
66

@@ -11,7 +11,7 @@ use super::SyncInstrument;
1111
/// duplicate [`Counter`]s for the same metric could lower SDK performance.
1212
#[derive(Clone)]
1313
#[non_exhaustive]
14-
pub struct Counter<T>(Arc<dyn SyncInstrument<T> + Send + Sync>);
14+
pub struct Counter<T>(Weak<dyn SyncInstrument<T> + Send + Sync>);
1515

1616
impl<T> fmt::Debug for Counter<T>
1717
where
@@ -25,12 +25,16 @@ where
2525
impl<T> Counter<T> {
2626
/// Create a new counter.
2727
pub fn new(inner: Arc<dyn SyncInstrument<T> + Send + Sync>) -> Self {
28-
Counter(inner)
28+
Counter(Arc::downgrade(&inner))
2929
}
3030

3131
/// Records an increment to the counter.
3232
pub fn add(&self, value: T, attributes: &[KeyValue]) {
33-
self.0.measure(value, attributes)
33+
if let Some(inner) = self.0.upgrade() {
34+
inner.measure(value, attributes);
35+
} else {
36+
// TODO: Log error
37+
}
3438
}
3539
}
3640

0 commit comments

Comments
 (0)