Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

## vNext

- *Breaking* Make `force_flush()` in `PushMetricExporter` synchronous
- *Breaking*: Make `force_flush()` in `PushMetricExporter` synchronous
- *Breaking*: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641)
for the changes.

## 0.28.0

Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
};

use super::{BatchConfig, LogProcessor};
use crate::runtime::{RuntimeChannel, TrySend};
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
use futures_channel::oneshot;
use futures_util::{
future::{self, Either},
Expand Down Expand Up @@ -126,13 +126,13 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
let inner_runtime = runtime.clone();

// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
runtime.spawn(async move {
// Timer will take a reference to the current runtime, so its important we do this within the
// runtime.spawn()
let ticker = inner_runtime
.interval(config.scheduled_delay)
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| BatchMessage::Flush(None));

let timeout_runtime = inner_runtime.clone();
let mut logs = Vec::new();
let mut messages = Box::pin(stream::select(message_receiver, ticker));
Expand Down Expand Up @@ -204,7 +204,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
}
}
}));
});
// Return batch processor with link to worker
BatchLogProcessor {
message_sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures_util::{
};
use opentelemetry::{otel_debug, otel_error};

use crate::runtime::Runtime;
use crate::runtime::{to_interval_stream, Runtime};
use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Expand Down Expand Up @@ -109,9 +109,8 @@ where
let worker = move |reader: &PeriodicReader<E>| {
let runtime = self.runtime.clone();
let reader = reader.clone();
self.runtime.spawn(Box::pin(async move {
let ticker = runtime
.interval(self.interval)
self.runtime.spawn(async move {
let ticker = to_interval_stream(runtime.clone(), self.interval)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| Message::Export);
let messages = Box::pin(stream::select(message_receiver, ticker));
Expand All @@ -126,7 +125,7 @@ where
}
.run(messages)
.await
}));
});
};

otel_debug!(
Expand Down
101 changes: 52 additions & 49 deletions opentelemetry-sdk/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,57 @@
//! [Tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std

use futures_util::{future::BoxFuture, stream::Stream};
use futures_util::stream::{unfold, Stream};
use std::{fmt::Debug, future::Future, time::Duration};
use thiserror::Error;

/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
/// OpenTelemetry to work with any current and hopefully future runtime implementation.
/// OpenTelemetry to work with any current and hopefully future runtime implementations.
///
/// [Tokio]: https://crates.io/crates/tokio
/// [async-std]: https://crates.io/crates/async-std
///
/// # Note
///
/// OpenTelemetry expects a *multithreaded* runtime because its types can move across threads.
/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes
/// can implement this trait in a way that spawns the tasks on the same thread as the calling code.
#[cfg(feature = "experimental_async_runtime")]
pub trait Runtime: Clone + Send + Sync + 'static {
/// A future stream, which returns items in a previously specified interval. The item type is
/// not important.
type Interval: Stream + Send;

/// A future, which resolves after a previously specified amount of time. The output type is
/// not important.
type Delay: Future + Send + Unpin;

/// Create a [futures_util::stream::Stream], which returns a new item every
/// [std::time::Duration].
fn interval(&self, duration: Duration) -> Self::Interval;

/// Spawn a new task or thread, which executes the given future.
///
/// # Note
///
/// This is mainly used to run batch span processing in the background. Note, that the function
/// does not return a handle. OpenTelemetry will use a different way to wait for the future to
/// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
/// finish when the caller shuts down.
///
/// At the moment, the shutdown happens by blocking the
/// current thread. This means runtime implementations need to make sure they can still execute
/// the given future even if the main thread is blocked.
fn spawn(&self, future: BoxFuture<'static, ()>);
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;

/// Return a future that resolves after the specified [Duration].
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
}

/// Return a new future, which resolves after the specified [std::time::Duration].
fn delay(&self, duration: Duration) -> Self::Delay;
/// Uses the given runtime to produce an interval stream.
#[cfg(feature = "experimental_async_runtime")]
#[allow(dead_code)]
pub(crate) fn to_interval_stream<T: Runtime>(
runtime: T,
interval: Duration,
) -> impl Stream<Item = ()> {
unfold((), move |_| {
let runtime_cloned = runtime.clone();

async move {
runtime_cloned.delay(interval).await;
Some(((), ()))
}
})
}

/// Runtime implementation, which works with Tokio's multi thread runtime.
Expand All @@ -59,21 +74,17 @@ pub struct Tokio;
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl Runtime for Tokio {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
#[allow(clippy::let_underscore_future)]
// we don't have to await on the returned future to execute
let _ = tokio::spawn(future);
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
tokio::time::sleep(duration)
}
}

Expand Down Expand Up @@ -104,14 +115,10 @@ pub struct TokioCurrentThread;
)))
)]
impl Runtime for TokioCurrentThread {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
// We cannot force push tracing in current thread tokio scheduler because we rely on
// BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
// shutdown function so that the runtime will not finish the blocked task and kill any
Expand All @@ -127,8 +134,8 @@ impl Runtime for TokioCurrentThread {
});
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
tokio::time::sleep(duration)
}
}

Expand All @@ -147,20 +154,16 @@ pub struct AsyncStd;
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl Runtime for AsyncStd {
type Interval = async_std::stream::Interval;
type Delay = BoxFuture<'static, ()>;

fn interval(&self, duration: Duration) -> Self::Interval {
async_std::stream::interval(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
#[allow(clippy::let_underscore_future)]
let _ = async_std::task::spawn(future);
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(async_std::task::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
async_std::task::sleep(duration)
}
}

Expand Down Expand Up @@ -193,7 +196,7 @@ pub enum TrySendError {
/// Send failed due to the channel being closed.
#[error("cannot send message to batch processor as the channel is closed")]
ChannelClosed,
/// Any other send error that isnt covered above.
/// Any other send error that isn't covered above.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::RuntimeChannel;
use crate::runtime::{to_interval_stream, RuntimeChannel};
use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse;
use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner;
use crate::trace::{Sampler, ShouldSample};
Expand Down Expand Up @@ -189,8 +189,9 @@
C: HttpClient + 'static,
{
// todo: review if we need 'static here
let interval = runtime.interval(update_timeout);
runtime.spawn(Box::pin(async move {
let interval = to_interval_stream(runtime.clone(), update_timeout);

runtime.spawn(async move {

Check warning on line 194 in opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs#L192-L194

Added lines #L192 - L194 were not covered by tests
// either update or shutdown
let mut update = Box::pin(stream::select(
shutdown.map(|_| false),
Expand All @@ -216,7 +217,7 @@
break;
}
}
}));
});

Check warning on line 220 in opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs#L220

Added line #L220 was not covered by tests
}

async fn request_new_strategy<C>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::{OTelSdkError, OTelSdkResult};
use crate::resource::Resource;
use crate::runtime::{RuntimeChannel, TrySend};
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
use crate::trace::BatchConfig;
use crate::trace::Span;
use crate::trace::SpanProcessor;
Expand Down Expand Up @@ -308,13 +308,13 @@ impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
}

let export = self.exporter.export(self.spans.split_off(0));
let timeout = self.runtime.delay(self.config.max_export_timeout);
let time_out = self.config.max_export_timeout;
let timeout_future = Box::pin(self.runtime.delay(self.config.max_export_timeout));
let timeout = self.config.max_export_timeout;

Box::pin(async move {
match future::select(export, timeout).await {
match future::select(export, timeout_future).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
Either::Right((_, _)) => Err(OTelSdkError::Timeout(timeout)),
}
})
}
Expand Down Expand Up @@ -351,11 +351,10 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {

let inner_runtime = runtime.clone();
// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
runtime.spawn(async move {
// Timer will take a reference to the current runtime, so its important we do this within the
// runtime.spawn()
let ticker = inner_runtime
.interval(config.scheduled_delay)
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| BatchMessage::Flush(None));
let timeout_runtime = inner_runtime.clone();
Expand All @@ -370,7 +369,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
};

processor.run(messages).await
}));
});

// Return batch processor with link to worker
BatchSpanProcessor {
Expand Down