Skip to content

Commit 344a954

Browse files
committed
Add rt-tokio-with-wasm feature
1 parent 60f6709 commit 344a954

File tree

3 files changed

+79
-5
lines changed

3 files changed

+79
-5
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ criterion = "0.5"
2525
futures-core = "0.3"
2626
futures-executor = "0.3"
2727
futures-util = { version = "0.3", default-features = false }
28+
futures-timer = "3"
2829
http = { version = "1.1", default-features = false, features = ["std"] }
2930
http-body-util = "0.1"
3031
hyper = { version = "1.3", default-features = false }
@@ -46,6 +47,7 @@ tonic = { version = "0.14.1", default-features = false }
4647
tonic-prost-build = "0.14.1"
4748
tonic-prost = "0.14.1"
4849
tokio = { version = "1", default-features = false }
50+
tokio_with_wasm = { version = "0.8.7", default-features = false }
4951
tokio-stream = "0.1"
5052
# Using `tracing 0.1.40` because 0.1.39 (which is yanked) introduces the ability to set event names in macros,
5153
# required for OpenTelemetry's internal logging macros.

opentelemetry-sdk/Cargo.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,22 @@ rust-version = "1.75.0"
1111
autobenches = false
1212

1313
[dependencies]
14-
opentelemetry = { workspace = true }
15-
opentelemetry-http = { workspace = true, optional = true }
1614
futures-channel = { workspace = true }
1715
futures-executor = { workspace = true }
16+
futures-timer = { workspace = true, optional = true }
1817
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
18+
http = { workspace = true, optional = true }
19+
opentelemetry = { workspace = true }
20+
opentelemetry-http = { workspace = true, optional = true }
1921
percent-encoding = { workspace = true, optional = true }
2022
rand = { workspace = true, features = ["std", "std_rng", "small_rng", "os_rng", "thread_rng"], optional = true }
2123
serde = { workspace = true, features = ["derive", "rc"], optional = true }
2224
serde_json = { workspace = true, optional = true }
2325
thiserror = { workspace = true }
24-
url = { workspace = true, optional = true }
2526
tokio = { workspace = true, default-features = false, optional = true }
2627
tokio-stream = { workspace = true, optional = true }
27-
http = { workspace = true, optional = true }
28+
tokio_with_wasm = { workspace = true, default-features = false, optional = true }
29+
url = { workspace = true, optional = true }
2830

2931
[target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dependencies]
3032
getrandom = { workspace = true, features = ["wasm_js"] }
@@ -52,6 +54,7 @@ testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-
5254
experimental_async_runtime = []
5355
rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
5456
rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
57+
rt-tokio-with-wasm = ["tokio_with_wasm/rt", "tokio_with_wasm/sync", "tokio_with_wasm/time", "tokio-stream", "experimental_async_runtime", "futures-timer"]
5558
internal-logs = ["opentelemetry/internal-logs"]
5659
experimental_metrics_periodicreader_with_async_runtime = ["metrics", "experimental_async_runtime"]
5760
spec_unstable_metrics_views = ["metrics"]

opentelemetry-sdk/src/runtime.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,35 @@ impl Runtime for TokioCurrentThread {
137137
}
138138
}
139139

140+
/// Runtime implementation, which works with Tokio's multi thread runtime.
141+
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio-with-wasm"))]
142+
#[cfg_attr(
143+
docsrs,
144+
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio-with-wasm")))
145+
)]
146+
#[derive(Debug, Clone)]
147+
pub struct TokioWithWasm;
148+
149+
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio-with-wasm"))]
150+
#[cfg_attr(
151+
docsrs,
152+
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio-with-wasm")))
153+
)]
154+
impl Runtime for TokioWithWasm {
155+
fn spawn<F>(&self, future: F)
156+
where
157+
F: Future<Output = ()> + Send + 'static,
158+
{
159+
#[allow(clippy::let_underscore_future)]
160+
// we don't have to await on the returned future to execute
161+
let _ = tokio_with_wasm::alias::spawn(future);
162+
}
163+
164+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
165+
futures_timer::Delay::new(duration)
166+
}
167+
}
168+
140169
/// `RuntimeChannel` is an extension to [`Runtime`]. Currently, it provides a
141170
/// channel that is used by the [log] and [span] batch processors.
142171
///
@@ -185,7 +214,7 @@ pub trait TrySend: Sync + Send {
185214

186215
#[cfg(all(
187216
feature = "experimental_async_runtime",
188-
any(feature = "rt-tokio", feature = "rt-tokio-current-thread")
217+
any(feature = "rt-tokio", feature = "rt-tokio-current-thread",)
189218
))]
190219
impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
191220
type Message = T;
@@ -198,6 +227,25 @@ impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
198227
}
199228
}
200229

230+
#[cfg(all(
231+
feature = "experimental_async_runtime",
232+
any(feature = "rt-tokio-with-wasm")
233+
))]
234+
impl<T: Send> TrySend for tokio_with_wasm::alias::sync::mpsc::Sender<T> {
235+
type Message = T;
236+
237+
fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
238+
self.try_send(item).map_err(|err| match err {
239+
tokio_with_wasm::alias::sync::mpsc::error::TrySendError::Full(_) => {
240+
TrySendError::ChannelFull
241+
}
242+
tokio_with_wasm::alias::sync::mpsc::error::TrySendError::Closed(_) => {
243+
TrySendError::ChannelClosed
244+
}
245+
})
246+
}
247+
}
248+
201249
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
202250
#[cfg_attr(
203251
docsrs,
@@ -245,3 +293,24 @@ impl RuntimeChannel for TokioCurrentThread {
245293
)
246294
}
247295
}
296+
297+
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio-with-wasm"))]
298+
#[cfg_attr(
299+
docsrs,
300+
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio-with-wasm")))
301+
)]
302+
impl RuntimeChannel for TokioWithWasm {
303+
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
304+
type Sender<T: Debug + Send> = tokio_with_wasm::alias::sync::mpsc::Sender<T>;
305+
306+
fn batch_message_channel<T: Debug + Send>(
307+
&self,
308+
capacity: usize,
309+
) -> (Self::Sender<T>, Self::Receiver<T>) {
310+
let (sender, receiver) = tokio_with_wasm::alias::sync::mpsc::channel(capacity);
311+
(
312+
sender,
313+
tokio_stream::wrappers::ReceiverStream::new(receiver),
314+
)
315+
}
316+
}

0 commit comments

Comments
 (0)