Skip to content

Commit a8d62c1

Browse files
authored
PeriodicReader::build make interval registration in same thread/task (#2133)
1 parent 3347bde commit a8d62c1

File tree

2 files changed

+21
-24
lines changed

2 files changed

+21
-24
lines changed

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -114,27 +114,26 @@ where
114114
let (message_sender, message_receiver) = mpsc::channel(256);
115115

116116
let worker = move |reader: &PeriodicReader| {
117-
let ticker = self
118-
.runtime
119-
.interval(self.interval)
120-
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
121-
.map(|_| Message::Export);
122-
123-
let messages = Box::pin(stream::select(message_receiver, ticker));
124-
125117
let runtime = self.runtime.clone();
126-
self.runtime.spawn(Box::pin(
118+
let reader = reader.clone();
119+
self.runtime.spawn(Box::pin(async move {
120+
let ticker = runtime
121+
.interval(self.interval)
122+
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
123+
.map(|_| Message::Export);
124+
let messages = Box::pin(stream::select(message_receiver, ticker));
127125
PeriodicReaderWorker {
128-
reader: reader.clone(),
126+
reader,
129127
timeout: self.timeout,
130128
runtime,
131129
rm: ResourceMetrics {
132130
resource: Resource::empty(),
133131
scope_metrics: Vec::new(),
134132
},
135133
}
136-
.run(messages),
137-
));
134+
.run(messages)
135+
.await
136+
}));
138137
};
139138

140139
PeriodicReader {
@@ -378,15 +377,15 @@ mod tests {
378377
metrics::data::ResourceMetrics, metrics::reader::MetricReader, metrics::SdkMeterProvider,
379378
runtime, testing::metrics::InMemoryMetricsExporter, Resource,
380379
};
381-
use opentelemetry::metrics::MeterProvider;
380+
use opentelemetry::metrics::{MeterProvider, MetricsError};
382381
use std::sync::mpsc;
383382

384-
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
385-
async fn registration_triggers_collection() {
383+
#[test]
384+
fn collection_triggered_by_interval() {
386385
// Arrange
387386
let interval = std::time::Duration::from_millis(1);
388387
let exporter = InMemoryMetricsExporter::default();
389-
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
388+
let reader = PeriodicReader::builder(exporter.clone(), runtime::TokioCurrentThread)
390389
.with_interval(interval)
391390
.build();
392391
let (sender, receiver) = mpsc::channel();
@@ -401,16 +400,14 @@ mod tests {
401400
})
402401
.init();
403402

404-
_ = meter_provider.force_flush();
405-
406403
// Assert
407404
receiver
408-
.try_recv()
405+
.recv()
409406
.expect("message should be available in channel, indicating a collection occurred");
410407
}
411408

412-
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
413-
async fn unregistered_collect() {
409+
#[test]
410+
fn unregistered_collect() {
414411
// Arrange
415412
let exporter = InMemoryMetricsExporter::default();
416413
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
@@ -423,6 +420,8 @@ mod tests {
423420
let result = reader.collect(&mut rm);
424421

425422
// Assert
426-
result.expect_err("error expected when reader is not registered");
423+
assert!(
424+
matches!(result.unwrap_err(), MetricsError::Other(err) if err == "reader is not registered")
425+
);
427426
}
428427
}

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,6 @@ fn aggregate_fn<T: Number>(
470470
agg: &aggregation::Aggregation,
471471
kind: InstrumentKind,
472472
) -> Result<Option<AggregateFns<T>>> {
473-
use aggregation::Aggregation;
474473
fn box_val<T>(
475474
(m, ca): (impl internal::Measure<T>, impl internal::ComputeAggregation),
476475
) -> (
@@ -544,7 +543,6 @@ fn aggregate_fn<T: Number>(
544543
/// | Gauge | ✓ | ✓ | | ✓ | ✓ |
545544
/// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |
546545
fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> {
547-
use aggregation::Aggregation;
548546
match agg {
549547
Aggregation::Default => Ok(()),
550548
Aggregation::ExplicitBucketHistogram { .. }

0 commit comments

Comments
 (0)