From a7290acc84aa53a4bfc2fea0ea8e76fe0d0b4707 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Tue, 11 Feb 2025 09:39:48 +0100 Subject: [PATCH 1/8] Refine runtime trait --- .../logs/log_processor_with_async_runtime.rs | 6 +- .../periodic_reader_with_async_runtime.rs | 5 +- opentelemetry-sdk/src/runtime.rs | 73 ++++++++----------- .../trace/sampler/jaeger_remote/sampler.rs | 5 +- .../span_processor_with_async_runtime.rs | 7 +- 5 files changed, 41 insertions(+), 55 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index 6727d21bb7..b50dc660d9 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -17,7 +17,7 @@ use std::{ }; use super::{BatchConfig, LogProcessor}; -use crate::runtime::{RuntimeChannel, TrySend}; +use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; use futures_channel::oneshot; use futures_util::{ future::{self, Either}, @@ -129,10 +129,10 @@ impl BatchLogProcessor { runtime.spawn(Box::pin(async move { // Timer will take a reference to the current runtime, so its important we do this within the // runtime.spawn() - let ticker = inner_runtime - .interval(config.scheduled_delay) + let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| BatchMessage::Flush(None)); + let timeout_runtime = inner_runtime.clone(); let mut logs = Vec::new(); let mut messages = Box::pin(stream::select(message_receiver, ticker)); diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 5ba1de731f..5bc646bbbd 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -13,7 +13,7 @@ use futures_util::{ }; use opentelemetry::{otel_debug, otel_error}; -use crate::runtime::Runtime; +use crate::runtime::{to_interval_stream, Runtime}; use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, @@ -110,8 +110,7 @@ where let runtime = self.runtime.clone(); let reader = reader.clone(); self.runtime.spawn(Box::pin(async move { - let ticker = runtime - .interval(self.interval) + let ticker = to_interval_stream(runtime.clone(), self.interval) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| Message::Export); let messages = Box::pin(stream::select(message_receiver, ticker)); diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 00720e0892..7235aa000b 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -8,40 +8,48 @@ use futures_util::{future::BoxFuture, stream::Stream}; use std::{fmt::Debug, future::Future, time::Duration}; +use futures_util::stream::unfold; use thiserror::Error; /// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows -/// OpenTelemetry to work with any current and hopefully future runtime implementation. +/// OpenTelemetry to work with any current and hopefully future runtime implementations. /// /// [Tokio]: https://crates.io/crates/tokio /// [async-std]: https://crates.io/crates/async-std +/// +/// # Note +/// +/// OpenTelemetry expects a *multi-threaded* runtime because its types can move across threads. +/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes +/// can implement this trait in a way that spawns the tasks on the same thread as the calling code. #[cfg(feature = "experimental_async_runtime")] pub trait Runtime: Clone + Send + Sync + 'static { - /// A future stream, which returns items in a previously specified interval. The item type is - /// not important. - type Interval: Stream + Send; - - /// A future, which resolves after a previously specified amount of time. The output type is - /// not important. - type Delay: Future + Send + Unpin; - - /// Create a [futures_util::stream::Stream], which returns a new item every - /// [std::time::Duration]. - fn interval(&self, duration: Duration) -> Self::Interval; - /// Spawn a new task or thread, which executes the given future. /// /// # Note /// /// This is mainly used to run batch span processing in the background. Note, that the function /// does not return a handle. OpenTelemetry will use a different way to wait for the future to - /// finish when TracerProvider gets shutdown. At the moment this happens by blocking the + /// finish when `TracerProvider` gets shutdown. At the moment this happens by blocking the /// current thread. This means runtime implementations need to make sure they can still execute /// the given future even if the main thread is blocked. fn spawn(&self, future: BoxFuture<'static, ()>); /// Return a new future, which resolves after the specified [std::time::Duration]. - fn delay(&self, duration: Duration) -> Self::Delay; + fn delay(&self, duration: Duration) -> impl Future + Send + Sync + 'static; +} + +/// Uses the given runtime to produce an interval stream. +#[cfg(feature = "experimental_async_runtime")] +pub(crate) fn to_interval_stream(runtime: T, interval: Duration) -> impl Stream { + unfold((), move |_| { + let runtime_cloned = runtime.clone(); + + async move { + runtime_cloned.delay(interval).await; + Some(((), ())) + } + }) } /// Runtime implementation, which works with Tokio's multi thread runtime. @@ -59,21 +67,14 @@ pub struct Tokio; doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))) )] impl Runtime for Tokio { - type Interval = tokio_stream::wrappers::IntervalStream; - type Delay = ::std::pin::Pin>; - - fn interval(&self, duration: Duration) -> Self::Interval { - crate::util::tokio_interval_stream(duration) - } - fn spawn(&self, future: BoxFuture<'static, ()>) { #[allow(clippy::let_underscore_future)] // we don't have to await on the returned future to execute let _ = tokio::spawn(future); } - fn delay(&self, duration: Duration) -> Self::Delay { - Box::pin(tokio::time::sleep(duration)) + fn delay(&self, duration: Duration) -> impl Future + Send + Sync + 'static { + tokio::time::sleep(duration) } } @@ -104,13 +105,6 @@ pub struct TokioCurrentThread; ))) )] impl Runtime for TokioCurrentThread { - type Interval = tokio_stream::wrappers::IntervalStream; - type Delay = ::std::pin::Pin>; - - fn interval(&self, duration: Duration) -> Self::Interval { - crate::util::tokio_interval_stream(duration) - } - fn spawn(&self, future: BoxFuture<'static, ()>) { // We cannot force push tracing in current thread tokio scheduler because we rely on // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the @@ -127,8 +121,8 @@ impl Runtime for TokioCurrentThread { }); } - fn delay(&self, duration: Duration) -> Self::Delay { - Box::pin(tokio::time::sleep(duration)) + fn delay(&self, duration: Duration) -> impl Future + Send + Sync + 'static { + tokio::time::sleep(duration) } } @@ -147,20 +141,13 @@ pub struct AsyncStd; doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))) )] impl Runtime for AsyncStd { - type Interval = async_std::stream::Interval; - type Delay = BoxFuture<'static, ()>; - - fn interval(&self, duration: Duration) -> Self::Interval { - async_std::stream::interval(duration) - } - fn spawn(&self, future: BoxFuture<'static, ()>) { #[allow(clippy::let_underscore_future)] let _ = async_std::task::spawn(future); } - fn delay(&self, duration: Duration) -> Self::Delay { - Box::pin(async_std::task::sleep(duration)) + fn delay(&self, duration: Duration) -> impl Future + Send + Sync + 'static { + async_std::task::sleep(duration) } } @@ -193,7 +180,7 @@ pub enum TrySendError { /// Send failed due to the channel being closed. #[error("cannot send message to batch processor as the channel is closed")] ChannelClosed, - /// Any other send error that isnt covered above. + /// Any other send error that isn't covered above. #[error(transparent)] Other(#[from] Box), } diff --git a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs index 815abc828e..e2c9cfcc79 100644 --- a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs +++ b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs @@ -1,4 +1,4 @@ -use crate::runtime::RuntimeChannel; +use crate::runtime::{to_interval_stream, RuntimeChannel}; use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse; use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner; use crate::trace::{Sampler, ShouldSample}; @@ -189,7 +189,8 @@ impl JaegerRemoteSampler { C: HttpClient + 'static, { // todo: review if we need 'static here - let interval = runtime.interval(update_timeout); + let interval = to_interval_stream(runtime.clone(), update_timeout); + runtime.spawn(Box::pin(async move { // either update or shutdown let mut update = Box::pin(stream::select( diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 6cb2c18e8e..b9df19f390 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -1,6 +1,6 @@ use crate::error::{OTelSdkError, OTelSdkResult}; use crate::resource::Resource; -use crate::runtime::{RuntimeChannel, TrySend}; +use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; use crate::trace::BatchConfig; use crate::trace::Span; use crate::trace::SpanProcessor; @@ -308,7 +308,7 @@ impl BatchSpanProcessorInternal { } let export = self.exporter.export(self.spans.split_off(0)); - let timeout = self.runtime.delay(self.config.max_export_timeout); + let timeout = Box::pin(self.runtime.delay(self.config.max_export_timeout)); let time_out = self.config.max_export_timeout; Box::pin(async move { @@ -354,8 +354,7 @@ impl BatchSpanProcessor { runtime.spawn(Box::pin(async move { // Timer will take a reference to the current runtime, so its important we do this within the // runtime.spawn() - let ticker = inner_runtime - .interval(config.scheduled_delay) + let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| BatchMessage::Flush(None)); let timeout_runtime = inner_runtime.clone(); From b26d1e49153d9957c7826cf312f9a25d34c0ff0b Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Tue, 11 Feb 2025 20:38:18 +0100 Subject: [PATCH 2/8] cleanup --- .../logs/log_processor_with_async_runtime.rs | 4 +-- .../periodic_reader_with_async_runtime.rs | 4 +-- opentelemetry-sdk/src/runtime.rs | 36 ++++++++++++------- .../trace/sampler/jaeger_remote/sampler.rs | 4 +-- .../span_processor_with_async_runtime.rs | 4 +-- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index b50dc660d9..9304095820 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -126,7 +126,7 @@ impl BatchLogProcessor { let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { + runtime.spawn(async move { // Timer will take a reference to the current runtime, so its important we do this within the // runtime.spawn() let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) @@ -204,7 +204,7 @@ impl BatchLogProcessor { } } } - })); + }); // Return batch processor with link to worker BatchLogProcessor { message_sender, diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 5bc646bbbd..77c18f76e8 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -109,7 +109,7 @@ where let worker = move |reader: &PeriodicReader| { let runtime = self.runtime.clone(); let reader = reader.clone(); - self.runtime.spawn(Box::pin(async move { + self.runtime.spawn(async move { let ticker = to_interval_stream(runtime.clone(), self.interval) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| Message::Export); @@ -125,7 +125,7 @@ where } .run(messages) .await - })); + }); }; otel_debug!( diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 7235aa000b..90c2e351e8 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -6,9 +6,8 @@ //! [Tokio]: https://crates.io/crates/tokio //! [async-std]: https://crates.io/crates/async-std -use futures_util::{future::BoxFuture, stream::Stream}; use std::{fmt::Debug, future::Future, time::Duration}; -use futures_util::stream::unfold; +use futures_util::stream::{unfold, Stream}; use thiserror::Error; /// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows @@ -19,7 +18,7 @@ use thiserror::Error; /// /// # Note /// -/// OpenTelemetry expects a *multi-threaded* runtime because its types can move across threads. +/// OpenTelemetry expects a *multithreaded* runtime because its types can move across threads. /// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes /// can implement this trait in a way that spawns the tasks on the same thread as the calling code. #[cfg(feature = "experimental_async_runtime")] @@ -30,13 +29,15 @@ pub trait Runtime: Clone + Send + Sync + 'static { /// /// This is mainly used to run batch span processing in the background. Note, that the function /// does not return a handle. OpenTelemetry will use a different way to wait for the future to - /// finish when `TracerProvider` gets shutdown. At the moment this happens by blocking the + /// finish when the caller shuts down. + /// + /// At the moment, the shutdown happens by blocking the /// current thread. This means runtime implementations need to make sure they can still execute /// the given future even if the main thread is blocked. - fn spawn(&self, future: BoxFuture<'static, ()>); + fn spawn(&self, future: F) where F: Future + Send + 'static; - /// Return a new future, which resolves after the specified [std::time::Duration]. - fn delay(&self, duration: Duration) -> impl Future + Send + Sync + 'static; + /// Return a future that resolves after the specified [Duration]. + fn delay(&self, duration: Duration) -> impl Future + Send + 'static; } /// Uses the given runtime to produce an interval stream. @@ -67,13 +68,16 @@ pub struct Tokio; doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))) )] impl Runtime for Tokio { - fn spawn(&self, future: BoxFuture<'static, ()>) { + fn spawn(&self, future: F) + where + F: Future + Send + 'static + { #[allow(clippy::let_underscore_future)] // we don't have to await on the returned future to execute let _ = tokio::spawn(future); } - fn delay(&self, duration: Duration) -> impl Future + Send + Sync + 'static { + fn delay(&self, duration: Duration) -> impl Future + Send + 'static { tokio::time::sleep(duration) } } @@ -105,7 +109,10 @@ pub struct TokioCurrentThread; ))) )] impl Runtime for TokioCurrentThread { - fn spawn(&self, future: BoxFuture<'static, ()>) { + fn spawn(&self, future: F) + where + F: Future + Send + 'static + { // We cannot force push tracing in current thread tokio scheduler because we rely on // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the // shutdown function so that the runtime will not finish the blocked task and kill any @@ -121,7 +128,7 @@ impl Runtime for TokioCurrentThread { }); } - fn delay(&self, duration: Duration) -> impl Future + Send + Sync + 'static { + fn delay(&self, duration: Duration) -> impl Future + Send + 'static { tokio::time::sleep(duration) } } @@ -141,12 +148,15 @@ pub struct AsyncStd; doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))) )] impl Runtime for AsyncStd { - fn spawn(&self, future: BoxFuture<'static, ()>) { + fn spawn(&self, future: F) + where + F: Future + Send + 'static + { #[allow(clippy::let_underscore_future)] let _ = async_std::task::spawn(future); } - fn delay(&self, duration: Duration) -> impl Future + Send + Sync + 'static { + fn delay(&self, duration: Duration) -> impl Future + Send + 'static { async_std::task::sleep(duration) } } diff --git a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs index e2c9cfcc79..8f1ed4f55d 100644 --- a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs +++ b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs @@ -191,7 +191,7 @@ impl JaegerRemoteSampler { // todo: review if we need 'static here let interval = to_interval_stream(runtime.clone(), update_timeout); - runtime.spawn(Box::pin(async move { + runtime.spawn(async move { // either update or shutdown let mut update = Box::pin(stream::select( shutdown.map(|_| false), @@ -217,7 +217,7 @@ impl JaegerRemoteSampler { break; } } - })); + }); } async fn request_new_strategy( diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index b9df19f390..f89f5be8b7 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -351,7 +351,7 @@ impl BatchSpanProcessor { let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { + runtime.spawn(async move { // Timer will take a reference to the current runtime, so its important we do this within the // runtime.spawn() let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) @@ -369,7 +369,7 @@ impl BatchSpanProcessor { }; processor.run(messages).await - })); + }); // Return batch processor with link to worker BatchSpanProcessor { From cf4f15c0251e26568754f40e8845be4e94fa2d62 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Tue, 11 Feb 2025 21:01:02 +0100 Subject: [PATCH 3/8] cleanup --- .../src/trace/span_processor_with_async_runtime.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index f89f5be8b7..94f9f1af93 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -308,13 +308,13 @@ impl BatchSpanProcessorInternal { } let export = self.exporter.export(self.spans.split_off(0)); - let timeout = Box::pin(self.runtime.delay(self.config.max_export_timeout)); - let time_out = self.config.max_export_timeout; + let timeout_future = Box::pin(self.runtime.delay(self.config.max_export_timeout)); + let timeout = self.config.max_export_timeout; Box::pin(async move { - match future::select(export, timeout).await { + match future::select(export, timeout_future).await { Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)), + Either::Right((_, _)) => Err(OTelSdkError::Timeout(timeout)), } }) } From 2e2937ef5d87577f86511363a6eac6fc28ca9f61 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Wed, 12 Feb 2025 09:42:08 +0100 Subject: [PATCH 4/8] cleanup --- opentelemetry-sdk/src/runtime.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 90c2e351e8..5b994da037 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -6,8 +6,8 @@ //! [Tokio]: https://crates.io/crates/tokio //! [async-std]: https://crates.io/crates/async-std -use std::{fmt::Debug, future::Future, time::Duration}; use futures_util::stream::{unfold, Stream}; +use std::{fmt::Debug, future::Future, time::Duration}; use thiserror::Error; /// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows @@ -34,7 +34,9 @@ pub trait Runtime: Clone + Send + Sync + 'static { /// At the moment, the shutdown happens by blocking the /// current thread. This means runtime implementations need to make sure they can still execute /// the given future even if the main thread is blocked. - fn spawn(&self, future: F) where F: Future + Send + 'static; + fn spawn(&self, future: F) + where + F: Future + Send + 'static; /// Return a future that resolves after the specified [Duration]. fn delay(&self, duration: Duration) -> impl Future + Send + 'static; @@ -42,7 +44,10 @@ pub trait Runtime: Clone + Send + Sync + 'static { /// Uses the given runtime to produce an interval stream. #[cfg(feature = "experimental_async_runtime")] -pub(crate) fn to_interval_stream(runtime: T, interval: Duration) -> impl Stream { +pub(crate) fn to_interval_stream( + runtime: T, + interval: Duration, +) -> impl Stream { unfold((), move |_| { let runtime_cloned = runtime.clone(); @@ -70,7 +75,7 @@ pub struct Tokio; impl Runtime for Tokio { fn spawn(&self, future: F) where - F: Future + Send + 'static + F: Future + Send + 'static, { #[allow(clippy::let_underscore_future)] // we don't have to await on the returned future to execute @@ -111,7 +116,7 @@ pub struct TokioCurrentThread; impl Runtime for TokioCurrentThread { fn spawn(&self, future: F) where - F: Future + Send + 'static + F: Future + Send + 'static, { // We cannot force push tracing in current thread tokio scheduler because we rely on // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the @@ -150,7 +155,7 @@ pub struct AsyncStd; impl Runtime for AsyncStd { fn spawn(&self, future: F) where - F: Future + Send + 'static + F: Future + Send + 'static, { #[allow(clippy::let_underscore_future)] let _ = async_std::task::spawn(future); From 3f8ca2c80009f76cfdaf93d1ed1256e19f5f2834 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Wed, 12 Feb 2025 09:46:19 +0100 Subject: [PATCH 5/8] fixes --- opentelemetry-sdk/src/runtime.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 5b994da037..815f089ac1 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -44,6 +44,7 @@ pub trait Runtime: Clone + Send + Sync + 'static { /// Uses the given runtime to produce an interval stream. #[cfg(feature = "experimental_async_runtime")] +#[allow(dead_code)] pub(crate) fn to_interval_stream( runtime: T, interval: Duration, From 9987869cb99ab18148abf8b9ff42ae3685efc955 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Mon, 17 Feb 2025 10:21:43 +0100 Subject: [PATCH 6/8] use single threaded types for OTel metrics --- opentelemetry-sdk/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 0026252710..6611b97018 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -3,6 +3,8 @@ ## vNext - *Breaking* Make `force_flush()` in `PushMetricExporter` synchronous +- Breaking: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641) + for the changes. ## 0.28.0 From 9f082a7c80d3730559548f486ae6c6949490b35a Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Mon, 17 Feb 2025 10:23:04 +0100 Subject: [PATCH 7/8] cleanup changelog --- opentelemetry-sdk/CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 6611b97018..fb847e9eb7 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,8 +2,8 @@ ## vNext -- *Breaking* Make `force_flush()` in `PushMetricExporter` synchronous -- Breaking: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641) +- *Breaking*: Make `force_flush()` in `PushMetricExporter` synchronous +- *Breaking*: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641) for the changes. ## 0.28.0 From 2b9931bf7ce255837aa7d85aadfe2b1c5008bdcb Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Thu, 27 Feb 2025 15:47:40 +0100 Subject: [PATCH 8/8] fixes --- .../trace/span_processor_with_async_runtime.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 1de50ac73c..62b5de3fff 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -307,15 +307,16 @@ impl BatchSpanProcessorInternal { } let export = self.exporter.export(self.spans.split_off(0)); - let timeout_future = Box::pin(self.runtime.delay(self.config.max_export_timeout)); - let timeout = self.config.max_export_timeout; + let timeout = self.runtime.delay(self.config.max_export_timeout); + let time_out = self.config.max_export_timeout; - Box::pin(async move { - match future::select(export, timeout_future).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => Err(OTelSdkError::Timeout(timeout)), - } - }) + pin_mut!(export); + pin_mut!(timeout); + + match future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)), + } } async fn run(mut self, mut messages: impl FusedStream + Unpin) {