Skip to content

Commit b1745f0

Browse files
committed
Move shutdown detection logic to MeterProvider
1 parent c726c4d commit b1745f0

File tree

2 files changed

+17
-32
lines changed

2 files changed

+17
-32
lines changed

opentelemetry-sdk/src/metrics/meter_provider.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub struct SdkMeterProvider {
3838
struct SdkMeterProviderInner {
3939
pipes: Arc<Pipelines>,
4040
meters: Mutex<HashMap<InstrumentationScope, Arc<SdkMeter>>>,
41-
is_shutdown: AtomicBool,
41+
shutdown_invoked: AtomicBool,
4242
}
4343

4444
impl Default for SdkMeterProvider {
@@ -119,20 +119,26 @@ impl SdkMeterProvider {
119119

120120
impl SdkMeterProviderInner {
121121
fn force_flush(&self) -> MetricResult<()> {
122-
self.pipes.force_flush()
122+
if self.shutdown_invoked.load(std::sync::atomic::Ordering::Relaxed) {
123+
Err(MetricError::Other(
124+
"Cannot perform flush as MeterProvider shutdown already invoked.".into(),
125+
))
126+
} else {
127+
self.pipes.force_flush()
128+
}
123129
}
124130

125131
fn shutdown(&self) -> MetricResult<()> {
126132
if self
127-
.is_shutdown
128-
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
129-
.is_ok()
133+
.shutdown_invoked
134+
.swap(true, std::sync::atomic::Ordering::SeqCst)
130135
{
131-
self.pipes.shutdown()
132-
} else {
136+
// If the previous value was true, shutdown was already invoked.
133137
Err(MetricError::Other(
134-
"metrics provider already shut down".into(),
138+
"MeterProvider shutdown already invoked.".into(),
135139
))
140+
} else {
141+
self.pipes.shutdown()
136142
}
137143
}
138144
}
@@ -141,7 +147,7 @@ impl Drop for SdkMeterProviderInner {
141147
fn drop(&mut self) {
142148
// If user has already shutdown the provider manually by calling
143149
// shutdown(), then we don't need to call shutdown again.
144-
if self.is_shutdown.load(Ordering::Relaxed) {
150+
if self.shutdown_invoked.load(Ordering::Relaxed) {
145151
otel_debug!(
146152
name: "MeterProvider.Drop.AlreadyShutdown",
147153
message = "MeterProvider was already shut down; drop will not attempt shutdown again."
@@ -173,7 +179,7 @@ impl MeterProvider for SdkMeterProvider {
173179
}
174180

175181
fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter {
176-
if self.inner.is_shutdown.load(Ordering::Relaxed) {
182+
if self.inner.shutdown_invoked.load(Ordering::Relaxed) {
177183
otel_debug!(
178184
name: "MeterProvider.NoOpMeterReturned",
179185
meter_name = scope.name(),
@@ -270,7 +276,7 @@ impl MeterProviderBuilder {
270276
self.views,
271277
)),
272278
meters: Default::default(),
273-
is_shutdown: AtomicBool::new(false),
279+
shutdown_invoked: AtomicBool::new(false),
274280
}),
275281
};
276282

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{
22
env, fmt,
33
sync::{
4-
atomic::AtomicBool,
54
mpsc::{self, Receiver, Sender},
65
Arc, Mutex, Weak,
76
},
@@ -158,7 +157,6 @@ impl PeriodicReader {
158157
let reader = PeriodicReader {
159158
inner: Arc::new(PeriodicReaderInner {
160159
message_sender: Arc::new(message_sender),
161-
shutdown_invoked: AtomicBool::new(false),
162160
producer: Mutex::new(None),
163161
exporter: Arc::new(exporter),
164162
}),
@@ -300,7 +298,6 @@ struct PeriodicReaderInner {
300298
exporter: Arc<dyn PushMetricExporter>,
301299
message_sender: Arc<mpsc::Sender<Message>>,
302300
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
303-
shutdown_invoked: AtomicBool,
304301
}
305302

306303
impl PeriodicReaderInner {
@@ -374,15 +371,6 @@ impl PeriodicReaderInner {
374371
}
375372

376373
fn force_flush(&self) -> MetricResult<()> {
377-
if self
378-
.shutdown_invoked
379-
.load(std::sync::atomic::Ordering::Relaxed)
380-
{
381-
return Err(MetricError::Other(
382-
"Cannot perform flush as PeriodicReader shutdown already invoked.".into(),
383-
));
384-
}
385-
386374
// TODO: Better message for this scenario.
387375
// Flush and Shutdown called from 2 threads Flush check shutdown
388376
// flag before shutdown thread sets it. Both threads attempt to send
@@ -414,15 +402,6 @@ impl PeriodicReaderInner {
414402
}
415403

416404
fn shutdown(&self) -> MetricResult<()> {
417-
if self
418-
.shutdown_invoked
419-
.swap(true, std::sync::atomic::Ordering::Relaxed)
420-
{
421-
return Err(MetricError::Other(
422-
"PeriodicReader shutdown already invoked.".into(),
423-
));
424-
}
425-
426405
// TODO: See if this is better to be created upfront.
427406
let (response_tx, response_rx) = mpsc::channel();
428407
self.message_sender

0 commit comments

Comments
 (0)