Skip to content

Commit 8158a83

Browse files
authored
add tasks to runtime (#86)
* chore: drop dep on futures-core, ready! macro is in std * feat: event loop is now based on async-task * feat: tcp-echo-server uses tasks, test runner uses wasmtime cli, tcp-listener supports v6 The tcp-echo-server example has been rewritten to spawn the echo part of each accept into a task, which means it can accept new connections while other echos are in flight. The tcp-echo-server test has been rewritten to test that connections can be accepted while other echoes are in flight. The tcp-echo-server test has been rewritten to use wasmtime cli as a process, rather than use wasmtime as a crate. This drops wasmtime from the dev-dependencies of the workspace, which is good because it was running a quite out-of-date wasmtime. Fix up the missing conversions to/from std::net::SocketAddr in wstd::net::tcp_listener, so that we can send a Display impl of the listening address from the guest to host, and parse it out in the host (see get_listening_address) * README: no longer 100% safe rust * switch to #![deny(unsafe_code)] at lib level * refactor block on loop
1 parent c6086b6 commit 8158a83

File tree

11 files changed

+280
-217
lines changed

11 files changed

+280
-217
lines changed

Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ default = ["json"]
1717
json = ["dep:serde", "dep:serde_json"]
1818

1919
[dependencies]
20-
futures-core.workspace = true
20+
async-task.workspace = true
2121
http.workspace = true
2222
itoa.workspace = true
2323
pin-project-lite.workspace = true
@@ -63,6 +63,7 @@ authors = [
6363

6464
[workspace.dependencies]
6565
anyhow = "1"
66+
async-task = "4.7"
6667
cargo_metadata = "0.22"
6768
clap = { version = "4.5.26", features = ["derive"] }
6869
futures-core = "0.3.19"
@@ -83,9 +84,6 @@ test-programs = { path = "test-programs" }
8384
test-programs-artifacts = { path = "test-programs/artifacts" }
8485
ureq = { version = "2.12.1", default-features = false }
8586
wasi = "0.14.0"
86-
wasmtime = "26"
87-
wasmtime-wasi = "26"
88-
wasmtime-wasi-http = "26"
8987
wstd = { path = "." }
9088
wstd-macro = { path = "macro", version = "=0.5.4" }
9189

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,9 @@ $ cargo add wstd
8585
```
8686

8787
## Safety
88-
This crate uses ``#![forbid(unsafe_code)]`` to ensure everything is implemented in
89-
100% Safe Rust.
88+
This crate uses ``#![deny(unsafe_code)]``, and in the very small number of
89+
exceptional cases where ``#[allow(unsafe_code)]`` is required, documentation
90+
is provided justifying its use.
9091

9192
## Contributing
9293
Want to join us? Check out our ["Contributing" guide][contributing] and take a

examples/tcp_echo_server.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ async fn main() -> io::Result<()> {
1212
while let Some(stream) = incoming.next().await {
1313
let stream = stream?;
1414
println!("Accepted from: {}", stream.peer_addr()?);
15-
io::copy(&stream, &stream).await?;
15+
wstd::runtime::spawn(async move {
16+
// If echo copy fails, we can ignore it.
17+
let _ = io::copy(&stream, &stream).await;
18+
})
19+
.detach();
1620
}
1721
Ok(())
1822
}

src/future/delay.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use futures_core::ready;
21
use std::future::Future;
32
use std::pin::Pin;
4-
use std::task::{Context, Poll};
3+
use std::task::{ready, Context, Poll};
54

65
use pin_project_lite::pin_project;
76

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![allow(async_fn_in_trait)]
22
#![warn(future_incompatible, unreachable_pub)]
3-
#![forbid(unsafe_code)]
3+
#![deny(unsafe_code)]
44
//#![deny(missing_debug_implementations)]
55
//#![warn(missing_docs)]
66
//#![forbid(rustdoc::missing_doc_code_examples)]

src/net/tcp_listener.rs

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,8 @@ impl TcpListener {
3333
wasi::sockets::tcp_create_socket::create_tcp_socket(family).map_err(to_io_err)?;
3434
let network = wasi::sockets::instance_network::instance_network();
3535

36-
let local_address = match addr {
37-
SocketAddr::V4(addr) => {
38-
let ip = addr.ip().octets();
39-
let address = (ip[0], ip[1], ip[2], ip[3]);
40-
let port = addr.port();
41-
IpSocketAddress::Ipv4(Ipv4SocketAddress { port, address })
42-
}
43-
SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpListener`"),
44-
};
36+
let local_address = sockaddr_to_wasi(addr);
37+
4538
socket
4639
.start_bind(&network, local_address)
4740
.map_err(to_io_err)?;
@@ -56,10 +49,11 @@ impl TcpListener {
5649
}
5750

5851
/// Returns the local socket address of this listener.
59-
// TODO: make this return an actual socket addr
60-
pub fn local_addr(&self) -> io::Result<String> {
61-
let addr = self.socket.local_address().map_err(to_io_err)?;
62-
Ok(format!("{addr:?}"))
52+
pub fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
53+
self.socket
54+
.local_address()
55+
.map_err(to_io_err)
56+
.map(sockaddr_from_wasi)
6357
}
6458

6559
/// Returns an iterator over the connections being received on this listener.
@@ -105,3 +99,51 @@ pub(super) fn to_io_err(err: ErrorCode) -> io::Error {
10599
_ => ErrorKind::Other.into(),
106100
}
107101
}
102+
103+
fn sockaddr_from_wasi(addr: IpSocketAddress) -> std::net::SocketAddr {
104+
use wasi::sockets::network::Ipv6SocketAddress;
105+
match addr {
106+
IpSocketAddress::Ipv4(Ipv4SocketAddress { address, port }) => {
107+
std::net::SocketAddr::V4(std::net::SocketAddrV4::new(
108+
std::net::Ipv4Addr::new(address.0, address.1, address.2, address.3),
109+
port,
110+
))
111+
}
112+
IpSocketAddress::Ipv6(Ipv6SocketAddress {
113+
address,
114+
port,
115+
flow_info,
116+
scope_id,
117+
}) => std::net::SocketAddr::V6(std::net::SocketAddrV6::new(
118+
std::net::Ipv6Addr::new(
119+
address.0, address.1, address.2, address.3, address.4, address.5, address.6,
120+
address.7,
121+
),
122+
port,
123+
flow_info,
124+
scope_id,
125+
)),
126+
}
127+
}
128+
129+
fn sockaddr_to_wasi(addr: std::net::SocketAddr) -> IpSocketAddress {
130+
use wasi::sockets::network::Ipv6SocketAddress;
131+
match addr {
132+
std::net::SocketAddr::V4(addr) => {
133+
let ip = addr.ip().octets();
134+
IpSocketAddress::Ipv4(Ipv4SocketAddress {
135+
address: (ip[0], ip[1], ip[2], ip[3]),
136+
port: addr.port(),
137+
})
138+
}
139+
std::net::SocketAddr::V6(addr) => {
140+
let ip = addr.ip().segments();
141+
IpSocketAddress::Ipv6(Ipv6SocketAddress {
142+
address: (ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]),
143+
port: addr.port(),
144+
flow_info: addr.flowinfo(),
145+
scope_id: addr.scope_id(),
146+
})
147+
}
148+
}
149+
}

src/runtime/block_on.rs

Lines changed: 35 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ use super::{Reactor, REACTOR};
22

33
use std::future::Future;
44
use std::pin::pin;
5-
use std::sync::atomic::{AtomicBool, Ordering};
6-
use std::sync::Arc;
7-
use std::task::{Context, Poll, Wake, Waker};
5+
use std::task::{Context, Poll, Waker};
86

9-
/// Start the event loop
10-
pub fn block_on<Fut>(fut: Fut) -> Fut::Output
7+
/// Start the event loop. Blocks until the future
8+
pub fn block_on<F>(fut: F) -> F::Output
119
where
12-
Fut: Future,
10+
F: Future + 'static,
11+
F::Output: 'static,
1312
{
1413
// Construct the reactor
1514
let reactor = Reactor::new();
@@ -19,67 +18,44 @@ where
1918
panic!("cannot wstd::runtime::block_on inside an existing block_on!")
2019
}
2120

22-
// Pin the future so it can be polled
23-
let mut fut = pin!(fut);
21+
// Spawn the task onto the reactor.
22+
let root_task = reactor.spawn(fut);
2423

25-
// Create a new context to be passed to the future.
26-
let root = Arc::new(RootWaker::new());
27-
let waker = Waker::from(root.clone());
28-
let mut cx = Context::from_waker(&waker);
24+
loop {
25+
match reactor.pop_ready_list() {
26+
// No more work is possible - only a pending pollable could
27+
// possibly create a runnable, and there are none.
28+
None if reactor.pending_pollables_is_empty() => break,
29+
// Block until a pending pollable puts something on the ready
30+
// list.
31+
None => reactor.block_on_pollables(),
32+
Some(runnable) => {
33+
// Run the task popped from the head of the ready list. If the
34+
// task re-inserts itself onto the runlist during execution,
35+
// last_run_awake is a hint that guarantees us the runlist is
36+
// nonempty.
37+
let last_run_awake = runnable.run();
2938

30-
// Either the future completes and we return, or some IO is happening
31-
// and we wait.
32-
let res = loop {
33-
match fut.as_mut().poll(&mut cx) {
34-
Poll::Ready(res) => break res,
35-
Poll::Pending => {
36-
// If some non-pollable based future has marked the root task
37-
// as awake, reset and poll again. otherwise, block until a
38-
// pollable wakes a future.
39-
if root.is_awake() {
39+
// If any task is ready for running, we perform a nonblocking
40+
// check of pollables, giving any tasks waiting on a pollable
41+
// a chance to wake.
42+
if last_run_awake || !reactor.ready_list_is_empty() {
4043
reactor.nonblock_check_pollables();
41-
root.reset()
42-
} else {
43-
// If there are no futures awake or waiting on a WASI
44-
// pollable, its impossible for the reactor to make
45-
// progress, and the only valid behaviors are to sleep
46-
// forever or panic. This should only be reachable if the
47-
// user's Futures are implemented incorrectly.
48-
if !reactor.nonempty_pending_pollables() {
49-
panic!("reactor has no futures which are awake, or are waiting on a WASI pollable to be ready")
50-
}
51-
reactor.block_on_pollables()
5244
}
5345
}
5446
}
55-
};
47+
}
5648
// Clear the singleton
5749
REACTOR.replace(None);
58-
res
59-
}
60-
61-
/// This waker is used in the Context of block_on. If a Future executing in
62-
/// the block_on calls context.wake(), it sets this boolean state so that
63-
/// block_on's Future is polled again immediately, rather than waiting for
64-
/// an external (WASI pollable) event before polling again.
65-
struct RootWaker {
66-
wake: AtomicBool,
67-
}
68-
impl RootWaker {
69-
fn new() -> Self {
70-
Self {
71-
wake: AtomicBool::new(false),
50+
// Get the result out of the root task
51+
let mut root_task = pin!(root_task);
52+
let mut noop_context = Context::from_waker(Waker::noop());
53+
match root_task.as_mut().poll(&mut noop_context) {
54+
Poll::Ready(res) => res,
55+
Poll::Pending => {
56+
unreachable!(
57+
"ready list empty, therefore root task should be ready. malformed root task?"
58+
)
7259
}
7360
}
74-
fn is_awake(&self) -> bool {
75-
self.wake.load(Ordering::Relaxed)
76-
}
77-
fn reset(&self) {
78-
self.wake.store(false, Ordering::Relaxed);
79-
}
80-
}
81-
impl Wake for RootWaker {
82-
fn wake(self: Arc<Self>) {
83-
self.wake.store(true, Ordering::Relaxed);
84-
}
8561
}

src/runtime/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
mod block_on;
1414
mod reactor;
1515

16+
pub use ::async_task::Task;
1617
pub use block_on::block_on;
1718
pub use reactor::{AsyncPollable, Reactor, WaitFor};
1819
use std::cell::RefCell;
@@ -22,3 +23,14 @@ use std::cell::RefCell;
2223
std::thread_local! {
2324
pub(crate) static REACTOR: RefCell<Option<Reactor>> = const { RefCell::new(None) };
2425
}
26+
27+
/// Spawn a `Future` as a `Task` on the current `Reactor`.
28+
///
29+
/// Panics if called from outside `block_on`.
30+
pub fn spawn<F, T>(fut: F) -> Task<T>
31+
where
32+
F: std::future::Future<Output = T> + 'static,
33+
T: 'static,
34+
{
35+
Reactor::current().spawn(fut)
36+
}

0 commit comments

Comments
 (0)