Skip to content

Commit c276fdb

Browse files
authored
Merge pull request #50 from sunfishcode/sunfishcode/optimize-copy
Optimize copy.
2 parents e23cdf2 + fbcc2e9 commit c276fdb

File tree

7 files changed

+141
-35
lines changed

7 files changed

+141
-35
lines changed

src/http/body.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ impl AsyncRead for IncomingBody {
139139
async fn read(&mut self, out_buf: &mut [u8]) -> crate::io::Result<usize> {
140140
self.body_stream.read(out_buf).await
141141
}
142+
143+
fn as_async_input_stream(&self) -> Option<&AsyncInputStream> {
144+
Some(&self.body_stream)
145+
}
142146
}
143147

144148
impl Body for IncomingBody {

src/io/copy.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,29 @@
1-
use crate::io::{AsyncRead, AsyncWrite};
1+
use crate::io::{AsyncRead, AsyncWrite, Error};
2+
use wasi::io::streams::StreamError;
23

34
/// Copy bytes from a reader to a writer.
45
pub async fn copy<R, W>(mut reader: R, mut writer: W) -> crate::io::Result<()>
56
where
67
R: AsyncRead,
78
W: AsyncWrite,
89
{
10+
// Optimized path when we have an `AsyncInputStream` and an
11+
// `AsyncOutputStream`.
12+
if let Some(reader) = reader.as_async_input_stream() {
13+
if let Some(writer) = writer.as_async_output_stream() {
14+
loop {
15+
match super::splice(reader, writer, u64::MAX).await {
16+
Ok(_n) => (),
17+
Err(StreamError::Closed) => return Ok(()),
18+
Err(StreamError::LastOperationFailed(err)) => {
19+
return Err(Error::other(err.to_debug_string()));
20+
}
21+
}
22+
}
23+
}
24+
}
25+
26+
// Unoptimized case: read the input and then write it.
927
let mut buf = [0; 1024];
1028
'read: loop {
1129
let bytes_read = reader.read(&mut buf).await?;

src/io/read.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,28 @@ pub trait AsyncRead {
2424
n += len;
2525
}
2626
}
27+
28+
// If the `AsyncRead` implementation is an unbuffered wrapper around an
29+
// `AsyncInputStream`, some I/O operations can be more efficient.
30+
#[inline]
31+
fn as_async_input_stream(&self) -> Option<&io::AsyncInputStream> {
32+
None
33+
}
2734
}
2835

2936
impl<R: AsyncRead + ?Sized> AsyncRead for &mut R {
3037
#[inline]
3138
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
32-
(*self).read(buf).await
39+
(**self).read(buf).await
3340
}
3441

3542
#[inline]
3643
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
37-
(*self).read_to_end(buf).await
44+
(**self).read_to_end(buf).await
45+
}
46+
47+
#[inline]
48+
fn as_async_input_stream(&self) -> Option<&io::AsyncInputStream> {
49+
(**self).as_async_input_stream()
3850
}
3951
}

src/io/streams.rs

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,8 @@ impl AsyncInputStream {
3232
.wait_for()
3333
.await;
3434
}
35-
}
36-
37-
impl AsyncRead for AsyncInputStream {
38-
async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
35+
/// Like [`AsyncRead::read`], but doesn't require a `&mut self`.
36+
pub async fn read(&self, buf: &mut [u8]) -> Result<usize> {
3937
self.ready().await;
4038
// Ideally, the ABI would be able to read directly into buf. However, with the default
4139
// generated bindings, it returns a newly allocated vec, which we need to copy into buf.
@@ -57,6 +55,17 @@ impl AsyncRead for AsyncInputStream {
5755
}
5856
}
5957

58+
impl AsyncRead for AsyncInputStream {
59+
async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
60+
Self::read(self, buf).await
61+
}
62+
63+
#[inline]
64+
fn as_async_input_stream(&self) -> Option<&AsyncInputStream> {
65+
Some(self)
66+
}
67+
}
68+
6069
#[derive(Debug)]
6170
pub struct AsyncOutputStream {
6271
// Lazily initialized pollable, used for lifetime of stream to check readiness.
@@ -86,10 +95,8 @@ impl AsyncOutputStream {
8695
.wait_for()
8796
.await;
8897
}
89-
}
90-
impl AsyncWrite for AsyncOutputStream {
91-
// Required methods
92-
async fn write(&mut self, buf: &[u8]) -> Result<usize> {
98+
/// Like [`AsyncWrite::write`], but doesn't require a `&mut self`.
99+
pub async fn write(&self, buf: &[u8]) -> Result<usize> {
93100
// Loops at most twice.
94101
loop {
95102
match self.stream.check_write() {
@@ -119,7 +126,8 @@ impl AsyncWrite for AsyncOutputStream {
119126
}
120127
}
121128
}
122-
async fn flush(&mut self) -> Result<()> {
129+
/// Like [`AsyncWrite::flush`], but doesn't require a `&mut self`.
130+
pub async fn flush(&self) -> Result<()> {
123131
match self.stream.flush() {
124132
Ok(()) => {
125133
self.ready().await;
@@ -134,3 +142,31 @@ impl AsyncWrite for AsyncOutputStream {
134142
}
135143
}
136144
}
145+
impl AsyncWrite for AsyncOutputStream {
146+
// Required methods
147+
async fn write(&mut self, buf: &[u8]) -> Result<usize> {
148+
Self::write(self, buf).await
149+
}
150+
async fn flush(&mut self) -> Result<()> {
151+
Self::flush(self).await
152+
}
153+
154+
#[inline]
155+
fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> {
156+
Some(self)
157+
}
158+
}
159+
160+
/// Wait for both streams to be ready and then do a WASI splice.
161+
pub(crate) async fn splice(
162+
reader: &AsyncInputStream,
163+
writer: &AsyncOutputStream,
164+
len: u64,
165+
) -> core::result::Result<u64, StreamError> {
166+
// Wait for both streams to be ready.
167+
let r = reader.ready();
168+
writer.ready().await;
169+
r.await;
170+
171+
writer.stream.splice(&reader.stream, len)
172+
}

src/io/write.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,33 @@ pub trait AsyncWrite {
1616
}
1717
}
1818
}
19+
20+
// If the `AsyncWrite` implementation is an unbuffered wrapper around an
21+
// `AsyncOutputStream`, some I/O operations can be more efficient.
22+
#[inline]
23+
fn as_async_output_stream(&self) -> Option<&io::AsyncOutputStream> {
24+
None
25+
}
1926
}
2027

2128
impl<W: AsyncWrite + ?Sized> AsyncWrite for &mut W {
2229
#[inline]
2330
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
24-
(*self).write(buf).await
31+
(**self).write(buf).await
2532
}
2633

2734
#[inline]
2835
async fn flush(&mut self) -> io::Result<()> {
29-
(*self).flush().await
36+
(**self).flush().await
3037
}
3138

3239
#[inline]
3340
async fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
34-
(*self).write_all(buf).await
41+
(**self).write_all(buf).await
42+
}
43+
44+
#[inline]
45+
fn as_async_output_stream(&self) -> Option<&io::AsyncOutputStream> {
46+
(**self).as_async_output_stream()
3547
}
3648
}

src/net/tcp_stream.rs

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,22 @@
1-
use std::cell::RefCell;
2-
31
use wasi::{
42
io::streams::{InputStream, OutputStream},
53
sockets::tcp::TcpSocket,
64
};
75

8-
use crate::io::{self, AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite};
6+
use crate::io::{self, AsyncInputStream, AsyncOutputStream};
97

108
/// A TCP stream between a local and a remote socket.
119
pub struct TcpStream {
12-
input: RefCell<AsyncInputStream>,
13-
output: RefCell<AsyncOutputStream>,
10+
input: AsyncInputStream,
11+
output: AsyncOutputStream,
1412
socket: TcpSocket,
1513
}
1614

1715
impl TcpStream {
1816
pub(crate) fn new(input: InputStream, output: OutputStream, socket: TcpSocket) -> Self {
1917
TcpStream {
20-
input: RefCell::new(AsyncInputStream::new(input)),
21-
output: RefCell::new(AsyncOutputStream::new(output)),
18+
input: AsyncInputStream::new(input),
19+
output: AsyncOutputStream::new(output),
2220
socket,
2321
}
2422
}
@@ -42,43 +40,63 @@ impl Drop for TcpStream {
4240
}
4341
}
4442

45-
impl AsyncRead for TcpStream {
43+
impl io::AsyncRead for TcpStream {
4644
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
47-
self.input.borrow_mut().read(buf).await
45+
self.input.read(buf).await
46+
}
47+
48+
fn as_async_input_stream(&self) -> Option<&AsyncInputStream> {
49+
Some(&self.input)
4850
}
4951
}
5052

51-
impl AsyncRead for &TcpStream {
53+
impl io::AsyncRead for &TcpStream {
5254
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
53-
self.input.borrow_mut().read(buf).await
55+
self.input.read(buf).await
56+
}
57+
58+
fn as_async_input_stream(&self) -> Option<&AsyncInputStream> {
59+
(**self).as_async_input_stream()
5460
}
5561
}
5662

57-
impl AsyncWrite for TcpStream {
63+
impl io::AsyncWrite for TcpStream {
5864
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
59-
self.output.borrow_mut().write(buf).await
65+
self.output.write(buf).await
6066
}
6167

6268
async fn flush(&mut self) -> io::Result<()> {
63-
self.output.borrow_mut().flush().await
69+
self.output.flush().await
70+
}
71+
72+
fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> {
73+
Some(&self.output)
6474
}
6575
}
6676

67-
impl AsyncWrite for &TcpStream {
77+
impl io::AsyncWrite for &TcpStream {
6878
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
69-
self.output.borrow_mut().write(buf).await
79+
self.output.write(buf).await
7080
}
7181

7282
async fn flush(&mut self) -> io::Result<()> {
73-
self.output.borrow_mut().flush().await
83+
self.output.flush().await
84+
}
85+
86+
fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> {
87+
(**self).as_async_output_stream()
7488
}
7589
}
7690

7791
pub struct ReadHalf<'a>(&'a TcpStream);
78-
impl<'a> AsyncRead for ReadHalf<'a> {
92+
impl<'a> io::AsyncRead for ReadHalf<'a> {
7993
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
8094
self.0.read(buf).await
8195
}
96+
97+
fn as_async_input_stream(&self) -> Option<&AsyncInputStream> {
98+
self.0.as_async_input_stream()
99+
}
82100
}
83101

84102
impl<'a> Drop for ReadHalf<'a> {
@@ -91,14 +109,18 @@ impl<'a> Drop for ReadHalf<'a> {
91109
}
92110

93111
pub struct WriteHalf<'a>(&'a TcpStream);
94-
impl<'a> AsyncWrite for WriteHalf<'a> {
112+
impl<'a> io::AsyncWrite for WriteHalf<'a> {
95113
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
96114
self.0.write(buf).await
97115
}
98116

99117
async fn flush(&mut self) -> io::Result<()> {
100118
self.0.flush().await
101119
}
120+
121+
fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> {
122+
self.0.as_async_output_stream()
123+
}
102124
}
103125

104126
impl<'a> Drop for WriteHalf<'a> {

test-programs/artifacts/tests/tcp_echo_server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ fn run_in_wasmtime(wasm: &[u8], stdout: Option<MemoryOutputPipe>) -> Result<()>
7777
#[test_log::test]
7878
fn tcp_echo_server() -> Result<()> {
7979
use std::io::{Read, Write};
80-
use std::net::TcpStream;
80+
use std::net::{Shutdown, TcpStream};
8181
use std::thread::sleep;
8282
use std::time::Duration;
8383

@@ -106,6 +106,8 @@ fn tcp_echo_server() -> Result<()> {
106106
tcpstream.write_all(MESSAGE).context("write to socket")?;
107107
println!("wrote to echo server");
108108

109+
tcpstream.shutdown(Shutdown::Write)?;
110+
109111
let mut readback = Vec::new();
110112
tcpstream
111113
.read_to_end(&mut readback)

0 commit comments

Comments
 (0)