45 changes: 33 additions & 12 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
@@ -306,10 +306,6 @@
     }

     fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
-        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
-            return Err(MetricError::Other("reader is shut down".into()));
-        }
-
         let producer = self.producer.lock().expect("lock poisoned");
         if let Some(p) = producer.as_ref() {
             p.upgrade()
@@ -373,6 +369,20 @@
         if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
            return Err(MetricError::Other("reader is shut down".into()));
         }
+
+        // TODO: Better message for this scenario.
+        // If Flush and Shutdown are called from two threads, Flush may check
+        // the shutdown flag before the shutdown thread sets it, and both
+        // threads then send a message to the same channel.
+        // Case 1: the flush thread sends first and the shutdown thread second.
+        // Flush succeeds, because the background thread won't process the
+        // shutdown message until the flush-triggered export is done.
+        // Case 2: the shutdown thread sends first and the flush thread second.
+        // Shutdown succeeds, because the background thread processes the
+        // shutdown message first and then exits, so it never receives the
+        // flush message. ForceFlush returns a failure, but we could indicate
+        // specifically that shutdown has completed. TODO: improve this message.
+
         let (response_tx, response_rx) = mpsc::channel();
         match self.message_sender.lock() {
             Ok(sender) => {
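The race described in the comment above exists because flush and shutdown share a single message channel to the background thread, so the outcome depends on which message the worker dequeues first. The following standalone sketch, with hypothetical `Message` and worker types that are not the reader's actual internals, reproduces "Case 2" deterministically: once the worker handles the shutdown message and exits, the queued flush request is never answered.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the reader's internal messages.
enum Message {
    Flush(mpsc::Sender<bool>),
    Shutdown(mpsc::Sender<bool>),
}

fn main() {
    let (tx, rx) = mpsc::channel::<Message>();

    // Background worker: processes messages in arrival order and exits on
    // Shutdown, so a Flush queued after Shutdown is never answered.
    let worker = thread::spawn(move || {
        while let Ok(msg) = rx.recv() {
            match msg {
                Message::Flush(resp) => {
                    let _ = resp.send(true); // the real reader exports here
                }
                Message::Shutdown(resp) => {
                    let _ = resp.send(true);
                    break;
                }
            }
        }
    });

    let (flush_resp_tx, flush_resp_rx) = mpsc::channel();
    let (shutdown_resp_tx, shutdown_resp_rx) = mpsc::channel();

    // Shutdown is enqueued before flush, as in "Case 2" of the comment.
    tx.send(Message::Shutdown(shutdown_resp_tx)).unwrap();
    tx.send(Message::Flush(flush_resp_tx)).unwrap();

    assert_eq!(shutdown_resp_rx.recv(), Ok(true)); // shutdown succeeds
    assert!(flush_resp_rx.recv().is_err()); // flush response sender is dropped unanswered
    worker.join().unwrap();
}
```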
@@ -402,8 +412,13 @@
     }

     fn shutdown(&self) -> MetricResult<()> {
-        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
-            return Err(MetricError::Other("Reader is already shut down".into()));
+        if self
+            .is_shutdown
+            .swap(true, std::sync::atomic::Ordering::Relaxed)
+        {
+            return Err(MetricError::Other(
+                "PeriodicReader shutdown already invoked.".into(),
+            ));
         }

         // TODO: See if this is better to be created upfront.
@@ -424,16 +439,12 @@
         }

         if let Ok(response) = response_rx.recv() {
-            self.is_shutdown
-                .store(true, std::sync::atomic::Ordering::Relaxed);
             if response {
                 Ok(())
             } else {
                 Err(MetricError::Other("Failed to shutdown".into()))
             }
         } else {
-            self.is_shutdown
-                .store(true, std::sync::atomic::Ordering::Relaxed);
             Err(MetricError::Other("Failed to shutdown".into()))
         }
     }
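Replacing the separate load-then-store with a single `swap` is what makes `shutdown()` safe to call concurrently: `swap(true, ...)` atomically sets the flag and returns its previous value, so exactly one caller observes `false` and runs the shutdown path. A minimal sketch of that property, using hypothetical names rather than the SDK's types:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let is_shutdown = Arc::new(AtomicBool::new(false));

    // With swap, at most one of the concurrent callers "wins" the right to
    // actually perform shutdown; all others bail out immediately.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let flag = Arc::clone(&is_shutdown);
            thread::spawn(move || !flag.swap(true, Ordering::Relaxed))
        })
        .collect();

    let winners = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&won| won)
        .count();
    assert_eq!(winners, 1);

    // By contrast, a load followed by a later store leaves a window in which
    // two threads can both observe `false` and both run the shutdown path.
}
```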
@@ -711,27 +722,31 @@
         collection_triggered_by_interval_helper();
         collection_triggered_by_flush_helper();
         collection_triggered_by_shutdown_helper();
+        collection_triggered_by_drop_helper();
     }

     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
     async fn collection_from_tokio_multi_with_one_worker() {
         collection_triggered_by_interval_helper();
         collection_triggered_by_flush_helper();
         collection_triggered_by_shutdown_helper();
+        collection_triggered_by_drop_helper();
     }

     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
     async fn collection_from_tokio_with_two_worker() {
         collection_triggered_by_interval_helper();
         collection_triggered_by_flush_helper();
         collection_triggered_by_shutdown_helper();
+        collection_triggered_by_drop_helper();
     }

     #[tokio::test(flavor = "current_thread")]
     async fn collection_from_tokio_current() {
         collection_triggered_by_interval_helper();
         collection_triggered_by_flush_helper();
         collection_triggered_by_shutdown_helper();
+        collection_triggered_by_drop_helper();
     }

     fn collection_triggered_by_interval_helper() {
@@ -756,7 +771,13 @@
         });
     }

-    fn collection_helper(trigger: fn(&SdkMeterProvider)) {
+    fn collection_triggered_by_drop_helper() {
+        collection_helper(|meter_provider| {
+            drop(meter_provider);
+        });
+    }
+
+    fn collection_helper(trigger: fn(SdkMeterProvider)) {
         // Arrange
         let interval = std::time::Duration::from_millis(10);
         let exporter = InMemoryMetricExporter::default();
@@ -776,7 +797,7 @@
             .build();

         // Act
-        trigger(&meter_provider);
+        trigger(meter_provider);

         // Assert
         receiver
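The new drop helper relies on the provider performing a final shutdown, and therefore a final export, when it goes out of scope; that is why `collection_helper` now takes the `SdkMeterProvider` by value so the trigger can consume and drop it. A simplified model of the pattern the test exercises, with a hypothetical `Provider` type standing in for the SDK's (the real behavior lives in `SdkMeterProvider`'s `Drop` implementation):

```rust
use std::sync::mpsc;

// Hypothetical stand-in for a provider whose Drop triggers a final flush.
struct Provider {
    on_shutdown: mpsc::Sender<&'static str>,
}

impl Drop for Provider {
    fn drop(&mut self) {
        // In the real SDK this would invoke shutdown, exporting any remaining
        // metrics before the background thread stops.
        let _ = self.on_shutdown.send("final export");
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let provider = Provider { on_shutdown: tx };

    // Taking the provider by value lets the trigger consume and drop it,
    // mirroring collection_triggered_by_drop_helper.
    let trigger = |p: Provider| drop(p);
    trigger(provider);

    assert_eq!(rx.recv(), Ok("final export"));
}
```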