Skip to content

Commit 9e460ee

Browse files
committed
net: TcpListener uses an AsyncPollable, and TcpStream uses AsyncInput/OutputStream
1 parent b4b3469 commit 9e460ee

File tree

2 files changed

+29
-57
lines changed

2 files changed

+29
-57
lines changed

src/net/tcp_listener.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@ use wasi::sockets::tcp::{ErrorCode, IpAddressFamily, IpSocketAddress, TcpSocket}
33

44
use crate::io;
55
use crate::iter::AsyncIterator;
6-
use crate::runtime::Reactor;
76
use std::io::ErrorKind;
87
use std::net::SocketAddr;
98

109
use super::TcpStream;
10+
use crate::runtime::AsyncPollable;
1111

1212
/// A TCP socket server, listening for connections.
1313
#[derive(Debug)]
1414
pub struct TcpListener {
15+
// Field order matters: must drop this child before parent below
16+
pollable: AsyncPollable,
1517
socket: TcpSocket,
1618
}
1719

@@ -40,18 +42,17 @@ impl TcpListener {
4042
}
4143
SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpListener`"),
4244
};
43-
let reactor = Reactor::current();
44-
4545
socket
4646
.start_bind(&network, local_address)
4747
.map_err(to_io_err)?;
48-
reactor.wait_for(socket.subscribe()).await;
48+
let pollable = AsyncPollable::new(socket.subscribe());
49+
pollable.wait_for().await;
4950
socket.finish_bind().map_err(to_io_err)?;
5051

5152
socket.start_listen().map_err(to_io_err)?;
52-
reactor.wait_for(socket.subscribe()).await;
53+
pollable.wait_for().await;
5354
socket.finish_listen().map_err(to_io_err)?;
54-
Ok(Self { socket })
55+
Ok(Self { pollable, socket })
5556
}
5657

5758
/// Returns the local socket address of this listener.
@@ -77,18 +78,12 @@ impl<'a> AsyncIterator for Incoming<'a> {
7778
type Item = io::Result<TcpStream>;
7879

7980
async fn next(&mut self) -> Option<Self::Item> {
80-
Reactor::current()
81-
.wait_for(self.listener.socket.subscribe())
82-
.await;
81+
self.listener.pollable.wait_for().await;
8382
let (socket, input, output) = match self.listener.socket.accept().map_err(to_io_err) {
8483
Ok(accepted) => accepted,
8584
Err(err) => return Some(Err(err)),
8685
};
87-
Some(Ok(TcpStream {
88-
socket,
89-
input,
90-
output,
91-
}))
86+
Some(Ok(TcpStream::new(input, output, socket)))
9287
}
9388
}
9489

src/net/tcp_stream.rs

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,27 @@
1-
use std::io::Error;
1+
use std::cell::RefCell;
22

33
use wasi::{
4-
io::streams::StreamError,
5-
sockets::tcp::{InputStream, OutputStream, TcpSocket},
4+
io::streams::{InputStream, OutputStream},
5+
sockets::tcp::TcpSocket,
66
};
77

8-
use crate::{
9-
io::{self, AsyncRead, AsyncWrite},
10-
runtime::Reactor,
11-
};
8+
use crate::io::{self, AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite};
129

1310
/// A TCP stream between a local and a remote socket.
1411
pub struct TcpStream {
15-
pub(super) input: InputStream,
16-
pub(super) output: OutputStream,
17-
pub(super) socket: TcpSocket,
12+
input: RefCell<AsyncInputStream>,
13+
output: RefCell<AsyncOutputStream>,
14+
socket: TcpSocket,
1815
}
1916

2017
impl TcpStream {
18+
pub(crate) fn new(input: InputStream, output: OutputStream, socket: TcpSocket) -> Self {
19+
TcpStream {
20+
input: RefCell::new(AsyncInputStream::new(input)),
21+
output: RefCell::new(AsyncOutputStream::new(output)),
22+
socket,
23+
}
24+
}
2125
/// Returns the socket address of the remote peer of this TCP connection.
2226
pub fn peer_addr(&self) -> io::Result<String> {
2327
let addr = self
@@ -40,53 +44,33 @@ impl Drop for TcpStream {
4044

4145
impl AsyncRead for TcpStream {
4246
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
43-
Reactor::current().wait_for(self.input.subscribe()).await;
44-
let slice = match self.input.read(buf.len() as u64) {
45-
Ok(slice) => slice,
46-
Err(StreamError::Closed) => return Ok(0),
47-
Err(e) => return Err(to_io_err(e)),
48-
};
49-
let bytes_read = slice.len();
50-
buf[..bytes_read].clone_from_slice(&slice);
51-
Ok(bytes_read)
47+
self.input.borrow_mut().read(buf).await
5248
}
5349
}
5450

5551
impl AsyncRead for &TcpStream {
5652
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
57-
Reactor::current().wait_for(self.input.subscribe()).await;
58-
let slice = match self.input.read(buf.len() as u64) {
59-
Ok(slice) => slice,
60-
Err(StreamError::Closed) => return Ok(0),
61-
Err(e) => return Err(to_io_err(e)),
62-
};
63-
let bytes_read = slice.len();
64-
buf[..bytes_read].clone_from_slice(&slice);
65-
Ok(bytes_read)
53+
self.input.borrow_mut().read(buf).await
6654
}
6755
}
6856

6957
impl AsyncWrite for TcpStream {
7058
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
71-
Reactor::current().wait_for(self.output.subscribe()).await;
72-
self.output.write(buf).map_err(to_io_err)?;
73-
Ok(buf.len())
59+
self.output.borrow_mut().write(buf).await
7460
}
7561

7662
async fn flush(&mut self) -> io::Result<()> {
77-
self.output.flush().map_err(to_io_err)
63+
self.output.borrow_mut().flush().await
7864
}
7965
}
8066

8167
impl AsyncWrite for &TcpStream {
8268
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
83-
Reactor::current().wait_for(self.output.subscribe()).await;
84-
self.output.write(buf).map_err(to_io_err)?;
85-
Ok(buf.len())
69+
self.output.borrow_mut().write(buf).await
8670
}
8771

8872
async fn flush(&mut self) -> io::Result<()> {
89-
self.output.flush().map_err(to_io_err)
73+
self.output.borrow_mut().flush().await
9074
}
9175
}
9276

@@ -125,10 +109,3 @@ impl<'a> Drop for WriteHalf<'a> {
125109
.shutdown(wasi::sockets::tcp::ShutdownType::Send);
126110
}
127111
}
128-
129-
fn to_io_err(err: StreamError) -> std::io::Error {
130-
match err {
131-
StreamError::LastOperationFailed(err) => Error::other(err.to_debug_string()),
132-
StreamError::Closed => Error::other("Stream was closed"),
133-
}
134-
}

0 commit comments

Comments
 (0)