Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
strategy:
matrix:
os: [windows-latest, ubuntu-latest]
rust: [1.70.0, 1.71.1]
rust: [1.70.0, 1.71.1, 1.75.0]
runs-on: ${{ matrix.os }}
continue-on-error: true
steps:
Expand Down
56 changes: 21 additions & 35 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@
/// return metric data to the user. It will not automatically send that data to
/// the exporter outside of the predefined interval.
///
/// As this spuns up own background thread, this is recommended to be used with push exporters
/// that do not require any particular async runtime. As of now, this cannot be used with
/// OTLP exporters as they requires async runtime
///
/// [collect]: MetricReader::collect
///
Expand Down Expand Up @@ -160,7 +157,7 @@
mpsc::channel();
let reader = PeriodicReader {
inner: Arc::new(PeriodicReaderInner {
message_sender: Arc::new(Mutex::new(message_sender)),
message_sender: Arc::new(message_sender),
is_shutdown: AtomicBool::new(false),
producer: Mutex::new(None),
exporter: Arc::new(exporter),
Expand Down Expand Up @@ -223,6 +220,11 @@
} else {
response_sender.send(true).unwrap();
}

otel_debug!(
name: "PeriodReaderThreadExiting",
reason = "ShutdownRequested"
);
break;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
Expand Down Expand Up @@ -255,8 +257,13 @@
interval_start = Instant::now();
}
}
Err(_) => {
// Some other error. Break out and exit the thread.
Err(mpsc::RecvTimeoutError::Disconnected) => {
// Channel disconnected, only thing to do is break
// out (i.e exit the thread)
otel_debug!(
name: "PeriodReaderThreadExiting",
reason = "MessageReceiverDisconnected"
);

Check warning on line 266 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L263-L266

Added lines #L263 - L266 were not covered by tests
break;
}
}
Expand All @@ -271,6 +278,7 @@
if let Err(e) = result_thread_creation {
otel_error!(
name: "PeriodReaderThreadStartError",
message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
error = format!("{:?}", e)
);
}
Expand All @@ -290,7 +298,7 @@

struct PeriodicReaderInner {
exporter: Arc<dyn PushMetricExporter>,
message_sender: Arc<Mutex<mpsc::Sender<Message>>>,
message_sender: Arc<mpsc::Sender<Message>>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
is_shutdown: AtomicBool,
}
Expand Down Expand Up @@ -374,20 +382,9 @@
return Err(MetricError::Other("reader is shut down".into()));
}
let (response_tx, response_rx) = mpsc::channel();
match self.message_sender.lock() {
Ok(sender) => {
sender
.send(Message::Flush(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
}
Err(e) => {
otel_debug!(
name: "PeriodReaderForceFlushError",
error = format!("{:?}", e)
);
return Err(MetricError::Other(e.to_string()));
}
}
self.message_sender
.send(Message::Flush(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
// TODO: call exporter's force_flush method.
Expand All @@ -408,20 +405,9 @@

// TODO: See if this is better to be created upfront.
let (response_tx, response_rx) = mpsc::channel();
match self.message_sender.lock() {
Ok(sender) => {
sender
.send(Message::Shutdown(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
}
Err(e) => {
otel_debug!(
name: "PeriodReaderShutdownError",
error = format!("{:?}", e)
);
return Err(MetricError::Other(e.to_string()));
}
}
self.message_sender
.send(Message::Shutdown(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related - but should this be recv_timeout, to ensure that the main thread doesn't block indefinitely if the collect/export takes too much time? Similar for flush too.

self.is_shutdown
Expand Down
5 changes: 4 additions & 1 deletion scripts/msrv_config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"1.70.0": [
"opentelemetry/Cargo.toml",
"opentelemetry-sdk/Cargo.toml",
"opentelemetry-stdout/Cargo.toml",
"opentelemetry-http/Cargo.toml",
"opentelemetry-jaeger-propagator/Cargo.toml",
Expand All @@ -13,4 +12,8 @@
"opentelemetry-otlp/Cargo.toml",
"opentelemetry-proto/Cargo.toml"
]
,
"1.75.0": [
"opentelemetry-sdk/Cargo.toml"
]
}
Loading