Skip to content

Commit 4a76c96

Browse files
authored
Use OTelSdkResult for metric flush (#2606)
1 parent 4783b64 commit 4a76c96

File tree

12 files changed

+35
-41
lines changed

12 files changed

+35
-41
lines changed

opentelemetry-otlp/src/metric.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ impl PushMetricExporter for MetricExporter {
145145
self.client.export(metrics).await
146146
}
147147

148-
async fn force_flush(&self) -> MetricResult<()> {
148+
async fn force_flush(&self) -> OTelSdkResult {
149149
// this component is stateless
150150
Ok(())
151151
}

opentelemetry-sdk/benches/metric.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl MetricReader for SharedReader {
2828
self.0.collect(rm)
2929
}
3030

31-
fn force_flush(&self) -> MetricResult<()> {
31+
fn force_flush(&self) -> OTelSdkResult {
3232
self.0.force_flush()
3333
}
3434

opentelemetry-sdk/src/metrics/exporter.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
use async_trait::async_trait;
33

44
use crate::error::OTelSdkResult;
5-
use crate::metrics::MetricResult;
65

76
use crate::metrics::data::ResourceMetrics;
87

@@ -21,7 +20,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
2120
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
2221

2322
/// Flushes any metric data held by an exporter.
24-
async fn force_flush(&self) -> MetricResult<()>;
23+
async fn force_flush(&self) -> OTelSdkResult;
2524

2625
/// Releases any held computational resources.
2726
///

opentelemetry-sdk/src/metrics/in_memory_exporter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ impl PushMetricExporter for InMemoryMetricExporter {
274274
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
275275
}
276276

277-
async fn force_flush(&self) -> MetricResult<()> {
277+
async fn force_flush(&self) -> OTelSdkResult {
278278
Ok(()) // In this implementation, flush does nothing
279279
}
280280

opentelemetry-sdk/src/metrics/manual_reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl MetricReader for ManualReader {
105105
}
106106

107107
/// ForceFlush is a no-op, it always returns nil.
108-
fn force_flush(&self) -> MetricResult<()> {
108+
fn force_flush(&self) -> OTelSdkResult {
109109
Ok(())
110110
}
111111

opentelemetry-sdk/src/metrics/meter_provider.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,8 @@ use opentelemetry::{
1212
otel_debug, otel_error, otel_info, InstrumentationScope,
1313
};
1414

15+
use crate::error::OTelSdkResult;
1516
use crate::Resource;
16-
use crate::{
17-
error::OTelSdkResult,
18-
metrics::{MetricError, MetricResult},
19-
};
2017

2118
use super::{
2219
exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
@@ -96,7 +93,7 @@ impl SdkMeterProvider {
9693
/// Ok(())
9794
/// }
9895
/// ```
99-
pub fn force_flush(&self) -> MetricResult<()> {
96+
pub fn force_flush(&self) -> OTelSdkResult {
10097
self.inner.force_flush()
10198
}
10299

@@ -122,14 +119,12 @@ impl SdkMeterProvider {
122119
}
123120

124121
impl SdkMeterProviderInner {
125-
fn force_flush(&self) -> MetricResult<()> {
122+
fn force_flush(&self) -> OTelSdkResult {
126123
if self
127124
.shutdown_invoked
128125
.load(std::sync::atomic::Ordering::Relaxed)
129126
{
130-
Err(MetricError::Other(
131-
"Cannot perform flush as MeterProvider shutdown already invoked.".into(),
132-
))
127+
Err(crate::error::OTelSdkError::AlreadyShutdown)
133128
} else {
134129
self.pipes.force_flush()
135130
}

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ impl PeriodicReaderInner {
443443
futures_executor::block_on(self.exporter.export(&mut rm))
444444
}
445445

446-
fn force_flush(&self) -> MetricResult<()> {
446+
fn force_flush(&self) -> OTelSdkResult {
447447
// TODO: Better message for this scenario.
448448
// Flush and Shutdown called from 2 threads Flush check shutdown
449449
// flag before shutdown thread sets it. Both threads attempt to send
@@ -460,17 +460,17 @@ impl PeriodicReaderInner {
460460
let (response_tx, response_rx) = mpsc::channel();
461461
self.message_sender
462462
.send(Message::Flush(response_tx))
463-
.map_err(|e| MetricError::Other(e.to_string()))?;
463+
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
464464

465465
if let Ok(response) = response_rx.recv() {
466466
// TODO: call exporter's force_flush method.
467467
if response {
468468
Ok(())
469469
} else {
470-
Err(MetricError::Other("Failed to flush".into()))
470+
Err(OTelSdkError::InternalFailure("Failed to flush".into()))
471471
}
472472
} else {
473-
Err(MetricError::Other("Failed to flush".into()))
473+
Err(OTelSdkError::InternalFailure("Failed to flush".into()))
474474
}
475475
}
476476

@@ -515,7 +515,7 @@ impl MetricReader for PeriodicReader {
515515
self.inner.collect(rm)
516516
}
517517

518-
fn force_flush(&self) -> MetricResult<()> {
518+
fn force_flush(&self) -> OTelSdkResult {
519519
self.inner.force_flush()
520520
}
521521

@@ -546,7 +546,7 @@ mod tests {
546546
error::{OTelSdkError, OTelSdkResult},
547547
metrics::{
548548
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
549-
InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality,
549+
InMemoryMetricExporter, SdkMeterProvider, Temporality,
550550
},
551551
Resource,
552552
};
@@ -592,7 +592,7 @@ mod tests {
592592
}
593593
}
594594

595-
async fn force_flush(&self) -> MetricResult<()> {
595+
async fn force_flush(&self) -> OTelSdkResult {
596596
Ok(())
597597
}
598598

@@ -616,7 +616,7 @@ mod tests {
616616
Ok(())
617617
}
618618

619-
async fn force_flush(&self) -> MetricResult<()> {
619+
async fn force_flush(&self) -> OTelSdkResult {
620620
Ok(())
621621
}
622622

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ struct PeriodicReaderInner {
216216
#[derive(Debug)]
217217
enum Message {
218218
Export,
219-
Flush(oneshot::Sender<MetricResult<()>>),
219+
Flush(oneshot::Sender<OTelSdkResult>),
220220
Shutdown(oneshot::Sender<OTelSdkResult>),
221221
}
222222

@@ -282,10 +282,7 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
282282
name: "PeriodicReader.ForceFlushCalled",
283283
message = "Flush message received.",
284284
);
285-
let res = self
286-
.collect_and_export()
287-
.await
288-
.map_err(|e| MetricError::Other(e.to_string()));
285+
let res = self.collect_and_export().await;
289286
if let Err(send_error) = ch.send(res) {
290287
otel_debug!(
291288
name: "PeriodicReader.Flush.SendResultError",
@@ -365,21 +362,24 @@ impl MetricReader for PeriodicReader {
365362
Ok(())
366363
}
367364

368-
fn force_flush(&self) -> MetricResult<()> {
369-
let mut inner = self.inner.lock()?;
365+
fn force_flush(&self) -> OTelSdkResult {
366+
let mut inner = self
367+
.inner
368+
.lock()
369+
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
370370
if inner.is_shutdown {
371-
return Err(MetricError::Other("reader is shut down".into()));
371+
return Err(OTelSdkError::AlreadyShutdown);
372372
}
373373
let (sender, receiver) = oneshot::channel();
374374
inner
375375
.message_sender
376376
.try_send(Message::Flush(sender))
377-
.map_err(|e| MetricError::Other(e.to_string()))?;
377+
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
378378

379379
drop(inner); // don't hold lock when blocking on future
380380

381381
futures_executor::block_on(receiver)
382-
.map_err(|err| MetricError::Other(err.to_string()))
382+
.map_err(|err| OTelSdkError::InternalFailure(err.to_string()))
383383
.and_then(|res| res)
384384
}
385385

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
use opentelemetry::{otel_debug, InstrumentationScope, KeyValue};
99

1010
use crate::{
11-
error::OTelSdkResult,
11+
error::{OTelSdkError, OTelSdkResult},
1212
metrics::{
1313
aggregation,
1414
data::{Metric, ResourceMetrics, ScopeMetrics},
@@ -90,7 +90,7 @@ impl Pipeline {
9090
}
9191

9292
/// Send accumulated telemetry
93-
fn force_flush(&self) -> MetricResult<()> {
93+
fn force_flush(&self) -> OTelSdkResult {
9494
self.reader.force_flush()
9595
}
9696

@@ -634,7 +634,7 @@ impl Pipelines {
634634
}
635635

636636
/// Force flush all pipelines
637-
pub(crate) fn force_flush(&self) -> MetricResult<()> {
637+
pub(crate) fn force_flush(&self) -> OTelSdkResult {
638638
let mut errs = vec![];
639639
for pipeline in &self.0 {
640640
if let Err(err) = pipeline.force_flush() {
@@ -645,7 +645,7 @@ impl Pipelines {
645645
if errs.is_empty() {
646646
Ok(())
647647
} else {
648-
Err(MetricError::Other(format!("{errs:?}")))
648+
Err(OTelSdkError::InternalFailure(format!("{errs:?}")))
649649
}
650650
}
651651

opentelemetry-sdk/src/metrics/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
3636
///
3737
/// There is no guaranteed that all telemetry be flushed or all resources have
3838
/// been released on error.
39-
fn force_flush(&self) -> MetricResult<()>;
39+
fn force_flush(&self) -> OTelSdkResult;
4040

4141
/// Flushes all metric measurements held in an export pipeline and releases any
4242
/// held computational resources.

0 commit comments

Comments
 (0)