Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct SdkMeterProvider {
struct SdkMeterProviderInner {
pipes: Arc<Pipelines>,
meters: Mutex<HashMap<InstrumentationScope, Arc<SdkMeter>>>,
is_shutdown: AtomicBool,
shutdown_invoked: AtomicBool,
}

impl Default for SdkMeterProvider {
Expand Down Expand Up @@ -119,20 +119,29 @@ impl SdkMeterProvider {

impl SdkMeterProviderInner {
fn force_flush(&self) -> MetricResult<()> {
self.pipes.force_flush()
if self
.shutdown_invoked
.load(std::sync::atomic::Ordering::Relaxed)
{
Err(MetricError::Other(
"Cannot perform flush as MeterProvider shutdown already invoked.".into(),
))
} else {
self.pipes.force_flush()
}
}

fn shutdown(&self) -> MetricResult<()> {
if self
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
.shutdown_invoked
.swap(true, std::sync::atomic::Ordering::SeqCst)
{
self.pipes.shutdown()
} else {
// If the previous value was true, shutdown was already invoked.
Err(MetricError::Other(
"metrics provider already shut down".into(),
"MeterProvider shutdown already invoked.".into(),
))
} else {
self.pipes.shutdown()
}
}
}
Expand All @@ -141,7 +150,7 @@ impl Drop for SdkMeterProviderInner {
fn drop(&mut self) {
// If user has already shutdown the provider manually by calling
// shutdown(), then we don't need to call shutdown again.
if self.is_shutdown.load(Ordering::Relaxed) {
if self.shutdown_invoked.load(Ordering::Relaxed) {
otel_debug!(
name: "MeterProvider.Drop.AlreadyShutdown",
message = "MeterProvider was already shut down; drop will not attempt shutdown again."
Expand Down Expand Up @@ -173,7 +182,7 @@ impl MeterProvider for SdkMeterProvider {
}

fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
if self.inner.shutdown_invoked.load(Ordering::Relaxed) {
otel_debug!(
name: "MeterProvider.NoOpMeterReturned",
meter_name = scope.name(),
Expand Down Expand Up @@ -270,7 +279,7 @@ impl MeterProviderBuilder {
self.views,
)),
meters: Default::default(),
is_shutdown: AtomicBool::new(false),
shutdown_invoked: AtomicBool::new(false),
}),
};

Expand Down
21 changes: 0 additions & 21 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
env, fmt,
sync::{
atomic::AtomicBool,
mpsc::{self, Receiver, Sender},
Arc, Mutex, Weak,
},
Expand Down Expand Up @@ -158,7 +157,6 @@ impl PeriodicReader {
let reader = PeriodicReader {
inner: Arc::new(PeriodicReaderInner {
message_sender: Arc::new(message_sender),
shutdown_invoked: AtomicBool::new(false),
producer: Mutex::new(None),
exporter: Arc::new(exporter),
}),
Expand Down Expand Up @@ -300,7 +298,6 @@ struct PeriodicReaderInner {
exporter: Arc<dyn PushMetricExporter>,
message_sender: Arc<mpsc::Sender<Message>>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
shutdown_invoked: AtomicBool,
}

impl PeriodicReaderInner {
Expand Down Expand Up @@ -374,15 +371,6 @@ impl PeriodicReaderInner {
}

fn force_flush(&self) -> MetricResult<()> {
if self
.shutdown_invoked
.load(std::sync::atomic::Ordering::Relaxed)
{
return Err(MetricError::Other(
"Cannot perform flush as PeriodicReader shutdown already invoked.".into(),
));
}

// TODO: Better message for this scenario.
// Flush and Shutdown called from 2 threads Flush check shutdown
// flag before shutdown thread sets it. Both threads attempt to send
Expand Down Expand Up @@ -414,15 +402,6 @@ impl PeriodicReaderInner {
}

fn shutdown(&self) -> MetricResult<()> {
if self
.shutdown_invoked
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking - we don't need any check in periodically executed collect_and_export to not do export if shutdown is invoked?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after shutdown is invoked, there is no way to invoke collect or export methods (they are not public).
I am still checking for ways to better architect this - reader is cloneable today, which I am not sure if we need to. but requires more refactoring/design..

.swap(true, std::sync::atomic::Ordering::Relaxed)
{
return Err(MetricError::Other(
"PeriodicReader shutdown already invoked.".into(),
));
}

// TODO: See if this is better to be created upfront.
let (response_tx, response_rx) = mpsc::channel();
self.message_sender
Expand Down
Loading