Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
self.client.export(metrics).await
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 148 in opentelemetry-otlp/src/metric.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/metric.rs#L148

Added line #L148 was not covered by tests
// this component is stateless
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use async_trait::async_trait;

use crate::error::OTelSdkResult;
use crate::metrics::MetricResult;

use crate::metrics::data::ResourceMetrics;

Expand All @@ -21,7 +20,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;

/// Flushes any metric data held by an exporter.
async fn force_flush(&self) -> MetricResult<()>;
async fn force_flush(&self) -> OTelSdkResult;

/// Releases any held computational resources.
///
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 277 in opentelemetry-sdk/src/metrics/in_memory_exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/in_memory_exporter.rs#L277

Added line #L277 was not covered by tests
Ok(()) // In this implementation, flush does nothing
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/manual_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
}

/// ForceFlush is a no-op, it always returns nil.
fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {

Check warning on line 108 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L108

Added line #L108 was not covered by tests
Ok(())
}

Expand Down
13 changes: 4 additions & 9 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ use opentelemetry::{
otel_debug, otel_error, otel_info, InstrumentationScope,
};

use crate::error::OTelSdkResult;
use crate::Resource;
use crate::{
error::OTelSdkResult,
metrics::{MetricError, MetricResult},
};

use super::{
exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
Expand Down Expand Up @@ -96,7 +93,7 @@ impl SdkMeterProvider {
/// Ok(())
/// }
/// ```
pub fn force_flush(&self) -> MetricResult<()> {
pub fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}

Expand All @@ -122,14 +119,12 @@ impl SdkMeterProvider {
}

impl SdkMeterProviderInner {
fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
if self
.shutdown_invoked
.load(std::sync::atomic::Ordering::Relaxed)
{
Err(MetricError::Other(
"Cannot perform flush as MeterProvider shutdown already invoked.".into(),
))
Err(crate::error::OTelSdkError::AlreadyShutdown)
} else {
self.pipes.force_flush()
}
Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@
futures_executor::block_on(self.exporter.export(&mut rm))
}

fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
// TODO: Better message for this scenario.
// Flush and Shutdown called from 2 threads Flush check shutdown
// flag before shutdown thread sets it. Both threads attempt to send
Expand All @@ -460,17 +460,17 @@
let (response_tx, response_rx) = mpsc::channel();
self.message_sender
.send(Message::Flush(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
// TODO: call exporter's force_flush method.
if response {
Ok(())
} else {
Err(MetricError::Other("Failed to flush".into()))
Err(OTelSdkError::InternalFailure("Failed to flush".into()))
}
} else {
Err(MetricError::Other("Failed to flush".into()))
Err(OTelSdkError::InternalFailure("Failed to flush".into()))

Check warning on line 473 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L473

Added line #L473 was not covered by tests
}
}

Expand Down Expand Up @@ -515,7 +515,7 @@
self.inner.collect(rm)
}

fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}

Expand Down Expand Up @@ -546,7 +546,7 @@
error::{OTelSdkError, OTelSdkResult},
metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality,
InMemoryMetricExporter, SdkMeterProvider, Temporality,
},
Resource,
};
Expand Down Expand Up @@ -592,7 +592,7 @@
}
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 595 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L595

Added line #L595 was not covered by tests
Ok(())
}

Expand All @@ -616,7 +616,7 @@
Ok(())
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 619 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L619

Added line #L619 was not covered by tests
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
#[derive(Debug)]
enum Message {
Export,
Flush(oneshot::Sender<MetricResult<()>>),
Flush(oneshot::Sender<OTelSdkResult>),
Shutdown(oneshot::Sender<OTelSdkResult>),
}

Expand Down Expand Up @@ -282,10 +282,7 @@
name: "PeriodicReader.ForceFlushCalled",
message = "Flush message received.",
);
let res = self
.collect_and_export()
.await
.map_err(|e| MetricError::Other(e.to_string()));
let res = self.collect_and_export().await;

Check warning on line 285 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L285

Added line #L285 was not covered by tests
if let Err(send_error) = ch.send(res) {
otel_debug!(
name: "PeriodicReader.Flush.SendResultError",
Expand Down Expand Up @@ -365,21 +362,24 @@
Ok(())
}

fn force_flush(&self) -> MetricResult<()> {
let mut inner = self.inner.lock()?;
fn force_flush(&self) -> OTelSdkResult {
let mut inner = self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

Check warning on line 369 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L365-L369

Added lines #L365 - L369 were not covered by tests
if inner.is_shutdown {
return Err(MetricError::Other("reader is shut down".into()));
return Err(OTelSdkError::AlreadyShutdown);

Check warning on line 371 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L371

Added line #L371 was not covered by tests
}
let (sender, receiver) = oneshot::channel();
inner
.message_sender
.try_send(Message::Flush(sender))
.map_err(|e| MetricError::Other(e.to_string()))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

Check warning on line 377 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L377

Added line #L377 was not covered by tests

drop(inner); // don't hold lock when blocking on future

futures_executor::block_on(receiver)
.map_err(|err| MetricError::Other(err.to_string()))
.map_err(|err| OTelSdkError::InternalFailure(err.to_string()))

Check warning on line 382 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L382

Added line #L382 was not covered by tests
.and_then(|res| res)
}

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use opentelemetry::{otel_debug, InstrumentationScope, KeyValue};

use crate::{
error::OTelSdkResult,
error::{OTelSdkError, OTelSdkResult},
metrics::{
aggregation,
data::{Metric, ResourceMetrics, ScopeMetrics},
Expand Down Expand Up @@ -90,7 +90,7 @@
}

/// Send accumulated telemetry
fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
self.reader.force_flush()
}

Expand Down Expand Up @@ -634,7 +634,7 @@
}

/// Force flush all pipelines
pub(crate) fn force_flush(&self) -> MetricResult<()> {
pub(crate) fn force_flush(&self) -> OTelSdkResult {
let mut errs = vec![];
for pipeline in &self.0 {
if let Err(err) = pipeline.force_flush() {
Expand All @@ -645,7 +645,7 @@
if errs.is_empty() {
Ok(())
} else {
Err(MetricError::Other(format!("{errs:?}")))
Err(OTelSdkError::InternalFailure(format!("{errs:?}")))

Check warning on line 648 in opentelemetry-sdk/src/metrics/pipeline.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/pipeline.rs#L648

Added line #L648 was not covered by tests
}
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
///
/// There is no guaranteed that all telemetry be flushed or all resources have
/// been released on error.
fn force_flush(&self) -> MetricResult<()>;
fn force_flush(&self) -> OTelSdkResult;

/// Flushes all metric measurements held in an export pipeline and releases any
/// held computational resources.
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/testing/metrics/metric_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl MetricReader for TestMetricReader {
Ok(())
}

fn force_flush(&self) -> MetricResult<()> {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-stdout/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use core::{f64, fmt};
use opentelemetry_sdk::metrics::{MetricResult, Temporality};
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::{
error::OTelSdkResult,
metrics::{
Expand Down Expand Up @@ -60,7 +60,7 @@
}
}

async fn force_flush(&self) -> MetricResult<()> {
async fn force_flush(&self) -> OTelSdkResult {

Check warning on line 63 in opentelemetry-stdout/src/metrics/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/metrics/exporter.rs#L63

Added line #L63 was not covered by tests
// exporter holds no state, nothing to flush
Ok(())
}
Expand Down
Loading