Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
self.client.export(metrics).await
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 148 in opentelemetry-otlp/src/metric.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/metric.rs#L148

Added line #L148 was not covered by tests
// this component is stateless
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl MetricReader for SharedReader {
self.0.collect(rm)
}

fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
self.0.force_flush()
}

Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use async_trait::async_trait;

use crate::error::OTelSdkResult;
use crate::metrics::MetricResult;

use crate::metrics::data::ResourceMetrics;

Expand All @@ -21,7 +20,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;

/// Flushes any metric data held by an exporter.
async fn force_flush(&self) -> MetricResult<()>;
async fn force_flush(&self) -> OTelSdkResult;

/// Releases any held computational resources.
///
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 277 in opentelemetry-sdk/src/metrics/in_memory_exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/in_memory_exporter.rs#L277

Added line #L277 was not covered by tests
Ok(()) // In this implementation, flush does nothing
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/manual_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
}

/// ForceFlush is a no-op, it always returns nil.
fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {

Check warning on line 108 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L108

Added line #L108 was not covered by tests
Ok(())
}

Expand Down
13 changes: 4 additions & 9 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ use opentelemetry::{
otel_debug, otel_error, otel_info, InstrumentationScope,
};

use crate::error::OTelSdkResult;
use crate::Resource;
use crate::{
error::OTelSdkResult,
metrics::{MetricError, MetricResult},
};

use super::{
exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
Expand Down Expand Up @@ -96,7 +93,7 @@ impl SdkMeterProvider {
/// Ok(())
/// }
/// ```
pub fn force_flush(&self) -> MetricResult<()> {
pub fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}

Expand All @@ -122,14 +119,12 @@ impl SdkMeterProvider {
}

impl SdkMeterProviderInner {
fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
if self
.shutdown_invoked
.load(std::sync::atomic::Ordering::Relaxed)
{
Err(MetricError::Other(
"Cannot perform flush as MeterProvider shutdown already invoked.".into(),
))
Err(crate::error::OTelSdkError::AlreadyShutdown)
} else {
self.pipes.force_flush()
}
Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@
futures_executor::block_on(self.exporter.export(&mut rm))
}

fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
// TODO: Better message for this scenario.
// Flush and Shutdown called from 2 threads Flush check shutdown
// flag before shutdown thread sets it. Both threads attempt to send
Expand All @@ -460,17 +460,17 @@
let (response_tx, response_rx) = mpsc::channel();
self.message_sender
.send(Message::Flush(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
// TODO: call exporter's force_flush method.
if response {
Ok(())
} else {
Err(MetricError::Other("Failed to flush".into()))
Err(OTelSdkError::InternalFailure("Failed to flush".into()))
}
} else {
Err(MetricError::Other("Failed to flush".into()))
Err(OTelSdkError::InternalFailure("Failed to flush".into()))

Check warning on line 473 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L473

Added line #L473 was not covered by tests
}
}

Expand Down Expand Up @@ -515,7 +515,7 @@
self.inner.collect(rm)
}

fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}

Expand Down Expand Up @@ -546,7 +546,7 @@
error::{OTelSdkError, OTelSdkResult},
metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality,
InMemoryMetricExporter, SdkMeterProvider, Temporality,
},
Resource,
};
Expand Down Expand Up @@ -592,7 +592,7 @@
}
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 595 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L595

Added line #L595 was not covered by tests
Ok(())
}

Expand All @@ -616,7 +616,7 @@
Ok(())
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 619 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L619

Added line #L619 was not covered by tests
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
#[derive(Debug)]
enum Message {
Export,
Flush(oneshot::Sender<MetricResult<()>>),
Flush(oneshot::Sender<OTelSdkResult>),
Shutdown(oneshot::Sender<OTelSdkResult>),
}

Expand Down Expand Up @@ -282,10 +282,7 @@
name: "PeriodicReader.ForceFlushCalled",
message = "Flush message received.",
);
let res = self
.collect_and_export()
.await
.map_err(|e| MetricError::Other(e.to_string()));
let res = self.collect_and_export().await;

Check warning on line 285 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L285

Added line #L285 was not covered by tests
if let Err(send_error) = ch.send(res) {
otel_debug!(
name: "PeriodicReader.Flush.SendResultError",
Expand Down Expand Up @@ -365,21 +362,24 @@
Ok(())
}

fn force_flush(&self) -> MetricResult<()> {
let mut inner = self.inner.lock()?;
fn force_flush(&self) -> OTelSdkResult {
let mut inner = self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

Check warning on line 369 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L365-L369

Added lines #L365 - L369 were not covered by tests
if inner.is_shutdown {
return Err(MetricError::Other("reader is shut down".into()));
return Err(OTelSdkError::AlreadyShutdown);

Check warning on line 371 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L371

Added line #L371 was not covered by tests
}
let (sender, receiver) = oneshot::channel();
inner
.message_sender
.try_send(Message::Flush(sender))
.map_err(|e| MetricError::Other(e.to_string()))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

Check warning on line 377 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L377

Added line #L377 was not covered by tests

drop(inner); // don't hold lock when blocking on future

futures_executor::block_on(receiver)
.map_err(|err| MetricError::Other(err.to_string()))
.map_err(|err| OTelSdkError::InternalFailure(err.to_string()))

Check warning on line 382 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L382

Added line #L382 was not covered by tests
.and_then(|res| res)
}

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use opentelemetry::{otel_debug, InstrumentationScope, KeyValue};

use crate::{
error::OTelSdkResult,
error::{OTelSdkError, OTelSdkResult},
metrics::{
aggregation,
data::{Metric, ResourceMetrics, ScopeMetrics},
Expand Down Expand Up @@ -90,7 +90,7 @@
}

/// Send accumulated telemetry
fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
self.reader.force_flush()
}

Expand Down Expand Up @@ -634,7 +634,7 @@
}

/// Force flush all pipelines
pub(crate) fn force_flush(&self) -> MetricResult<()> {
pub(crate) fn force_flush(&self) -> OTelSdkResult {
let mut errs = vec![];
for pipeline in &self.0 {
if let Err(err) = pipeline.force_flush() {
Expand All @@ -645,7 +645,7 @@
if errs.is_empty() {
Ok(())
} else {
Err(MetricError::Other(format!("{errs:?}")))
Err(OTelSdkError::InternalFailure(format!("{errs:?}")))

Check warning on line 648 in opentelemetry-sdk/src/metrics/pipeline.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/pipeline.rs#L648

Added line #L648 was not covered by tests
}
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
///
/// There is no guaranteed that all telemetry be flushed or all resources have
/// been released on error.
fn force_flush(&self) -> MetricResult<()>;
fn force_flush(&self) -> OTelSdkResult;

/// Flushes all metric measurements held in an export pipeline and releases any
/// held computational resources.
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/testing/metrics/metric_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl MetricReader for TestMetricReader {
Ok(())
}

fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-stdout/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use core::{f64, fmt};
use opentelemetry_sdk::metrics::{MetricResult, Temporality};
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::{
error::OTelSdkResult,
metrics::{
Expand Down Expand Up @@ -60,7 +60,7 @@
}
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 63 in opentelemetry-stdout/src/metrics/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/metrics/exporter.rs#L63

Added line #L63 was not covered by tests
// exporter holds no state, nothing to flush
Ok(())
}
Expand Down