Skip to content

Commit 59d632f

Browse files
authored
feat(wasm): Improve wasm join handle to implement more tokio methods (#5088)
This change adds support for the abort/abort_handle/is_finished methods onto the JoinHandle shim for Wasm targets. Signed-off-by: Daniel Salinas
1 parent c55652d commit 59d632f

File tree

2 files changed

+118
-71
lines changed

2 files changed

+118
-71
lines changed

crates/matrix-sdk-common/Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,17 @@ ruma.workspace = true
3434
serde.workspace = true
3535
serde_json.workspace = true
3636
thiserror.workspace = true
37-
tokio = { workspace = true, features = ["rt", "time"] }
3837
tracing.workspace = true
3938
uniffi = { workspace = true, optional = true }
4039

41-
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
40+
[target.'cfg(not(target_family = "wasm"))'.dependencies]
4241
# Enable the test macro.
43-
tokio = { workspace = true, features = ["rt", "macros"] }
42+
tokio = { workspace = true, features = ["rt", "time", "macros"] }
4443

45-
[target.'cfg(target_arch = "wasm32")'.dependencies]
44+
[target.'cfg(target_family = "wasm")'.dependencies]
4645
futures-util = { workspace = true, features = ["channel"] }
4746
gloo-timers = { workspace = true, features = ["futures"] }
47+
tokio = { workspace = true, features = ["macros"] }
4848
tracing-subscriber = { workspace = true, features = ["fmt", "ansi"] }
4949
wasm-bindgen.workspace = true
5050
wasm-bindgen-futures = { version = "0.4.33", optional = true }
@@ -57,7 +57,7 @@ matrix-sdk-test-macros = { path = "../../testing/matrix-sdk-test-macros" }
5757
proptest.workspace = true
5858
wasm-bindgen-test.workspace = true
5959

60-
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
60+
[target.'cfg(target_family = "wasm")'.dev-dependencies]
6161
# Enable the JS feature for getrandom.
6262
getrandom = { workspace = true, default-features = false, features = ["js"] }
6363
js-sys.workspace = true

crates/matrix-sdk-common/src/executor.rs

Lines changed: 113 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -12,88 +12,135 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
//! Abstraction over an executor so we can spawn tasks under WASM the same way
15+
//! Abstraction over an executor so we can spawn tasks under Wasm the same way
1616
//! we do usually.
1717
18-
#[cfg(target_arch = "wasm32")]
19-
use std::{
20-
future::Future,
21-
pin::Pin,
22-
task::{Context, Poll},
23-
};
24-
25-
#[cfg(target_arch = "wasm32")]
26-
pub use futures_util::future::Aborted as JoinError;
27-
#[cfg(target_arch = "wasm32")]
28-
use futures_util::{
29-
future::{AbortHandle, Abortable, RemoteHandle},
30-
FutureExt,
31-
};
32-
#[cfg(not(target_arch = "wasm32"))]
33-
pub use tokio::task::{spawn, JoinError, JoinHandle};
34-
35-
#[cfg(target_arch = "wasm32")]
36-
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
37-
where
38-
F: Future<Output = T> + 'static,
39-
{
40-
let (future, remote_handle) = future.remote_handle();
41-
let (abort_handle, abort_registration) = AbortHandle::new_pair();
42-
let future = Abortable::new(future, abort_registration);
43-
44-
wasm_bindgen_futures::spawn_local(async {
45-
// Poll the future, and ignore the result (either it's `Ok(())`, or it's
46-
// `Err(Aborted)`).
47-
let _ = future.await;
48-
});
49-
50-
JoinHandle { remote_handle: Some(remote_handle), abort_handle }
51-
}
18+
//! On non Wasm platforms, this re-exports parts of tokio directly. For Wasm,
19+
//! we provide a single-threaded solution that matches the interface that tokio
20+
//! provides as a drop in replacement.
5221
53-
#[cfg(target_arch = "wasm32")]
54-
#[derive(Debug)]
55-
pub struct JoinHandle<T> {
56-
remote_handle: Option<RemoteHandle<T>>,
57-
abort_handle: AbortHandle,
22+
#[cfg(not(target_family = "wasm"))]
23+
mod sys {
24+
pub use tokio::{
25+
runtime::{Handle, Runtime},
26+
task::{spawn, AbortHandle, JoinError, JoinHandle},
27+
};
5828
}
5929

60-
#[cfg(target_arch = "wasm32")]
61-
impl<T> JoinHandle<T> {
62-
pub fn abort(&self) {
63-
self.abort_handle.abort();
30+
#[cfg(target_family = "wasm")]
31+
mod sys {
32+
use std::{
33+
future::Future,
34+
pin::Pin,
35+
task::{Context, Poll},
36+
};
37+
38+
pub use futures_util::future::AbortHandle;
39+
use futures_util::{
40+
future::{Abortable, RemoteHandle},
41+
FutureExt,
42+
};
43+
44+
/// A Wasm specific version of `tokio::task::JoinError` designed to work
45+
/// in the single-threaded environment available in Wasm environments.
46+
#[derive(Debug)]
47+
pub enum JoinError {
48+
Cancelled,
49+
Panic,
6450
}
6551

66-
pub fn is_finished(&self) -> bool {
67-
self.abort_handle.is_aborted()
52+
impl JoinError {
53+
/// Returns true if the error was caused by the task being cancelled.
54+
///
55+
/// See [the module level docs] for more information on cancellation.
56+
///
57+
/// [the module level docs]: crate::task#cancellation
58+
pub fn is_cancelled(&self) -> bool {
59+
matches!(self, JoinError::Cancelled)
60+
}
6861
}
69-
}
7062

71-
#[cfg(target_arch = "wasm32")]
72-
impl<T> Drop for JoinHandle<T> {
73-
fn drop(&mut self) {
74-
// don't abort the spawned future
75-
if let Some(h) = self.remote_handle.take() {
76-
h.forget();
63+
impl std::fmt::Display for JoinError {
64+
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65+
match &self {
66+
JoinError::Cancelled => write!(fmt, "task was cancelled"),
67+
JoinError::Panic => write!(fmt, "task panicked"),
68+
}
7769
}
7870
}
79-
}
8071

81-
#[cfg(target_arch = "wasm32")]
82-
impl<T: 'static> Future for JoinHandle<T> {
83-
type Output = Result<T, JoinError>;
84-
85-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86-
if self.abort_handle.is_aborted() {
87-
// The future has been aborted. It is not possible to poll it again.
88-
Poll::Ready(Err(JoinError))
89-
} else if let Some(handle) = self.remote_handle.as_mut() {
90-
Pin::new(handle).poll(cx).map(Ok)
91-
} else {
92-
Poll::Ready(Err(JoinError))
72+
/// A Wasm specific version of `tokio::task::JoinHandle` that
73+
/// holds handles to locally executing futures.
74+
#[derive(Debug)]
75+
pub struct JoinHandle<T> {
76+
remote_handle: Option<RemoteHandle<T>>,
77+
abort_handle: AbortHandle,
78+
}
79+
80+
impl<T> JoinHandle<T> {
81+
/// Aborts the spawned future, preventing it from being polled again.
82+
pub fn abort(&self) {
83+
self.abort_handle.abort();
84+
}
85+
86+
/// Returns the handle to the `AbortHandle` that can be used to
87+
/// abort the spawned future.
88+
pub fn abort_handle(&self) -> AbortHandle {
89+
self.abort_handle.clone()
90+
}
91+
92+
/// Returns true if the spawned future has been aborted.
93+
pub fn is_finished(&self) -> bool {
94+
self.abort_handle.is_aborted()
9395
}
9496
}
97+
98+
impl<T> Drop for JoinHandle<T> {
99+
fn drop(&mut self) {
100+
// don't abort the spawned future
101+
if let Some(h) = self.remote_handle.take() {
102+
h.forget();
103+
}
104+
}
105+
}
106+
107+
impl<T: 'static> Future for JoinHandle<T> {
108+
type Output = Result<T, JoinError>;
109+
110+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111+
if self.abort_handle.is_aborted() {
112+
// The future has been aborted. It is not possible to poll it again.
113+
Poll::Ready(Err(JoinError::Cancelled))
114+
} else if let Some(handle) = self.remote_handle.as_mut() {
115+
Pin::new(handle).poll(cx).map(Ok)
116+
} else {
117+
Poll::Ready(Err(JoinError::Panic))
118+
}
119+
}
120+
}
121+
122+
/// A Wasm specific version of `tokio::task::spawn` that utilizes
123+
/// wasm_bindgen_futures to spawn futures on the local executor.
124+
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
125+
where
126+
F: Future<Output = T> + 'static,
127+
{
128+
let (future, remote_handle) = future.remote_handle();
129+
let (abort_handle, abort_registration) = AbortHandle::new_pair();
130+
let future = Abortable::new(future, abort_registration);
131+
132+
wasm_bindgen_futures::spawn_local(async {
133+
// Poll the future, and ignore the result (either it's `Ok(())`, or it's
134+
// `Err(Aborted)`).
135+
let _ = future.await;
136+
});
137+
138+
JoinHandle { remote_handle: Some(remote_handle), abort_handle }
139+
}
95140
}
96141

142+
pub use sys::*;
143+
97144
#[cfg(test)]
98145
mod tests {
99146
use assert_matches::assert_matches;

0 commit comments

Comments
 (0)