Skip to content

Commit d7d68f8

Browse files
committed
library: don't pass around a Reactor, just get Reactor::current() when needed
1 parent 398cfee commit d7d68f8

File tree

6 files changed

+49
-66
lines changed

6 files changed

+49
-66
lines changed

src/http/client.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,12 @@ use crate::runtime::Reactor;
77

88
/// An HTTP client.
99
#[derive(Debug)]
10-
pub struct Client<'a> {
11-
reactor: &'a Reactor,
12-
}
10+
pub struct Client {}
1311

14-
impl<'a> Client<'a> {
12+
impl Client {
1513
/// Create a new instance of `Client`
16-
pub fn new(reactor: &'a Reactor) -> Self {
17-
Self { reactor }
14+
pub fn new() -> Self {
15+
Self {}
1816
}
1917

2018
/// Send an HTTP request.
@@ -27,7 +25,7 @@ impl<'a> Client<'a> {
2725
let res = wasi::http::outgoing_handler::handle(wasi_req, None).unwrap();
2826

2927
// 2. Start sending the request body
30-
io::copy(body, OutputStream::new(&self.reactor, body_stream))
28+
io::copy(body, OutputStream::new(body_stream))
3129
.await
3230
.expect("io::copy broke oh no");
3331

@@ -36,42 +34,38 @@ impl<'a> Client<'a> {
3634
OutgoingBody::finish(wasi_body, trailers).unwrap();
3735

3836
// 4. Receive the response
39-
self.reactor.wait_for(res.subscribe()).await;
37+
Reactor::current().wait_for(res.subscribe()).await;
4038
// NOTE: the first `unwrap` is to ensure readiness, the second `unwrap`
4139
// is to trap if we try and get the response more than once. The final
4240
// `?` is to raise the actual error if there is one.
4341
let res = res.get().unwrap().unwrap()?;
44-
Ok(Response::try_from_incoming_response(
45-
res,
46-
self.reactor.clone(),
47-
)?)
42+
Ok(Response::try_from_incoming_response(res)?)
4843
}
4944
}
5045

51-
struct OutputStream<'a> {
52-
reactor: &'a Reactor,
46+
struct OutputStream {
5347
stream: wasi::http::types::OutputStream,
5448
}
5549

56-
impl<'a> OutputStream<'a> {
57-
fn new(reactor: &'a Reactor, stream: wasi::http::types::OutputStream) -> Self {
58-
Self { reactor, stream }
50+
impl OutputStream {
51+
fn new(stream: wasi::http::types::OutputStream) -> Self {
52+
Self { stream }
5953
}
6054
}
6155

62-
impl<'a> AsyncWrite for OutputStream<'a> {
56+
impl AsyncWrite for OutputStream {
6357
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
6458
let max = self.stream.check_write().unwrap() as usize;
6559
let max = max.min(buf.len());
6660
let buf = &buf[0..max];
6761
self.stream.write(buf).unwrap();
68-
self.reactor.wait_for(self.stream.subscribe()).await;
62+
Reactor::current().wait_for(self.stream.subscribe()).await;
6963
Ok(max)
7064
}
7165

7266
async fn flush(&mut self) -> io::Result<()> {
7367
self.stream.flush().unwrap();
74-
self.reactor.wait_for(self.stream.subscribe()).await;
68+
Reactor::current().wait_for(self.stream.subscribe()).await;
7569
Ok(())
7670
}
7771
}

src/http/response.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ pub struct Response<B: Body> {
4545
// }
4646

4747
impl Response<IncomingBody> {
48-
pub(crate) fn try_from_incoming_response(
49-
incoming: IncomingResponse,
50-
reactor: Reactor,
51-
) -> super::Result<Self> {
48+
pub(crate) fn try_from_incoming_response(incoming: IncomingResponse) -> super::Result<Self> {
5249
let headers: Headers = incoming.headers().into();
5350
let status = incoming.status().into();
5451

@@ -64,7 +61,6 @@ impl Response<IncomingBody> {
6461
let body = IncomingBody {
6562
buf_offset: 0,
6663
buf: None,
67-
reactor,
6864
body_stream,
6965
_incoming_body: incoming_body,
7066
};
@@ -101,7 +97,6 @@ impl<B: Body> Response<B> {
10197
/// An incoming HTTP body
10298
#[derive(Debug)]
10399
pub struct IncomingBody {
104-
reactor: Reactor,
105100
buf: Option<Vec<u8>>,
106101
// How many bytes have we already read from the buf?
107102
buf_offset: usize,
@@ -119,7 +114,7 @@ impl AsyncRead for IncomingBody {
119114
None => {
120115
// Wait for an event to be ready
121116
let pollable = self.body_stream.subscribe();
122-
self.reactor.wait_for(pollable).await;
117+
Reactor::current().wait_for(pollable).await;
123118

124119
// Read the bytes from the body stream
125120
let buf = self.body_stream.read(CHUNK_SIZE).map_err(|err| match err {

src/lib.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
//! use wstd::runtime::block_on;
2020
//!
2121
//! fn main() -> io::Result<()> {
22-
//! block_on(|reactor| async move {
23-
//! let listener = TcpListener::bind(&reactor, "127.0.0.1:8080").await?;
22+
//! block_on(async move {
23+
//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
2424
//! println!("Listening on {}", listener.local_addr()?);
2525
//! println!("type `nc localhost 8080` to create a TCP client");
2626
//!
@@ -56,12 +56,6 @@
5656
//! bytes. And `wstd::runtime` provides access to async runtime primitives.
5757
//! These are unique capabilities provided by WASI 0.2, and because this library
5858
//! is specific to that are exposed from here.
59-
//!
60-
//! Finally, this library does not implicitly thread through a
61-
//! [`Reactor`][runtime::Reactor] handle. Rather than using a `thread_local!`
62-
//! async resource APIs in `wstd` will borrow an instance of `Reactor`. This is
63-
//! a little more verbose, but in turn is a little simpler to implement,
64-
//! maintain, and extend.
6559
6660
pub mod http;
6761
pub mod io;

src/net/tcp_listener.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@ use super::TcpStream;
1111

1212
/// A TCP socket server, listening for connections.
1313
#[derive(Debug)]
14-
pub struct TcpListener<'a> {
14+
pub struct TcpListener {
1515
socket: TcpSocket,
16-
reactor: &'a Reactor,
1716
}
1817

19-
impl<'a> TcpListener<'a> {
18+
impl TcpListener {
2019
/// Creates a new TcpListener which will be bound to the specified address.
2120
///
2221
/// The returned listener is ready for accepting connections.
23-
pub async fn bind(reactor: &'a Reactor, addr: &str) -> io::Result<Self> {
22+
pub async fn bind(addr: &str) -> io::Result<Self> {
2423
let addr: SocketAddr = addr
2524
.parse()
2625
.map_err(|_| io::Error::other("failed to parse string to socket addr"))?;
@@ -41,6 +40,7 @@ impl<'a> TcpListener<'a> {
4140
}
4241
SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpListener`"),
4342
};
43+
let reactor = Reactor::current();
4444

4545
socket
4646
.start_bind(&network, local_address)
@@ -51,7 +51,7 @@ impl<'a> TcpListener<'a> {
5151
socket.start_listen().map_err(to_io_err)?;
5252
reactor.wait_for(socket.subscribe()).await;
5353
socket.finish_listen().map_err(to_io_err)?;
54-
Ok(Self { socket, reactor })
54+
Ok(Self { socket })
5555
}
5656

5757
/// Returns the local socket address of this listener.
@@ -70,15 +70,14 @@ impl<'a> TcpListener<'a> {
7070
/// An iterator that infinitely accepts connections on a TcpListener.
7171
#[derive(Debug)]
7272
pub struct Incoming<'a> {
73-
listener: &'a TcpListener<'a>,
73+
listener: &'a TcpListener,
7474
}
7575

7676
impl<'a> AsyncIterator for Incoming<'a> {
77-
type Item = io::Result<TcpStream<'a>>;
77+
type Item = io::Result<TcpStream>;
7878

7979
async fn next(&mut self) -> Option<Self::Item> {
80-
self.listener
81-
.reactor
80+
Reactor::current()
8281
.wait_for(self.listener.socket.subscribe())
8382
.await;
8483
let (socket, input, output) = match self.listener.socket.accept().map_err(to_io_err) {
@@ -89,7 +88,6 @@ impl<'a> AsyncIterator for Incoming<'a> {
8988
socket,
9089
input,
9190
output,
92-
reactor: self.listener.reactor,
9391
}))
9492
}
9593
}

src/net/tcp_stream.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ use crate::{
1111
};
1212

1313
/// A TCP stream between a local and a remote socket.
14-
pub struct TcpStream<'a> {
15-
pub(super) reactor: &'a Reactor,
14+
pub struct TcpStream {
1615
pub(super) input: InputStream,
1716
pub(super) output: OutputStream,
1817
pub(super) socket: TcpSocket,
1918
}
2019

21-
impl<'a> TcpStream<'a> {
20+
impl TcpStream {
2221
/// Returns the socket address of the remote peer of this TCP connection.
2322
pub fn peer_addr(&self) -> io::Result<String> {
2423
let addr = self
@@ -29,29 +28,29 @@ impl<'a> TcpStream<'a> {
2928
}
3029
}
3130

32-
impl<'a> AsyncRead for TcpStream<'a> {
31+
impl AsyncRead for TcpStream {
3332
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
34-
self.reactor.wait_for(self.input.subscribe()).await;
33+
Reactor::current().wait_for(self.input.subscribe()).await;
3534
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
3635
let bytes_read = slice.len();
3736
buf[..bytes_read].clone_from_slice(&slice);
3837
Ok(bytes_read)
3938
}
4039
}
4140

42-
impl<'a> AsyncRead for &TcpStream<'a> {
41+
impl AsyncRead for &TcpStream {
4342
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
44-
self.reactor.wait_for(self.input.subscribe()).await;
43+
Reactor::current().wait_for(self.input.subscribe()).await;
4544
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
4645
let bytes_read = slice.len();
4746
buf[..bytes_read].clone_from_slice(&slice);
4847
Ok(bytes_read)
4948
}
5049
}
5150

52-
impl<'a> AsyncWrite for TcpStream<'a> {
51+
impl AsyncWrite for TcpStream {
5352
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
54-
self.reactor.wait_for(self.output.subscribe()).await;
53+
Reactor::current().wait_for(self.output.subscribe()).await;
5554
self.output.write(buf).map_err(to_io_err)?;
5655
Ok(buf.len())
5756
}
@@ -61,9 +60,9 @@ impl<'a> AsyncWrite for TcpStream<'a> {
6160
}
6261
}
6362

64-
impl<'a> AsyncWrite for &TcpStream<'a> {
63+
impl AsyncWrite for &TcpStream {
6564
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
66-
self.reactor.wait_for(self.output.subscribe()).await;
65+
Reactor::current().wait_for(self.output.subscribe()).await;
6766
self.output.write(buf).map_err(to_io_err)?;
6867
Ok(buf.len())
6968
}

src/time/mod.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,32 +36,35 @@ impl SystemTime {
3636
}
3737

3838
/// An async iterator representing notifications at fixed interval.
39-
pub fn interval(reactor: &Reactor, duration: Duration) -> Interval {
40-
Interval { reactor, duration }
39+
pub fn interval(duration: Duration) -> Interval {
40+
Interval { duration }
4141
}
4242

4343
/// An async iterator representing notifications at fixed interval.
4444
///
4545
/// See the [`interval`] function for more.
46-
pub struct Interval<'a> {
46+
pub struct Interval {
4747
duration: Duration,
48-
reactor: &'a Reactor,
4948
}
50-
impl<'a> AsyncIterator for Interval<'a> {
49+
impl AsyncIterator for Interval {
5150
type Item = Instant;
5251

5352
async fn next(&mut self) -> Option<Self::Item> {
54-
wait_for(self.reactor, self.duration).await;
53+
wait_for(self.duration).await;
5554
Some(Instant(wasi::clocks::monotonic_clock::now()))
5655
}
5756
}
5857

5958
/// Wait until the passed duration has elapsed.
60-
pub async fn wait_for(reactor: &Reactor, duration: Duration) {
61-
reactor.wait_for(subscribe_duration(duration.0)).await;
59+
pub async fn wait_for(duration: Duration) {
60+
Reactor::current()
61+
.wait_for(subscribe_duration(duration.0))
62+
.await;
6263
}
6364

6465
/// Wait until the passed instant.
65-
pub async fn wait_until(reactor: &Reactor, deadline: Instant) {
66-
reactor.wait_for(subscribe_instant(deadline.0)).await;
66+
pub async fn wait_until(deadline: Instant) {
67+
Reactor::current()
68+
.wait_for(subscribe_instant(deadline.0))
69+
.await;
6770
}

0 commit comments

Comments
 (0)