Skip to content

Commit 563e377

Browse files
committed
Improve shutdown handling in PeriodicReader
1 parent dcaff0d commit 563e377

File tree

1 file changed

+33
-12
lines changed

1 file changed

+33
-12
lines changed

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -306,10 +306,6 @@ impl PeriodicReaderInner {
306306
}
307307

308308
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
309-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
310-
return Err(MetricError::Other("reader is shut down".into()));
311-
}
312-
313309
let producer = self.producer.lock().expect("lock poisoned");
314310
if let Some(p) = producer.as_ref() {
315311
p.upgrade()
@@ -373,6 +369,20 @@ impl PeriodicReaderInner {
373369
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
374370
return Err(MetricError::Other("reader is shut down".into()));
375371
}
372+
373+
// TODO: Better message for this scenario.
374+
// Flush and Shutdown called from 2 threads Flush check shutdown
375+
// flag before shutdown thread sets it. Both threads attempt to send
376+
// message to the same channel. Case1: Flush thread sends message first,
377+
// shutdown thread sends message next. Flush would succeed, as
378+
// background thread won't process shutdown message until flush
379+
// triggered export is done. Case2: Shutdown thread sends message first,
380+
// flush thread sends message next. Shutdown would succeed, as
381+
// background thread would process shutdown message first. The
382+
// background exits so it won't receive the flush message. ForceFlush
383+
// returns Failure, but we could indicate specifically that shutdown has
384+
// completed. TODO is to see if this message can be improved.
385+
376386
let (response_tx, response_rx) = mpsc::channel();
377387
match self.message_sender.lock() {
378388
Ok(sender) => {
@@ -402,8 +412,13 @@ impl PeriodicReaderInner {
402412
}
403413

404414
fn shutdown(&self) -> MetricResult<()> {
405-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
406-
return Err(MetricError::Other("Reader is already shut down".into()));
415+
if self
416+
.is_shutdown
417+
.swap(true, std::sync::atomic::Ordering::Relaxed)
418+
{
419+
return Err(MetricError::Other(
420+
"PeriodicReader shutdown already invoked.".into(),
421+
));
407422
}
408423

409424
// TODO: See if this is better to be created upfront.
@@ -424,16 +439,12 @@ impl PeriodicReaderInner {
424439
}
425440

426441
if let Ok(response) = response_rx.recv() {
427-
self.is_shutdown
428-
.store(true, std::sync::atomic::Ordering::Relaxed);
429442
if response {
430443
Ok(())
431444
} else {
432445
Err(MetricError::Other("Failed to shutdown".into()))
433446
}
434447
} else {
435-
self.is_shutdown
436-
.store(true, std::sync::atomic::Ordering::Relaxed);
437448
Err(MetricError::Other("Failed to shutdown".into()))
438449
}
439450
}
@@ -711,27 +722,31 @@ mod tests {
711722
collection_triggered_by_interval_helper();
712723
collection_triggered_by_flush_helper();
713724
collection_triggered_by_shutdown_helper();
725+
collection_triggered_by_drop_helper();
714726
}
715727

716728
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
717729
async fn collection_from_tokio_multi_with_one_worker() {
718730
collection_triggered_by_interval_helper();
719731
collection_triggered_by_flush_helper();
720732
collection_triggered_by_shutdown_helper();
733+
collection_triggered_by_drop_helper();
721734
}
722735

723736
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
724737
async fn collection_from_tokio_with_two_worker() {
725738
collection_triggered_by_interval_helper();
726739
collection_triggered_by_flush_helper();
727740
collection_triggered_by_shutdown_helper();
741+
collection_triggered_by_drop_helper();
728742
}
729743

730744
#[tokio::test(flavor = "current_thread")]
731745
async fn collection_from_tokio_current() {
732746
collection_triggered_by_interval_helper();
733747
collection_triggered_by_flush_helper();
734748
collection_triggered_by_shutdown_helper();
749+
collection_triggered_by_drop_helper();
735750
}
736751

737752
fn collection_triggered_by_interval_helper() {
@@ -756,7 +771,13 @@ mod tests {
756771
});
757772
}
758773

759-
fn collection_helper(trigger: fn(&SdkMeterProvider)) {
774+
fn collection_triggered_by_drop_helper() {
775+
collection_helper(|meter_provider| {
776+
drop(meter_provider);
777+
});
778+
}
779+
780+
fn collection_helper(trigger: fn(SdkMeterProvider)) {
760781
// Arrange
761782
let interval = std::time::Duration::from_millis(10);
762783
let exporter = InMemoryMetricExporter::default();
@@ -776,7 +797,7 @@ mod tests {
776797
.build();
777798

778799
// Act
779-
trigger(&meter_provider);
800+
trigger(meter_provider);
780801

781802
// Assert
782803
receiver

0 commit comments

Comments
 (0)