Skip to content

Commit fabf596

Browse files
authored
Merge pull request #8 from yoshuawuyts/pch/global_reactor
thread Reactor through with a global
2 parents e19bf33 + f25af48 commit fabf596

File tree

10 files changed

+93
-80
lines changed

10 files changed

+93
-80
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ use wstd::net::TcpListener;
5959
use wstd::runtime::block_on;
6060

6161
fn main() -> io::Result<()> {
62-
block_on(|reactor| async move {
63-
let listener = TcpListener::bind(&reactor, "127.0.0.1:8080").await?;
62+
block_on(async move {
63+
let listener = TcpListener::bind("127.0.0.1:8080").await?;
6464
println!("Listening on {}", listener.local_addr()?);
6565
println!("type `nc localhost 8080` to create a TCP client");
6666

src/http/client.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@ use super::{response::IncomingBody, Body, Request, Response, Result};
66
use crate::runtime::Reactor;
77

88
/// An HTTP client.
9+
// Empty for now, but permits adding support for RequestOptions soon:
910
#[derive(Debug)]
10-
pub struct Client<'a> {
11-
reactor: &'a Reactor,
12-
}
11+
pub struct Client {}
1312

14-
impl<'a> Client<'a> {
13+
impl Client {
1514
/// Create a new instance of `Client`
16-
pub fn new(reactor: &'a Reactor) -> Self {
17-
Self { reactor }
15+
pub fn new() -> Self {
16+
Self {}
1817
}
1918

2019
/// Send an HTTP request.
@@ -27,7 +26,7 @@ impl<'a> Client<'a> {
2726
let res = wasi::http::outgoing_handler::handle(wasi_req, None).unwrap();
2827

2928
// 2. Start sending the request body
30-
io::copy(body, OutputStream::new(&self.reactor, body_stream))
29+
io::copy(body, OutputStream::new(body_stream))
3130
.await
3231
.expect("io::copy broke oh no");
3332

@@ -36,42 +35,38 @@ impl<'a> Client<'a> {
3635
OutgoingBody::finish(wasi_body, trailers).unwrap();
3736

3837
// 4. Receive the response
39-
self.reactor.wait_for(res.subscribe()).await;
38+
Reactor::current().wait_for(res.subscribe()).await;
4039
// NOTE: the first `unwrap` is to ensure readiness, the second `unwrap`
4140
// is to trap if we try and get the response more than once. The final
4241
// `?` is to raise the actual error if there is one.
4342
let res = res.get().unwrap().unwrap()?;
44-
Ok(Response::try_from_incoming_response(
45-
res,
46-
self.reactor.clone(),
47-
)?)
43+
Ok(Response::try_from_incoming_response(res)?)
4844
}
4945
}
5046

51-
struct OutputStream<'a> {
52-
reactor: &'a Reactor,
47+
struct OutputStream {
5348
stream: wasi::http::types::OutputStream,
5449
}
5550

56-
impl<'a> OutputStream<'a> {
57-
fn new(reactor: &'a Reactor, stream: wasi::http::types::OutputStream) -> Self {
58-
Self { reactor, stream }
51+
impl OutputStream {
52+
fn new(stream: wasi::http::types::OutputStream) -> Self {
53+
Self { stream }
5954
}
6055
}
6156

62-
impl<'a> AsyncWrite for OutputStream<'a> {
57+
impl AsyncWrite for OutputStream {
6358
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
6459
let max = self.stream.check_write().unwrap() as usize;
6560
let max = max.min(buf.len());
6661
let buf = &buf[0..max];
6762
self.stream.write(buf).unwrap();
68-
self.reactor.wait_for(self.stream.subscribe()).await;
63+
Reactor::current().wait_for(self.stream.subscribe()).await;
6964
Ok(max)
7065
}
7166

7267
async fn flush(&mut self) -> io::Result<()> {
7368
self.stream.flush().unwrap();
74-
self.reactor.wait_for(self.stream.subscribe()).await;
69+
Reactor::current().wait_for(self.stream.subscribe()).await;
7570
Ok(())
7671
}
7772
}

src/http/response.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ pub struct Response<B: Body> {
4545
// }
4646

4747
impl Response<IncomingBody> {
48-
pub(crate) fn try_from_incoming_response(
49-
incoming: IncomingResponse,
50-
reactor: Reactor,
51-
) -> super::Result<Self> {
48+
pub(crate) fn try_from_incoming_response(incoming: IncomingResponse) -> super::Result<Self> {
5249
let headers: Headers = incoming.headers().into();
5350
let status = incoming.status().into();
5451

@@ -64,7 +61,6 @@ impl Response<IncomingBody> {
6461
let body = IncomingBody {
6562
buf_offset: 0,
6663
buf: None,
67-
reactor,
6864
body_stream,
6965
_incoming_body: incoming_body,
7066
};
@@ -101,7 +97,6 @@ impl<B: Body> Response<B> {
10197
/// An incoming HTTP body
10298
#[derive(Debug)]
10399
pub struct IncomingBody {
104-
reactor: Reactor,
105100
buf: Option<Vec<u8>>,
106101
// How many bytes have we already read from the buf?
107102
buf_offset: usize,
@@ -119,7 +114,7 @@ impl AsyncRead for IncomingBody {
119114
None => {
120115
// Wait for an event to be ready
121116
let pollable = self.body_stream.subscribe();
122-
self.reactor.wait_for(pollable).await;
117+
Reactor::current().wait_for(pollable).await;
123118

124119
// Read the bytes from the body stream
125120
let buf = self.body_stream.read(CHUNK_SIZE).map_err(|err| match err {

src/lib.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
//! use wstd::runtime::block_on;
2020
//!
2121
//! fn main() -> io::Result<()> {
22-
//! block_on(|reactor| async move {
23-
//! let listener = TcpListener::bind(&reactor, "127.0.0.1:8080").await?;
22+
//! block_on(async move {
23+
//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
2424
//! println!("Listening on {}", listener.local_addr()?);
2525
//! println!("type `nc localhost 8080` to create a TCP client");
2626
//!
@@ -56,12 +56,6 @@
5656
//! bytes. And `wstd::runtime` provides access to async runtime primitives.
5757
//! These are unique capabilities provided by WASI 0.2, and because this library
5858
//! is specific to that are exposed from here.
59-
//!
60-
//! Finally, this library does not implicitly thread through a
61-
//! [`Reactor`][runtime::Reactor] handle. Rather than using a `thread_local!`
62-
//! async resource APIs in `wstd` will borrow an instance of `Reactor`. This is
63-
//! a little more verbose, but in turn is a little simpler to implement,
64-
//! maintain, and extend.
6559
6660
pub mod http;
6761
pub mod io;

src/net/tcp_listener.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@ use super::TcpStream;
1111

1212
/// A TCP socket server, listening for connections.
1313
#[derive(Debug)]
14-
pub struct TcpListener<'a> {
14+
pub struct TcpListener {
1515
socket: TcpSocket,
16-
reactor: &'a Reactor,
1716
}
1817

19-
impl<'a> TcpListener<'a> {
18+
impl TcpListener {
2019
/// Creates a new TcpListener which will be bound to the specified address.
2120
///
2221
/// The returned listener is ready for accepting connections.
23-
pub async fn bind(reactor: &'a Reactor, addr: &str) -> io::Result<Self> {
22+
pub async fn bind(addr: &str) -> io::Result<Self> {
2423
let addr: SocketAddr = addr
2524
.parse()
2625
.map_err(|_| io::Error::other("failed to parse string to socket addr"))?;
@@ -41,6 +40,7 @@ impl<'a> TcpListener<'a> {
4140
}
4241
SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpListener`"),
4342
};
43+
let reactor = Reactor::current();
4444

4545
socket
4646
.start_bind(&network, local_address)
@@ -51,7 +51,7 @@ impl<'a> TcpListener<'a> {
5151
socket.start_listen().map_err(to_io_err)?;
5252
reactor.wait_for(socket.subscribe()).await;
5353
socket.finish_listen().map_err(to_io_err)?;
54-
Ok(Self { socket, reactor })
54+
Ok(Self { socket })
5555
}
5656

5757
/// Returns the local socket address of this listener.
@@ -70,15 +70,14 @@ impl<'a> TcpListener<'a> {
7070
/// An iterator that infinitely accepts connections on a TcpListener.
7171
#[derive(Debug)]
7272
pub struct Incoming<'a> {
73-
listener: &'a TcpListener<'a>,
73+
listener: &'a TcpListener,
7474
}
7575

7676
impl<'a> AsyncIterator for Incoming<'a> {
77-
type Item = io::Result<TcpStream<'a>>;
77+
type Item = io::Result<TcpStream>;
7878

7979
async fn next(&mut self) -> Option<Self::Item> {
80-
self.listener
81-
.reactor
80+
Reactor::current()
8281
.wait_for(self.listener.socket.subscribe())
8382
.await;
8483
let (socket, input, output) = match self.listener.socket.accept().map_err(to_io_err) {
@@ -89,7 +88,6 @@ impl<'a> AsyncIterator for Incoming<'a> {
8988
socket,
9089
input,
9190
output,
92-
reactor: self.listener.reactor,
9391
}))
9492
}
9593
}

src/net/tcp_stream.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ use crate::{
1111
};
1212

1313
/// A TCP stream between a local and a remote socket.
14-
pub struct TcpStream<'a> {
15-
pub(super) reactor: &'a Reactor,
14+
pub struct TcpStream {
1615
pub(super) input: InputStream,
1716
pub(super) output: OutputStream,
1817
pub(super) socket: TcpSocket,
1918
}
2019

21-
impl<'a> TcpStream<'a> {
20+
impl TcpStream {
2221
/// Returns the socket address of the remote peer of this TCP connection.
2322
pub fn peer_addr(&self) -> io::Result<String> {
2423
let addr = self
@@ -29,29 +28,29 @@ impl<'a> TcpStream<'a> {
2928
}
3029
}
3130

32-
impl<'a> AsyncRead for TcpStream<'a> {
31+
impl AsyncRead for TcpStream {
3332
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
34-
self.reactor.wait_for(self.input.subscribe()).await;
33+
Reactor::current().wait_for(self.input.subscribe()).await;
3534
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
3635
let bytes_read = slice.len();
3736
buf[..bytes_read].clone_from_slice(&slice);
3837
Ok(bytes_read)
3938
}
4039
}
4140

42-
impl<'a> AsyncRead for &TcpStream<'a> {
41+
impl AsyncRead for &TcpStream {
4342
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
44-
self.reactor.wait_for(self.input.subscribe()).await;
43+
Reactor::current().wait_for(self.input.subscribe()).await;
4544
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
4645
let bytes_read = slice.len();
4746
buf[..bytes_read].clone_from_slice(&slice);
4847
Ok(bytes_read)
4948
}
5049
}
5150

52-
impl<'a> AsyncWrite for TcpStream<'a> {
51+
impl AsyncWrite for TcpStream {
5352
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
54-
self.reactor.wait_for(self.output.subscribe()).await;
53+
Reactor::current().wait_for(self.output.subscribe()).await;
5554
self.output.write(buf).map_err(to_io_err)?;
5655
Ok(buf.len())
5756
}
@@ -61,9 +60,9 @@ impl<'a> AsyncWrite for TcpStream<'a> {
6160
}
6261
}
6362

64-
impl<'a> AsyncWrite for &TcpStream<'a> {
63+
impl AsyncWrite for &TcpStream {
6564
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
66-
self.reactor.wait_for(self.output.subscribe()).await;
65+
Reactor::current().wait_for(self.output.subscribe()).await;
6766
self.output.write(buf).map_err(to_io_err)?;
6867
Ok(buf.len())
6968
}

src/runtime/block_on.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::Reactor;
1+
use super::{Reactor, REACTOR};
22

33
use core::future::Future;
44
use core::pin::pin;
@@ -7,16 +7,19 @@ use core::task::Waker;
77
use core::task::{Context, Poll, RawWaker, RawWakerVTable};
88

99
/// Start the event loop
10-
pub fn block_on<F, Fut>(f: F) -> Fut::Output
10+
pub fn block_on<Fut>(fut: Fut) -> Fut::Output
1111
where
12-
F: FnOnce(Reactor) -> Fut,
1312
Fut: Future,
1413
{
1514
// Construct the reactor
1615
let reactor = Reactor::new();
16+
// Store a copy as a singleton to be used elsewhere:
17+
let prev = REACTOR.replace(Some(reactor.clone()));
18+
if prev.is_some() {
19+
panic!("cannot wstd::runtime::block_on inside an existing block_on!")
20+
}
1721

18-
// Create the future and pin it so it can be polled
19-
let fut = (f)(reactor.clone());
22+
// Pin the future so it can be polled
2023
let mut fut = pin!(fut);
2124

2225
// Create a new context to be passed to the future.
@@ -25,12 +28,15 @@ where
2528

2629
// Either the future completes and we return, or some IO is happening
2730
// and we wait.
28-
loop {
31+
let res = loop {
2932
match fut.as_mut().poll(&mut cx) {
30-
Poll::Ready(res) => return res,
33+
Poll::Ready(res) => break res,
3134
Poll::Pending => reactor.block_until(),
3235
}
33-
}
36+
};
37+
// Clear the singleton
38+
REACTOR.replace(None);
39+
res
3440
}
3541

3642
/// Construct a new no-op waker

src/runtime/mod.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
//! Async event loop support.
22
//!
3-
//! The way to use this is to call [`block_on()`] to obtain an instance of
4-
//! [`Reactor`]. You can then share the reactor in code that needs it to insert
5-
//! instances of
3+
//! The way to use this is to call [`block_on()`]. Inside the future, [`Reactor::current`]
4+
//! will give an instance of the [`Reactor`] running the event loop, which can be
5+
//! to [`Reactor::wait_for`] instances of
66
//! [`wasi::Pollable`](https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html).
77
//! This will automatically wait for the futures to resolve, and call the
88
//! necessary wakers to work.
@@ -16,3 +16,10 @@ mod reactor;
1616

1717
pub use block_on::block_on;
1818
pub use reactor::Reactor;
19+
use std::cell::RefCell;
20+
21+
// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all
22+
// use sites in the background.
23+
std::thread_local! {
24+
pub(crate) static REACTOR: RefCell<Option<Reactor>> = RefCell::new(None);
25+
}

src/runtime/reactor.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use super::polling::{EventKey, Poller};
1+
use super::{
2+
polling::{EventKey, Poller},
3+
REACTOR,
4+
};
25

36
use core::cell::RefCell;
47
use core::future;
@@ -23,6 +26,19 @@ struct InnerReactor {
2326
}
2427

2528
impl Reactor {
29+
/// Return a `Reactor` for the currently running `wstd::runtime::block_on`.
30+
///
31+
/// # Panic
32+
/// This will panic if called outside of `wstd::runtime::block_on`.
33+
pub fn current() -> Self {
34+
REACTOR.with(|r| {
35+
r.borrow()
36+
.as_ref()
37+
.expect("Reactor::current must be called within a wstd runtime")
38+
.clone()
39+
})
40+
}
41+
2642
/// Create a new instance of `Reactor`
2743
pub(crate) fn new() -> Self {
2844
Self {

0 commit comments

Comments
 (0)