Skip to content

Commit 6ac8aa0

Browse files
authored
Merge branch 'main' into log-async-trait-impl
2 parents f752d50 + 15d69b1 commit 6ac8aa0

File tree

1 file changed

+63
-51
lines changed

1 file changed

+63
-51
lines changed

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 63 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ where
117117
/// return metric data to the user. It will not automatically send that data to
118118
/// the exporter outside of the predefined interval.
119119
///
120-
/// As this spuns up own background thread, this is recommended to be used with push exporters
121-
/// that do not require any particular async runtime. As of now, this cannot be used with
122-
/// OTLP exporters as they requires async runtime
123120
///
124121
/// [collect]: MetricReader::collect
125122
///
@@ -160,8 +157,8 @@ impl PeriodicReader {
160157
mpsc::channel();
161158
let reader = PeriodicReader {
162159
inner: Arc::new(PeriodicReaderInner {
163-
message_sender: Arc::new(Mutex::new(message_sender)),
164-
is_shutdown: AtomicBool::new(false),
160+
message_sender: Arc::new(message_sender),
161+
shutdown_invoked: AtomicBool::new(false),
165162
producer: Mutex::new(None),
166163
exporter: Arc::new(exporter),
167164
}),
@@ -223,6 +220,11 @@ impl PeriodicReader {
223220
} else {
224221
response_sender.send(true).unwrap();
225222
}
223+
224+
otel_debug!(
225+
name: "PeriodReaderThreadExiting",
226+
reason = "ShutdownRequested"
227+
);
226228
break;
227229
}
228230
Err(mpsc::RecvTimeoutError::Timeout) => {
@@ -255,8 +257,13 @@ impl PeriodicReader {
255257
interval_start = Instant::now();
256258
}
257259
}
258-
Err(_) => {
259-
// Some other error. Break out and exit the thread.
260+
Err(mpsc::RecvTimeoutError::Disconnected) => {
261+
// Channel disconnected, only thing to do is break
262+
// out (i.e exit the thread)
263+
otel_debug!(
264+
name: "PeriodReaderThreadExiting",
265+
reason = "MessageReceiverDisconnected"
266+
);
260267
break;
261268
}
262269
}
@@ -271,6 +278,7 @@ impl PeriodicReader {
271278
if let Err(e) = result_thread_creation {
272279
otel_error!(
273280
name: "PeriodReaderThreadStartError",
281+
message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
274282
error = format!("{:?}", e)
275283
);
276284
}
@@ -290,9 +298,9 @@ impl fmt::Debug for PeriodicReader {
290298

291299
struct PeriodicReaderInner {
292300
exporter: Arc<dyn PushMetricExporter>,
293-
message_sender: Arc<Mutex<mpsc::Sender<Message>>>,
301+
message_sender: Arc<mpsc::Sender<Message>>,
294302
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
295-
is_shutdown: AtomicBool,
303+
shutdown_invoked: AtomicBool,
296304
}
297305

298306
impl PeriodicReaderInner {
@@ -306,10 +314,6 @@ impl PeriodicReaderInner {
306314
}
307315

308316
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
309-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
310-
return Err(MetricError::Other("reader is shut down".into()));
311-
}
312-
313317
let producer = self.producer.lock().expect("lock poisoned");
314318
if let Some(p) = producer.as_ref() {
315319
p.upgrade()
@@ -370,24 +374,32 @@ impl PeriodicReaderInner {
370374
}
371375

372376
fn force_flush(&self) -> MetricResult<()> {
373-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
374-
return Err(MetricError::Other("reader is shut down".into()));
377+
if self
378+
.shutdown_invoked
379+
.load(std::sync::atomic::Ordering::Relaxed)
380+
{
381+
return Err(MetricError::Other(
382+
"Cannot perform flush as PeriodicReader shutdown already invoked.".into(),
383+
));
375384
}
385+
386+
// TODO: Better message for this scenario.
387+
// Flush and Shutdown called from 2 threads Flush check shutdown
388+
// flag before shutdown thread sets it. Both threads attempt to send
389+
// message to the same channel. Case1: Flush thread sends message first,
390+
// shutdown thread sends message next. Flush would succeed, as
391+
// background thread won't process shutdown message until flush
392+
// triggered export is done. Case2: Shutdown thread sends message first,
393+
// flush thread sends message next. Shutdown would succeed, as
394+
// background thread would process shutdown message first. The
395+
// background exits so it won't receive the flush message. ForceFlush
396+
// returns Failure, but we could indicate specifically that shutdown has
397+
// completed. TODO is to see if this message can be improved.
398+
376399
let (response_tx, response_rx) = mpsc::channel();
377-
match self.message_sender.lock() {
378-
Ok(sender) => {
379-
sender
380-
.send(Message::Flush(response_tx))
381-
.map_err(|e| MetricError::Other(e.to_string()))?;
382-
}
383-
Err(e) => {
384-
otel_debug!(
385-
name: "PeriodReaderForceFlushError",
386-
error = format!("{:?}", e)
387-
);
388-
return Err(MetricError::Other(e.to_string()));
389-
}
390-
}
400+
self.message_sender
401+
.send(Message::Flush(response_tx))
402+
.map_err(|e| MetricError::Other(e.to_string()))?;
391403

392404
if let Ok(response) = response_rx.recv() {
393405
// TODO: call exporter's force_flush method.
@@ -402,38 +414,28 @@ impl PeriodicReaderInner {
402414
}
403415

404416
fn shutdown(&self) -> MetricResult<()> {
405-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
406-
return Err(MetricError::Other("Reader is already shut down".into()));
417+
if self
418+
.shutdown_invoked
419+
.swap(true, std::sync::atomic::Ordering::Relaxed)
420+
{
421+
return Err(MetricError::Other(
422+
"PeriodicReader shutdown already invoked.".into(),
423+
));
407424
}
408425

409426
// TODO: See if this is better to be created upfront.
410427
let (response_tx, response_rx) = mpsc::channel();
411-
match self.message_sender.lock() {
412-
Ok(sender) => {
413-
sender
414-
.send(Message::Shutdown(response_tx))
415-
.map_err(|e| MetricError::Other(e.to_string()))?;
416-
}
417-
Err(e) => {
418-
otel_debug!(
419-
name: "PeriodReaderShutdownError",
420-
error = format!("{:?}", e)
421-
);
422-
return Err(MetricError::Other(e.to_string()));
423-
}
424-
}
428+
self.message_sender
429+
.send(Message::Shutdown(response_tx))
430+
.map_err(|e| MetricError::Other(e.to_string()))?;
425431

426432
if let Ok(response) = response_rx.recv() {
427-
self.is_shutdown
428-
.store(true, std::sync::atomic::Ordering::Relaxed);
429433
if response {
430434
Ok(())
431435
} else {
432436
Err(MetricError::Other("Failed to shutdown".into()))
433437
}
434438
} else {
435-
self.is_shutdown
436-
.store(true, std::sync::atomic::Ordering::Relaxed);
437439
Err(MetricError::Other("Failed to shutdown".into()))
438440
}
439441
}
@@ -711,27 +713,31 @@ mod tests {
711713
collection_triggered_by_interval_helper();
712714
collection_triggered_by_flush_helper();
713715
collection_triggered_by_shutdown_helper();
716+
collection_triggered_by_drop_helper();
714717
}
715718

716719
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
717720
async fn collection_from_tokio_multi_with_one_worker() {
718721
collection_triggered_by_interval_helper();
719722
collection_triggered_by_flush_helper();
720723
collection_triggered_by_shutdown_helper();
724+
collection_triggered_by_drop_helper();
721725
}
722726

723727
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
724728
async fn collection_from_tokio_with_two_worker() {
725729
collection_triggered_by_interval_helper();
726730
collection_triggered_by_flush_helper();
727731
collection_triggered_by_shutdown_helper();
732+
collection_triggered_by_drop_helper();
728733
}
729734

730735
#[tokio::test(flavor = "current_thread")]
731736
async fn collection_from_tokio_current() {
732737
collection_triggered_by_interval_helper();
733738
collection_triggered_by_flush_helper();
734739
collection_triggered_by_shutdown_helper();
740+
collection_triggered_by_drop_helper();
735741
}
736742

737743
fn collection_triggered_by_interval_helper() {
@@ -756,7 +762,13 @@ mod tests {
756762
});
757763
}
758764

759-
fn collection_helper(trigger: fn(&SdkMeterProvider)) {
765+
fn collection_triggered_by_drop_helper() {
766+
collection_helper(|meter_provider| {
767+
drop(meter_provider);
768+
});
769+
}
770+
771+
fn collection_helper(trigger: fn(SdkMeterProvider)) {
760772
// Arrange
761773
let interval = std::time::Duration::from_millis(10);
762774
let exporter = InMemoryMetricExporter::default();
@@ -776,7 +788,7 @@ mod tests {
776788
.build();
777789

778790
// Act
779-
trigger(&meter_provider);
791+
trigger(meter_provider);
780792

781793
// Assert
782794
receiver

0 commit comments

Comments
 (0)