diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 9c503dbfc9..ed4cfd098f 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,13 +2,15 @@ ## vNext +- **Breaking**: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641) + for the changes. - Calls to `MeterProviderBuilder::with_resource`, `TracerProviderBuilder::with_resource`, `LoggerProviderBuilder::with_resource` are now additive ([#2677](https://github.com/open-telemetry/opentelemetry-rust/pull/2677)). - Moved `ExportError` trait from `opentelemetry::trace::ExportError` to `opentelemetry_sdk::export::ExportError` - Moved `TraceError` enum from `opentelemetry::trace::TraceError` to `opentelemetry_sdk::trace::TraceError` - Moved `TraceResult` type alias from `opentelemetry::trace::TraceResult` to `opentelemetry_sdk::trace::TraceResult` -- *Breaking*: Make `force_flush()` in `PushMetricExporter` synchronous -- **Breaking Change:** Updated the `SpanExporter` trait method signature: +- **Breaking**: Make `force_flush()` in `PushMetricExporter` synchronous +- **Breaking**: Updated the `SpanExporter` trait method signature: ```rust fn export(&mut self, batch: Vec) -> BoxFuture<'static, OTelSdkResult>; diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index bb7f7fa2ea..6b684f4d63 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -17,7 +17,7 @@ use std::{ }; use super::{BatchConfig, LogProcessor}; -use crate::runtime::{RuntimeChannel, TrySend}; +use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; use futures_channel::oneshot; use futures_util::{ future::{self, Either}, @@ -126,13 +126,13 @@ impl BatchLogProcessor { let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { + runtime.spawn(async move { // Timer will take a reference to the current runtime, so its important we do this within the // runtime.spawn() - let ticker = inner_runtime - .interval(config.scheduled_delay) + let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| BatchMessage::Flush(None)); + let timeout_runtime = inner_runtime.clone(); let mut logs = Vec::new(); let mut messages = Box::pin(stream::select(message_receiver, ticker)); @@ -204,7 +204,7 @@ impl BatchLogProcessor { } } } - })); + }); // Return batch processor with link to worker BatchLogProcessor { message_sender, diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 5ba1de731f..77c18f76e8 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -13,7 +13,7 @@ use futures_util::{ }; use opentelemetry::{otel_debug, otel_error}; -use crate::runtime::Runtime; +use crate::runtime::{to_interval_stream, Runtime}; use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, @@ -109,9 +109,8 @@ where let worker = move |reader: &PeriodicReader| { let runtime = self.runtime.clone(); let reader = reader.clone(); - self.runtime.spawn(Box::pin(async move { - let ticker = runtime - .interval(self.interval) + self.runtime.spawn(async move { + let ticker = to_interval_stream(runtime.clone(), self.interval) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| Message::Export); let messages = Box::pin(stream::select(message_receiver, ticker)); @@ -126,7 +125,7 @@ where } .run(messages) .await - })); + }); }; otel_debug!( diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 00720e0892..815f089ac1 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -6,42 +6,57 @@ //! [Tokio]: https://crates.io/crates/tokio //! [async-std]: https://crates.io/crates/async-std -use futures_util::{future::BoxFuture, stream::Stream}; +use futures_util::stream::{unfold, Stream}; use std::{fmt::Debug, future::Future, time::Duration}; use thiserror::Error; /// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows -/// OpenTelemetry to work with any current and hopefully future runtime implementation. +/// OpenTelemetry to work with any current and hopefully future runtime implementations. /// /// [Tokio]: https://crates.io/crates/tokio /// [async-std]: https://crates.io/crates/async-std +/// +/// # Note +/// +/// OpenTelemetry expects a *multithreaded* runtime because its types can move across threads. +/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes +/// can implement this trait in a way that spawns the tasks on the same thread as the calling code. #[cfg(feature = "experimental_async_runtime")] pub trait Runtime: Clone + Send + Sync + 'static { - /// A future stream, which returns items in a previously specified interval. The item type is - /// not important. - type Interval: Stream + Send; - - /// A future, which resolves after a previously specified amount of time. The output type is - /// not important. - type Delay: Future + Send + Unpin; - - /// Create a [futures_util::stream::Stream], which returns a new item every - /// [std::time::Duration]. - fn interval(&self, duration: Duration) -> Self::Interval; - /// Spawn a new task or thread, which executes the given future. /// /// # Note /// /// This is mainly used to run batch span processing in the background. Note, that the function /// does not return a handle. OpenTelemetry will use a different way to wait for the future to - /// finish when TracerProvider gets shutdown. At the moment this happens by blocking the + /// finish when the caller shuts down. + /// + /// At the moment, the shutdown happens by blocking the /// current thread. This means runtime implementations need to make sure they can still execute /// the given future even if the main thread is blocked. - fn spawn(&self, future: BoxFuture<'static, ()>); + fn spawn(&self, future: F) + where + F: Future + Send + 'static; + + /// Return a future that resolves after the specified [Duration]. + fn delay(&self, duration: Duration) -> impl Future + Send + 'static; +} - /// Return a new future, which resolves after the specified [std::time::Duration]. - fn delay(&self, duration: Duration) -> Self::Delay; +/// Uses the given runtime to produce an interval stream. +#[cfg(feature = "experimental_async_runtime")] +#[allow(dead_code)] +pub(crate) fn to_interval_stream( + runtime: T, + interval: Duration, +) -> impl Stream { + unfold((), move |_| { + let runtime_cloned = runtime.clone(); + + async move { + runtime_cloned.delay(interval).await; + Some(((), ())) + } + }) } /// Runtime implementation, which works with Tokio's multi thread runtime. @@ -59,21 +74,17 @@ pub struct Tokio; doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))) )] impl Runtime for Tokio { - type Interval = tokio_stream::wrappers::IntervalStream; - type Delay = ::std::pin::Pin>; - - fn interval(&self, duration: Duration) -> Self::Interval { - crate::util::tokio_interval_stream(duration) - } - - fn spawn(&self, future: BoxFuture<'static, ()>) { + fn spawn(&self, future: F) + where + F: Future + Send + 'static, + { #[allow(clippy::let_underscore_future)] // we don't have to await on the returned future to execute let _ = tokio::spawn(future); } - fn delay(&self, duration: Duration) -> Self::Delay { - Box::pin(tokio::time::sleep(duration)) + fn delay(&self, duration: Duration) -> impl Future + Send + 'static { + tokio::time::sleep(duration) } } @@ -104,14 +115,10 @@ pub struct TokioCurrentThread; ))) )] impl Runtime for TokioCurrentThread { - type Interval = tokio_stream::wrappers::IntervalStream; - type Delay = ::std::pin::Pin>; - - fn interval(&self, duration: Duration) -> Self::Interval { - crate::util::tokio_interval_stream(duration) - } - - fn spawn(&self, future: BoxFuture<'static, ()>) { + fn spawn(&self, future: F) + where + F: Future + Send + 'static, + { // We cannot force push tracing in current thread tokio scheduler because we rely on // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the // shutdown function so that the runtime will not finish the blocked task and kill any @@ -127,8 +134,8 @@ impl Runtime for TokioCurrentThread { }); } - fn delay(&self, duration: Duration) -> Self::Delay { - Box::pin(tokio::time::sleep(duration)) + fn delay(&self, duration: Duration) -> impl Future + Send + 'static { + tokio::time::sleep(duration) } } @@ -147,20 +154,16 @@ pub struct AsyncStd; doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))) )] impl Runtime for AsyncStd { - type Interval = async_std::stream::Interval; - type Delay = BoxFuture<'static, ()>; - - fn interval(&self, duration: Duration) -> Self::Interval { - async_std::stream::interval(duration) - } - - fn spawn(&self, future: BoxFuture<'static, ()>) { + fn spawn(&self, future: F) + where + F: Future + Send + 'static, + { #[allow(clippy::let_underscore_future)] let _ = async_std::task::spawn(future); } - fn delay(&self, duration: Duration) -> Self::Delay { - Box::pin(async_std::task::sleep(duration)) + fn delay(&self, duration: Duration) -> impl Future + Send + 'static { + async_std::task::sleep(duration) } } @@ -193,7 +196,7 @@ pub enum TrySendError { /// Send failed due to the channel being closed. #[error("cannot send message to batch processor as the channel is closed")] ChannelClosed, - /// Any other send error that isnt covered above. + /// Any other send error that isn't covered above. #[error(transparent)] Other(#[from] Box), } diff --git a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs index d16e76e126..a24fb13b75 100644 --- a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs +++ b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs @@ -1,4 +1,4 @@ -use crate::runtime::RuntimeChannel; +use crate::runtime::{to_interval_stream, RuntimeChannel}; use crate::trace::error::TraceError; use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse; use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner; @@ -190,8 +190,9 @@ impl JaegerRemoteSampler { C: HttpClient + 'static, { // todo: review if we need 'static here - let interval = runtime.interval(update_timeout); - runtime.spawn(Box::pin(async move { + let interval = to_interval_stream(runtime.clone(), update_timeout); + + runtime.spawn(async move { // either update or shutdown let mut update = Box::pin(stream::select( shutdown.map(|_| false), @@ -217,7 +218,7 @@ impl JaegerRemoteSampler { break; } } - })); + }); } async fn request_new_strategy( diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 63e8d0498b..62b5de3fff 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -1,6 +1,6 @@ use crate::error::{OTelSdkError, OTelSdkResult}; use crate::resource::Resource; -use crate::runtime::{RuntimeChannel, TrySend}; +use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; use crate::trace::BatchConfig; use crate::trace::Span; use crate::trace::SpanProcessor; @@ -309,6 +309,7 @@ impl BatchSpanProcessorInternal { let export = self.exporter.export(self.spans.split_off(0)); let timeout = self.runtime.delay(self.config.max_export_timeout); let time_out = self.config.max_export_timeout; + pin_mut!(export); pin_mut!(timeout); @@ -353,11 +354,10 @@ impl BatchSpanProcessor { let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { + runtime.spawn(async move { // Timer will take a reference to the current runtime, so its important we do this within the // runtime.spawn() - let ticker = inner_runtime - .interval(config.scheduled_delay) + let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| BatchMessage::Flush(None)); let timeout_runtime = inner_runtime.clone(); @@ -372,7 +372,7 @@ impl BatchSpanProcessor { }; processor.run(messages).await - })); + }); // Return batch processor with link to worker BatchSpanProcessor {