Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

Commit 70b10cc

Browse files
authored
Time (#36)
* init time Signed-off-by: Yoshua Wuyts <[email protected]> * unimplemented Signed-off-by: Yoshua Wuyts <[email protected]> * impl native delay Signed-off-by: Yoshua Wuyts <[email protected]> * delay at Signed-off-by: Yoshua Wuyts <[email protected]> * better errs Signed-off-by: Yoshua Wuyts <[email protected]> * it works! Signed-off-by: Yoshua Wuyts <[email protected]> * fmt Signed-off-by: Yoshua Wuyts <[email protected]> * fix checks Signed-off-by: Yoshua Wuyts <[email protected]> * futureext Signed-off-by: Yoshua Wuyts <[email protected]> * time docs Signed-off-by: Yoshua Wuyts <[email protected]> * add debug impls for native Signed-off-by: Yoshua Wuyts <[email protected]> * update names Signed-off-by: Yoshua Wuyts <[email protected]> * add tokio impl Signed-off-by: Yoshua Wuyts <[email protected]> * update desc Signed-off-by: Yoshua Wuyts <[email protected]> * rustfmt + simpler Debugs Signed-off-by: Yoshua Wuyts <[email protected]> * initial docs Signed-off-by: Yoshua Wuyts <[email protected]> * polish interface Signed-off-by: Yoshua Wuyts <[email protected]> * add debug Signed-off-by: Yoshua Wuyts <[email protected]> * better debug Signed-off-by: Yoshua Wuyts <[email protected]> * prelude Signed-off-by: Yoshua Wuyts <[email protected]> * impl ext Signed-off-by: Yoshua Wuyts <[email protected]> * finish ext Signed-off-by: Yoshua Wuyts <[email protected]> * streamext Signed-off-by: Yoshua Wuyts <[email protected]> * cargo fmt Signed-off-by: Yoshua Wuyts <[email protected]> * AsyncReadExt Signed-off-by: Yoshua Wuyts <[email protected]> * finish tokio + better docs Signed-off-by: Yoshua Wuyts <[email protected]> * fix trait name conflicts Signed-off-by: Yoshua Wuyts <[email protected]> * fix all docs Signed-off-by: Yoshua Wuyts <[email protected]> * fix async-read example Signed-off-by: Yoshua Wuyts <[email protected]> * fix prelude Signed-off-by: Yoshua Wuyts <[email protected]> * remove pin-utils Signed-off-by: Yoshua Wuyts <[email protected]> * wrap up loose ends Signed-off-by: Yoshua Wuyts <[email protected]> * fix tests Signed-off-by: Yoshua Wuyts <[email protected]> * update linux install
1 parent a1b7576 commit 70b10cc

File tree

14 files changed

+661
-2
lines changed

14 files changed

+661
-2
lines changed

ci/install-rust.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ steps:
22
# Linux and macOS.
33
- script: |
44
set -e
5-
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUSTUP_TOOLCHAIN
5+
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain none
6+
export PATH=$PATH:$HOME/.cargo/bin
7+
rustup toolchain install $RUSTUP_TOOLCHAIN
8+
rustup default $RUSTUP_TOOLCHAIN
69
echo "##vso[task.setvariable variable=PATH;]$PATH:$HOME/.cargo/bin"
710
env:
811
RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}

runtime-native/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ async-datagram = "2.2.0"
2121
juliex = "0.3.0-alpha.6"
2222
lazy_static = "1.3.0"
2323
romio = "0.3.0-alpha.7"
24+
futures-timer = "0.2.1"
2425

2526
[target.'cfg(target_arch = "wasm32")'.dependencies]
2627
futures01 = { package = "futures", version = "0.1" }

runtime-native/src/not_wasm32.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
use futures::prelude::*;
22
use futures::{future::BoxFuture, task::SpawnError};
3+
use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval};
34
use lazy_static::lazy_static;
45

56
use std::io;
67
use std::net::SocketAddr;
78
use std::pin::Pin;
9+
use std::time::{Duration, Instant};
810

911
mod tcp;
12+
mod time;
1013
mod udp;
1114

1215
use tcp::{TcpListener, TcpStream};
16+
use time::{Delay, Interval};
1317
use udp::UdpSocket;
1418

1519
lazy_static! {
@@ -58,4 +62,19 @@ impl runtime_raw::Runtime for Native {
5862
let romio_socket = romio::UdpSocket::bind(&addr)?;
5963
Ok(Box::pin(UdpSocket { romio_socket }))
6064
}
65+
66+
fn new_delay(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
67+
let async_delay = AsyncDelay::new(dur);
68+
Box::pin(Delay { async_delay })
69+
}
70+
71+
fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
72+
let async_delay = AsyncDelay::new_at(at);
73+
Box::pin(Delay { async_delay })
74+
}
75+
76+
fn new_interval(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
77+
let async_interval = AsyncInterval::new(dur);
78+
Box::pin(Interval { async_interval })
79+
}
6180
}

runtime-native/src/not_wasm32/time.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
use std::time::Instant;
4+
5+
use futures::prelude::*;
6+
use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval};
7+
8+
#[derive(Debug)]
9+
pub(crate) struct Delay {
10+
pub(crate) async_delay: AsyncDelay,
11+
}
12+
13+
impl runtime_raw::Delay for Delay {}
14+
15+
impl Future for Delay {
16+
type Output = Instant;
17+
18+
#[inline]
19+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
20+
futures::ready!(Pin::new(&mut self.async_delay).poll(cx)).unwrap();
21+
Poll::Ready(Instant::now())
22+
}
23+
}
24+
25+
#[derive(Debug)]
26+
pub(crate) struct Interval {
27+
pub(crate) async_interval: AsyncInterval,
28+
}
29+
30+
impl runtime_raw::Interval for Interval {}
31+
32+
impl Stream for Interval {
33+
type Item = Instant;
34+
35+
#[inline]
36+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37+
futures::ready!(Pin::new(&mut self.async_interval).poll_next(cx)).unwrap();
38+
Poll::Ready(Some(Instant::now()))
39+
}
40+
}

runtime-native/src/wasm32.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use futures::{future::BoxFuture, task::SpawnError};
55
use std::io;
66
use std::net::SocketAddr;
77
use std::pin::Pin;
8+
use std::time::{Duration, Instant};
89

910
use wasm_bindgen::prelude::*;
1011
use wasm_bindgen_futures::future_to_promise;
@@ -41,4 +42,16 @@ impl runtime_raw::Runtime for Native {
4142
) -> io::Result<Pin<Box<dyn runtime_raw::UdpSocket>>> {
4243
panic!("Binding UDP sockets is currently not supported in wasm");
4344
}
45+
46+
fn new_delay(&self, _dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
47+
panic!("Timers are currently not supported in wasm");
48+
}
49+
50+
fn new_delay_at(&self, _at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
51+
panic!("Timers are currently not supported in wasm");
52+
}
53+
54+
fn new_interval(&self, _dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
55+
panic!("Timers are currently not supported in wasm");
56+
}
4457
}

runtime-raw/src/lib.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@ use std::cell::Cell;
2323
use std::io;
2424
use std::net::SocketAddr;
2525
use std::pin::Pin;
26+
use std::time::{Duration, Instant};
2627

2728
mod tcp;
29+
mod time;
2830
mod udp;
2931

3032
pub use tcp::*;
33+
pub use time::*;
3134
pub use udp::*;
3235

3336
thread_local! {
34-
static RUNTIME: Cell<Option<&'static dyn Runtime>> = Cell::new(None);
37+
static RUNTIME: Cell<Option<&'static dyn Runtime>> = Cell::new(None);
3538
}
3639

3740
/// Get the current runtime.
@@ -95,4 +98,22 @@ pub trait Runtime: Send + Sync + 'static {
9598
/// This method is defined on the `Runtime` trait because defining it on
9699
/// `UdpSocket` would prevent it from being a trait object.
97100
fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result<Pin<Box<dyn UdpSocket>>>;
101+
102+
/// Create a new Future that wakes up after the given duration
103+
///
104+
/// This method is defined on the `Runtime` trait because defining it on
105+
/// `Delay` would prevent it from being a trait object.
106+
fn new_delay(&self, dur: Duration) -> Pin<Box<dyn Delay>>;
107+
108+
/// Create a new Future that wakes up at the given time.
109+
///
110+
/// This method is defined on the `Runtime` trait because defining it on
111+
/// `Delay` would prevent it from being a trait object.
112+
fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn Delay>>;
113+
114+
/// A stream representing notifications at a fixed interval.
115+
///
116+
/// This method is defined on the `Runtime` trait because defining it on
117+
/// `Interval` would prevent it from being a trait object.
118+
fn new_interval(&self, dur: Duration) -> Pin<Box<dyn Interval>>;
98119
}

runtime-raw/src/time.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use std::fmt::Debug;
2+
use std::future::Future;
3+
use std::time::Instant;
4+
5+
use futures::Stream;
6+
7+
/// A future representing the notification that an elapsed duration has occurred.
8+
pub trait Delay: Future<Output = Instant> + Debug + Send {}
9+
10+
/// A stream representing notifications at a fixed interval.
11+
pub trait Interval: Stream<Item = Instant> + Debug + Send {}

runtime-tokio/src/lib.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@ use futures::{
1616
task::SpawnError,
1717
};
1818
use lazy_static::lazy_static;
19+
use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval};
1920

2021
use std::io;
2122
use std::net::SocketAddr;
2223
use std::pin::Pin;
2324
use std::sync::{mpsc, Mutex};
2425
use std::thread;
26+
use std::time::{Duration, Instant};
2527

2628
mod tcp;
29+
mod time;
2730
mod udp;
2831

2932
use tcp::{TcpListener, TcpStream};
33+
use time::{Delay, Interval};
3034
use udp::UdpSocket;
3135

3236
/// The default Tokio runtime.
@@ -78,6 +82,21 @@ impl runtime_raw::Runtime for Tokio {
7882
let tokio_socket = tokio::net::UdpSocket::bind(&addr)?;
7983
Ok(Box::pin(UdpSocket { tokio_socket }))
8084
}
85+
86+
fn new_delay(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
87+
let tokio_delay = TokioDelay::new(Instant::now() + dur);
88+
Box::pin(Delay { tokio_delay })
89+
}
90+
91+
fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
92+
let tokio_delay = TokioDelay::new(at);
93+
Box::pin(Delay { tokio_delay })
94+
}
95+
96+
fn new_interval(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
97+
let tokio_interval = TokioInterval::new(Instant::now(), dur);
98+
Box::pin(Interval { tokio_interval })
99+
}
81100
}
82101

83102
/// The single-threaded Tokio runtime based on `tokio-current-thread`.
@@ -143,4 +162,19 @@ impl runtime_raw::Runtime for TokioCurrentThread {
143162
let tokio_socket = tokio::net::UdpSocket::bind(&addr)?;
144163
Ok(Box::pin(UdpSocket { tokio_socket }))
145164
}
165+
166+
fn new_delay(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
167+
let tokio_delay = TokioDelay::new(Instant::now() + dur);
168+
Box::pin(Delay { tokio_delay })
169+
}
170+
171+
fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
172+
let tokio_delay = TokioDelay::new(at);
173+
Box::pin(Delay { tokio_delay })
174+
}
175+
176+
fn new_interval(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
177+
let tokio_interval = TokioInterval::new(Instant::now(), dur);
178+
Box::pin(Interval { tokio_interval })
179+
}
146180
}

runtime-tokio/src/time.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
use std::time::Instant;
4+
5+
use futures::compat::Compat01As03;
6+
use futures::prelude::*;
7+
use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval};
8+
9+
#[derive(Debug)]
10+
pub(crate) struct Delay {
11+
pub(crate) tokio_delay: TokioDelay,
12+
}
13+
14+
impl runtime_raw::Delay for Delay {}
15+
16+
impl Future for Delay {
17+
type Output = Instant;
18+
19+
#[inline]
20+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
21+
let mut fut = Compat01As03::new(&mut self.tokio_delay);
22+
futures::ready!(Pin::new(&mut fut).poll(cx)).unwrap();
23+
Poll::Ready(Instant::now())
24+
}
25+
}
26+
27+
#[derive(Debug)]
28+
pub(crate) struct Interval {
29+
pub(crate) tokio_interval: TokioInterval,
30+
}
31+
32+
impl runtime_raw::Interval for Interval {}
33+
34+
impl Stream for Interval {
35+
type Item = Instant;
36+
37+
#[inline]
38+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39+
let mut stream = Compat01As03::new(&mut self.tokio_interval);
40+
// https://docs.rs/tokio/0.1.20/tokio/timer/struct.Error.html
41+
futures::ready!(Pin::new(&mut stream).poll_next(cx))
42+
.unwrap()
43+
.unwrap();
44+
Poll::Ready(Some(Instant::now()))
45+
}
46+
}

src/lib.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,29 @@
9696

9797
pub mod net;
9898
pub mod task;
99+
pub mod time;
100+
101+
/// The Runtime Prelude.
102+
///
103+
/// Rust comes with a variety of things in its standard library. However, Runtime and Futures
104+
/// provide new functionality outside of it. We want to make Runtime feel as close to standard Rust
105+
/// as possible. We care deeply about usability.
106+
///
107+
/// The _prelude_ is the list of things we recommend importing into Runtime programs. It's kept as
108+
/// small as possible, and is focused on things, particularly traits.
109+
///
110+
/// To use the `prelude` do:
111+
/// ```
112+
/// use runtime::prelude::*;
113+
/// ```
114+
pub mod prelude {
115+
#[doc(inline)]
116+
pub use super::time::AsyncReadExt as _;
117+
#[doc(inline)]
118+
pub use super::time::FutureExt as _;
119+
#[doc(inline)]
120+
pub use super::time::StreamExt as _;
121+
}
99122

100123
#[doc(inline)]
101124
pub use task::spawn;

0 commit comments

Comments
 (0)