Skip to content

Commit 0a7be31

Browse files
committed
Small improvements to PeriodicReader
1 parent 1da35a9 commit 0a7be31

File tree

1 file changed

+21
-35
lines changed

1 file changed

+21
-35
lines changed

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ where
117117
/// return metric data to the user. It will not automatically send that data to
118118
/// the exporter outside of the predefined interval.
119119
///
120-
/// As this spuns up own background thread, this is recommended to be used with push exporters
121-
/// that do not require any particular async runtime. As of now, this cannot be used with
122-
/// OTLP exporters as they requires async runtime
123120
///
124121
/// [collect]: MetricReader::collect
125122
///
@@ -160,7 +157,7 @@ impl PeriodicReader {
160157
mpsc::channel();
161158
let reader = PeriodicReader {
162159
inner: Arc::new(PeriodicReaderInner {
163-
message_sender: Arc::new(Mutex::new(message_sender)),
160+
message_sender: Arc::new(message_sender),
164161
is_shutdown: AtomicBool::new(false),
165162
producer: Mutex::new(None),
166163
exporter: Arc::new(exporter),
@@ -223,6 +220,11 @@ impl PeriodicReader {
223220
} else {
224221
response_sender.send(true).unwrap();
225222
}
223+
224+
otel_debug!(
225+
name: "PeriodReaderThreadExiting",
226+
reason = "ShutdownRequested"
227+
);
226228
break;
227229
}
228230
Err(mpsc::RecvTimeoutError::Timeout) => {
@@ -255,8 +257,13 @@ impl PeriodicReader {
255257
interval_start = Instant::now();
256258
}
257259
}
258-
Err(_) => {
259-
// Some other error. Break out and exit the thread.
260+
Err(mpsc::RecvTimeoutError::Disconnected) => {
261+
// Channel disconnected, only thing to do is break
262+
// out (i.e exit the thread)
263+
otel_debug!(
264+
name: "PeriodReaderThreadExiting",
265+
reason = "MessageReceiverDisconnected"
266+
);
260267
break;
261268
}
262269
}
@@ -271,6 +278,7 @@ impl PeriodicReader {
271278
if let Err(e) = result_thread_creation {
272279
otel_error!(
273280
name: "PeriodReaderThreadStartError",
281+
message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
274282
error = format!("{:?}", e)
275283
);
276284
}
@@ -290,7 +298,7 @@ impl fmt::Debug for PeriodicReader {
290298

291299
struct PeriodicReaderInner {
292300
exporter: Arc<dyn PushMetricExporter>,
293-
message_sender: Arc<Mutex<mpsc::Sender<Message>>>,
301+
message_sender: Arc<mpsc::Sender<Message>>,
294302
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
295303
is_shutdown: AtomicBool,
296304
}
@@ -374,20 +382,9 @@ impl PeriodicReaderInner {
374382
return Err(MetricError::Other("reader is shut down".into()));
375383
}
376384
let (response_tx, response_rx) = mpsc::channel();
377-
match self.message_sender.lock() {
378-
Ok(sender) => {
379-
sender
380-
.send(Message::Flush(response_tx))
381-
.map_err(|e| MetricError::Other(e.to_string()))?;
382-
}
383-
Err(e) => {
384-
otel_debug!(
385-
name: "PeriodReaderForceFlushError",
386-
error = format!("{:?}", e)
387-
);
388-
return Err(MetricError::Other(e.to_string()));
389-
}
390-
}
385+
self.message_sender
386+
.send(Message::Flush(response_tx))
387+
.map_err(|e| MetricError::Other(e.to_string()))?;
391388

392389
if let Ok(response) = response_rx.recv() {
393390
// TODO: call exporter's force_flush method.
@@ -408,20 +405,9 @@ impl PeriodicReaderInner {
408405

409406
// TODO: See if this is better to be created upfront.
410407
let (response_tx, response_rx) = mpsc::channel();
411-
match self.message_sender.lock() {
412-
Ok(sender) => {
413-
sender
414-
.send(Message::Shutdown(response_tx))
415-
.map_err(|e| MetricError::Other(e.to_string()))?;
416-
}
417-
Err(e) => {
418-
otel_debug!(
419-
name: "PeriodReaderShutdownError",
420-
error = format!("{:?}", e)
421-
);
422-
return Err(MetricError::Other(e.to_string()));
423-
}
424-
}
408+
self.message_sender
409+
.send(Message::Shutdown(response_tx))
410+
.map_err(|e| MetricError::Other(e.to_string()))?;
425411

426412
if let Ok(response) = response_rx.recv() {
427413
self.is_shutdown

0 commit comments

Comments
 (0)