Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl PeriodicReader {
let reader = PeriodicReader {
inner: Arc::new(PeriodicReaderInner {
message_sender: Arc::new(message_sender),
is_shutdown: AtomicBool::new(false),
shutdown_invoked: AtomicBool::new(false),
producer: Mutex::new(None),
exporter: Arc::new(exporter),
}),
Expand Down Expand Up @@ -300,7 +300,7 @@ struct PeriodicReaderInner {
exporter: Arc<dyn PushMetricExporter>,
message_sender: Arc<mpsc::Sender<Message>>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
is_shutdown: AtomicBool,
shutdown_invoked: AtomicBool,
}

impl PeriodicReaderInner {
Expand All @@ -314,10 +314,6 @@ impl PeriodicReaderInner {
}

fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return Err(MetricError::Other("reader is shut down".into()));
}

let producer = self.producer.lock().expect("lock poisoned");
if let Some(p) = producer.as_ref() {
p.upgrade()
Expand Down Expand Up @@ -378,9 +374,28 @@ impl PeriodicReaderInner {
}

fn force_flush(&self) -> MetricResult<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return Err(MetricError::Other("reader is shut down".into()));
if self
.shutdown_invoked
.load(std::sync::atomic::Ordering::Relaxed)
{
return Err(MetricError::Other(
"Cannot perform flush as PeriodicReader shutdown already invoked.".into(),
));
}

// TODO: Better message for this scenario.
// Flush and Shutdown called from 2 threads Flush check shutdown
// flag before shutdown thread sets it. Both threads attempt to send
// message to the same channel. Case1: Flush thread sends message first,
// shutdown thread sends message next. Flush would succeed, as
// background thread won't process shutdown message until flush
// triggered export is done. Case2: Shutdown thread sends message first,
// flush thread sends message next. Shutdown would succeed, as
// background thread would process shutdown message first. The
// background exits so it won't receive the flush message. ForceFlush
// returns Failure, but we could indicate specifically that shutdown has
// completed. TODO is to see if this message can be improved.

let (response_tx, response_rx) = mpsc::channel();
self.message_sender
.send(Message::Flush(response_tx))
Expand All @@ -399,8 +414,13 @@ impl PeriodicReaderInner {
}

fn shutdown(&self) -> MetricResult<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return Err(MetricError::Other("Reader is already shut down".into()));
if self
.shutdown_invoked
.swap(true, std::sync::atomic::Ordering::Relaxed)
{
return Err(MetricError::Other(
"PeriodicReader shutdown already invoked.".into(),
));
}

// TODO: See if this is better to be created upfront.
Expand All @@ -410,16 +430,12 @@ impl PeriodicReaderInner {
.map_err(|e| MetricError::Other(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if response {
Ok(())
} else {
Err(MetricError::Other("Failed to shutdown".into()))
}
} else {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
Err(MetricError::Other("Failed to shutdown".into()))
}
}
Expand Down Expand Up @@ -697,27 +713,31 @@ mod tests {
collection_triggered_by_interval_helper();
collection_triggered_by_flush_helper();
collection_triggered_by_shutdown_helper();
collection_triggered_by_drop_helper();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn collection_from_tokio_multi_with_one_worker() {
collection_triggered_by_interval_helper();
collection_triggered_by_flush_helper();
collection_triggered_by_shutdown_helper();
collection_triggered_by_drop_helper();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collection_from_tokio_with_two_worker() {
collection_triggered_by_interval_helper();
collection_triggered_by_flush_helper();
collection_triggered_by_shutdown_helper();
collection_triggered_by_drop_helper();
}

#[tokio::test(flavor = "current_thread")]
async fn collection_from_tokio_current() {
collection_triggered_by_interval_helper();
collection_triggered_by_flush_helper();
collection_triggered_by_shutdown_helper();
collection_triggered_by_drop_helper();
}

fn collection_triggered_by_interval_helper() {
Expand All @@ -742,7 +762,13 @@ mod tests {
});
}

fn collection_helper(trigger: fn(&SdkMeterProvider)) {
fn collection_triggered_by_drop_helper() {
collection_helper(|meter_provider| {
drop(meter_provider);
});
}

fn collection_helper(trigger: fn(SdkMeterProvider)) {
// Arrange
let interval = std::time::Duration::from_millis(10);
let exporter = InMemoryMetricExporter::default();
Expand All @@ -762,7 +788,7 @@ mod tests {
.build();

// Act
trigger(&meter_provider);
trigger(meter_provider);

// Assert
receiver
Expand Down
Loading