Skip to content

Commit f41d1e2

Browse files
authored
Merge branch 'main' into cijothomas/periodicreader-fix3
2 parents f68aecc + 9b0ccce commit f41d1e2

File tree

1 file changed

+21
-35
lines changed

1 file changed

+21
-35
lines changed

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ where
117117
/// return metric data to the user. It will not automatically send that data to
118118
/// the exporter outside of the predefined interval.
119119
///
120-
/// As this spuns up own background thread, this is recommended to be used with push exporters
121-
/// that do not require any particular async runtime. As of now, this cannot be used with
122-
/// OTLP exporters as they requires async runtime
123120
///
124121
/// [collect]: MetricReader::collect
125122
///
@@ -160,7 +157,7 @@ impl PeriodicReader {
160157
mpsc::channel();
161158
let reader = PeriodicReader {
162159
inner: Arc::new(PeriodicReaderInner {
163-
message_sender: Arc::new(Mutex::new(message_sender)),
160+
message_sender: Arc::new(message_sender),
164161
is_shutdown: AtomicBool::new(false),
165162
producer: Mutex::new(None),
166163
exporter: Arc::new(exporter),
@@ -223,6 +220,11 @@ impl PeriodicReader {
223220
} else {
224221
response_sender.send(true).unwrap();
225222
}
223+
224+
otel_debug!(
225+
name: "PeriodReaderThreadExiting",
226+
reason = "ShutdownRequested"
227+
);
226228
break;
227229
}
228230
Err(mpsc::RecvTimeoutError::Timeout) => {
@@ -255,8 +257,13 @@ impl PeriodicReader {
255257
interval_start = Instant::now();
256258
}
257259
}
258-
Err(_) => {
259-
// Some other error. Break out and exit the thread.
260+
Err(mpsc::RecvTimeoutError::Disconnected) => {
261+
// Channel disconnected, only thing to do is break
262+
// out (i.e exit the thread)
263+
otel_debug!(
264+
name: "PeriodReaderThreadExiting",
265+
reason = "MessageReceiverDisconnected"
266+
);
260267
break;
261268
}
262269
}
@@ -271,6 +278,7 @@ impl PeriodicReader {
271278
if let Err(e) = result_thread_creation {
272279
otel_error!(
273280
name: "PeriodReaderThreadStartError",
281+
message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
274282
error = format!("{:?}", e)
275283
);
276284
}
@@ -290,7 +298,7 @@ impl fmt::Debug for PeriodicReader {
290298

291299
struct PeriodicReaderInner {
292300
exporter: Arc<dyn PushMetricExporter>,
293-
message_sender: Arc<Mutex<mpsc::Sender<Message>>>,
301+
message_sender: Arc<mpsc::Sender<Message>>,
294302
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
295303
is_shutdown: AtomicBool,
296304
}
@@ -384,20 +392,9 @@ impl PeriodicReaderInner {
384392
// completed. TODO is to see if this message can be improved.
385393

386394
let (response_tx, response_rx) = mpsc::channel();
387-
match self.message_sender.lock() {
388-
Ok(sender) => {
389-
sender
390-
.send(Message::Flush(response_tx))
391-
.map_err(|e| MetricError::Other(e.to_string()))?;
392-
}
393-
Err(e) => {
394-
otel_debug!(
395-
name: "PeriodReaderForceFlushError",
396-
error = format!("{:?}", e)
397-
);
398-
return Err(MetricError::Other(e.to_string()));
399-
}
400-
}
395+
self.message_sender
396+
.send(Message::Flush(response_tx))
397+
.map_err(|e| MetricError::Other(e.to_string()))?;
401398

402399
if let Ok(response) = response_rx.recv() {
403400
// TODO: call exporter's force_flush method.
@@ -423,20 +420,9 @@ impl PeriodicReaderInner {
423420

424421
// TODO: See if this is better to be created upfront.
425422
let (response_tx, response_rx) = mpsc::channel();
426-
match self.message_sender.lock() {
427-
Ok(sender) => {
428-
sender
429-
.send(Message::Shutdown(response_tx))
430-
.map_err(|e| MetricError::Other(e.to_string()))?;
431-
}
432-
Err(e) => {
433-
otel_debug!(
434-
name: "PeriodReaderShutdownError",
435-
error = format!("{:?}", e)
436-
);
437-
return Err(MetricError::Other(e.to_string()));
438-
}
439-
}
423+
self.message_sender
424+
.send(Message::Shutdown(response_tx))
425+
.map_err(|e| MetricError::Other(e.to_string()))?;
440426

441427
if let Ok(response) = response_rx.recv() {
442428
if response {

0 commit comments

Comments
 (0)