Skip to content

Commit 1c3fc28

Browse files
committed
Setup for web timer
Signed-off-by: John Nunley <[email protected]>
1 parent e46f9ee commit 1c3fc28

File tree

8 files changed

+291
-12
lines changed

8 files changed

+291
-12
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ jobs:
3838
- name: Install Rust
3939
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
4040
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
41+
- run: rustup add target wasm32-unknown-unknown
4142
- run: cargo build --all --all-features --all-targets
4243
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
4344
if: startsWith(matrix.rust, 'nightly')
@@ -50,6 +51,9 @@ jobs:
5051
# if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
5152
# run: cargo check -Z build-std --target=riscv32imc-esp-espidf
5253
- run: cargo test
54+
- uses: taiki-e/install-action@wasm-pack
55+
- run: cargo check --target wasm32-unknown-unknown --all-features --tests
56+
- run: wasm-pack test --node
5357

5458
# Copied from: https://github.com/rust-lang/stacker/pull/19/files
5559
windows_gnu:

Cargo.toml

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ name = "timer"
2323
harness = false
2424

2525
[dependencies]
26-
async-lock = "2.6"
2726
cfg-if = "1"
27+
futures-lite = { version = "1.11.0", default-features = false }
28+
29+
[target.'cfg(not(target_family = "wasm"))'.dependencies]
30+
async-lock = "2.6"
2831
concurrent-queue = "2.2.0"
2932
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
30-
futures-lite = { version = "1.11.0", default-features = false }
3133
parking = "2.0.0"
3234
polling = "3.0.0"
3335
rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] }
@@ -36,15 +38,28 @@ socket2 = { version = "0.5.3", features = ["all"] }
3638
tracing = { version = "0.1.37", default-features = false }
3739
waker-fn = "1.1.0"
3840

41+
[target.'cfg(target_family = "wasm")'.dependencies]
42+
atomic-waker = "1.1.1"
43+
wasm-bindgen = "0.2.87"
44+
web-sys = { version = "0.3.0", features = ["Window"] }
45+
3946
[dev-dependencies]
4047
async-channel = "1"
48+
49+
[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
4150
async-net = "1"
4251
blocking = "1"
4352
criterion = { version = "0.4", default-features = false, features = ["cargo_bench_support"] }
4453
getrandom = "0.2.7"
4554
signal-hook = "0.3"
4655
tempfile = "3"
4756

57+
[target.'cfg(target_family = "wasm")'.dev-dependencies]
58+
spin_on = "0.1.1"
59+
wasm-bindgen-futures = "0.4.37"
60+
wasm-bindgen-test = "0.3.37"
61+
web-time = "0.2.0"
62+
4863
[target.'cfg(target_os = "linux")'.dev-dependencies]
4964
inotify = { version = "0.10.1", default-features = false }
5065
timerfd = "1"

src/lib.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,31 @@
6464
use std::future::Future;
6565
use std::pin::Pin;
6666
use std::task::{Context, Poll};
67-
use std::time::{Duration, Instant};
67+
use std::time::Duration;
6868

69-
use futures_lite::stream::Stream;
69+
#[cfg(not(target_family = "wasm"))]
70+
use std::time::Instant;
7071

71-
use crate::reactor::Reactor;
72+
use futures_lite::stream::Stream;
7273

74+
#[cfg(not(target_family = "wasm"))]
7375
mod driver;
76+
#[cfg(not(target_family = "wasm"))]
7477
mod io;
78+
#[cfg(not(target_family = "wasm"))]
7579
mod reactor;
7680

77-
#[path = "timer/native.rs"]
81+
#[cfg_attr(not(target_family = "wasm"), path = "timer/native.rs")]
82+
#[cfg_attr(target_family = "wasm", path = "timer/web.rs")]
7883
mod timer;
7984

8085
pub mod os;
8186

87+
#[cfg(not(target_family = "wasm"))]
8288
pub use driver::block_on;
89+
#[cfg(not(target_family = "wasm"))]
8390
pub use io::{Async, IoSafe};
91+
#[cfg(not(target_family = "wasm"))]
8492
pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
8593

8694
/// A future or stream that emits timed events.
@@ -197,6 +205,7 @@ impl Timer {
197205
/// Timer::at(when).await;
198206
/// # });
199207
/// ```
208+
#[cfg(not(target_family = "wasm"))]
200209
#[inline]
201210
pub fn at(instant: Instant) -> Timer {
202211
Timer(timer::Timer::at(instant))
@@ -236,6 +245,7 @@ impl Timer {
236245
/// Timer::interval_at(start, period).next().await;
237246
/// # });
238247
/// ```
248+
#[cfg(not(target_family = "wasm"))]
239249
#[inline]
240250
pub fn interval_at(start: Instant, period: Duration) -> Timer {
241251
Timer(timer::Timer::interval_at(start, period))
@@ -325,6 +335,7 @@ impl Timer {
325335
/// t.set_at(when);
326336
/// # });
327337
/// ```
338+
#[cfg(not(target_family = "wasm"))]
328339
#[inline]
329340
pub fn set_at(&mut self, instant: Instant) {
330341
self.0.set_at(instant)
@@ -376,15 +387,20 @@ impl Timer {
376387
/// t.set_interval_at(start, period);
377388
/// # });
378389
/// ```
390+
#[cfg(not(target_family = "wasm"))]
379391
#[inline]
380392
pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
381393
self.0.set_interval_at(start, period)
382394
}
383395
}
384396

385397
impl Future for Timer {
398+
#[cfg(not(target_family = "wasm"))]
386399
type Output = Instant;
387400

401+
#[cfg(target_family = "wasm")]
402+
type Output = ();
403+
388404
#[inline]
389405
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
390406
match self.poll_next(cx) {
@@ -396,8 +412,12 @@ impl Future for Timer {
396412
}
397413

398414
impl Stream for Timer {
415+
#[cfg(not(target_family = "wasm"))]
399416
type Item = Instant;
400417

418+
#[cfg(target_family = "wasm")]
419+
type Item = ();
420+
401421
#[inline]
402422
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
403423
self.0.poll_next(cx)

src/os/unix.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub fn reactor_fd() -> Option<BorrowedFd<'static>> {
6262
not(polling_test_poll_backend),
6363
))] {
6464
use std::os::unix::io::AsFd;
65-
Some(crate::Reactor::get().poller.as_fd())
65+
Some(crate::reactor::Reactor::get().poller.as_fd())
6666
} else {
6767
None
6868
}

src/timer/web.rs

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
//! Timers for web targets.
2+
//!
3+
//! These use the `setTimeout` function on the web to handle timing.
4+
5+
use std::convert::TryInto;
6+
use std::sync::atomic::{AtomicUsize, Ordering};
7+
use std::sync::Arc;
8+
use std::task::{Context, Poll};
9+
use std::time::Duration;
10+
11+
use atomic_waker::AtomicWaker;
12+
use wasm_bindgen::closure::Closure;
13+
use wasm_bindgen::JsCast;
14+
15+
/// A timer for non-Web platforms.
16+
///
17+
/// self registers a timeout in the global reactor, which in turn sets a timeout in the poll call.
18+
#[derive(Debug)]
19+
pub(super) struct Timer {
20+
/// The waker to wake when the timer fires.
21+
waker: Arc<State>,
22+
23+
/// The ongoing timeout or interval.
24+
ongoing_timeout: TimerId,
25+
}
26+
27+
#[derive(Debug)]
28+
struct State {
29+
/// The number of times this timer has been woken.
30+
woken: AtomicUsize,
31+
32+
/// The waker to wake when the timer fires.
33+
waker: AtomicWaker,
34+
}
35+
36+
#[derive(Debug)]
37+
enum TimerId {
38+
NoTimer,
39+
Timeout(i32),
40+
Interval(i32),
41+
}
42+
43+
impl Timer {
44+
/// Create a timer that will never fire.
45+
#[inline]
46+
pub(super) fn never() -> Self {
47+
Self {
48+
waker: Arc::new(State {
49+
woken: AtomicUsize::new(0),
50+
waker: AtomicWaker::new(),
51+
}),
52+
ongoing_timeout: TimerId::NoTimer,
53+
}
54+
}
55+
56+
/// Create a timer that will fire at the given instant.
57+
#[inline]
58+
pub(super) fn after(duration: Duration) -> Timer {
59+
let mut this = Self::never();
60+
this.set_after(duration);
61+
this
62+
}
63+
64+
/// Create a timer that will fire at the given instant.
65+
#[inline]
66+
pub(super) fn interval(period: Duration) -> Timer {
67+
let mut this = Self::never();
68+
this.set_interval(period);
69+
this
70+
}
71+
72+
/// Returns `true` if self timer will fire at some point.
73+
#[inline]
74+
pub(super) fn will_fire(&self) -> bool {
75+
matches!(
76+
self.ongoing_timeout,
77+
TimerId::Timeout(_) | TimerId::Interval(_)
78+
)
79+
}
80+
81+
/// Set the timer to fire after the given duration.
82+
#[inline]
83+
pub(super) fn set_after(&mut self, duration: Duration) {
84+
// Set the timeout.
85+
let id = {
86+
let waker = self.waker.clone();
87+
let closure: Closure<dyn FnMut()> = Closure::wrap(Box::new(move || {
88+
waker.wake();
89+
}));
90+
91+
let result = web_sys::window()
92+
.unwrap()
93+
.set_timeout_with_callback_and_timeout_and_arguments_0(
94+
closure.as_ref().unchecked_ref(),
95+
duration.as_millis().try_into().expect("timeout too long"),
96+
);
97+
98+
match result {
99+
Ok(id) => id,
100+
Err(_) => {
101+
panic!("failed to set timeout")
102+
}
103+
}
104+
};
105+
106+
// Set our ID.
107+
self.ongoing_timeout = TimerId::Timeout(id);
108+
}
109+
110+
/// Set the timer to emit events periodically.
111+
#[inline]
112+
pub(super) fn set_interval(&mut self, period: Duration) {
113+
// Set the timeout.
114+
let id = {
115+
let waker = self.waker.clone();
116+
let closure: Closure<dyn FnMut()> = Closure::wrap(Box::new(move || {
117+
waker.wake();
118+
}));
119+
120+
let result = web_sys::window()
121+
.unwrap()
122+
.set_interval_with_callback_and_timeout_and_arguments_0(
123+
closure.as_ref().unchecked_ref(),
124+
period.as_millis().try_into().expect("timeout too long"),
125+
);
126+
127+
match result {
128+
Ok(id) => id,
129+
Err(_) => {
130+
panic!("failed to set interval")
131+
}
132+
}
133+
};
134+
135+
// Set our ID.
136+
self.ongoing_timeout = TimerId::Interval(id);
137+
}
138+
139+
/// Poll for the next timer event.
140+
#[inline]
141+
pub(super) fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
142+
let mut registered = false;
143+
let mut woken = self.waker.woken.load(Ordering::Acquire);
144+
145+
loop {
146+
if woken > 0 {
147+
// Try to decrement the number of woken events.
148+
if let Err(new_woken) = self.waker.woken.compare_exchange(
149+
woken,
150+
woken - 1,
151+
Ordering::SeqCst,
152+
Ordering::Acquire,
153+
) {
154+
woken = new_woken;
155+
continue;
156+
}
157+
158+
// If we are using a one-shot timer, clear it.
159+
if let TimerId::Timeout(_) = self.ongoing_timeout {
160+
self.clear();
161+
}
162+
163+
return Poll::Ready(Some(()));
164+
}
165+
166+
if !registered {
167+
// Register the waker.
168+
self.waker.waker.register(cx.waker());
169+
registered = true;
170+
} else {
171+
// We've already registered, so we can just return pending.
172+
return Poll::Pending;
173+
}
174+
}
175+
}
176+
177+
/// Clear the current timeout.
178+
fn clear(&mut self) {
179+
match self.ongoing_timeout {
180+
TimerId::NoTimer => {}
181+
TimerId::Timeout(id) => {
182+
web_sys::window().unwrap().clear_timeout_with_handle(id);
183+
}
184+
TimerId::Interval(id) => {
185+
web_sys::window().unwrap().clear_interval_with_handle(id);
186+
}
187+
}
188+
}
189+
}
190+
191+
impl State {
192+
fn wake(&self) {
193+
self.woken.fetch_add(1, Ordering::SeqCst);
194+
self.waker.wake();
195+
}
196+
}
197+
198+
impl Drop for Timer {
199+
fn drop(&mut self) {
200+
self.clear();
201+
}
202+
}

tests/async.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![cfg(not(target_family = "wasm"))]
2+
13
use std::future::Future;
24
use std::io;
35
use std::net::{Shutdown, TcpListener, TcpStream, UdpSocket};

tests/block_on.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![cfg(not(target_family = "wasm"))]
2+
13
use async_io::block_on;
24
use std::{
35
future::Future,

0 commit comments

Comments
 (0)