Skip to content

Commit 283f11f

Browse files
authored
Merge pull request #21 from yoshuawuyts/pch/read_to_end
Add AsyncRead::read_to_end and AsyncWrite::write_all
2 parents 84519fd + c6ba81c commit 283f11f

File tree

6 files changed

+56
-22
lines changed

6 files changed

+56
-22
lines changed

examples/http_get.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
1717
.ok_or_else(|| "response expected to have Content-Type header")?;
1818
assert_eq!(content_type, "application/json; charset=utf-8");
1919

20-
// Would much prefer read_to_end here:
21-
let mut body_buf = vec![0; 4096];
22-
let body_len = response.body().read(&mut body_buf).await?;
23-
body_buf.truncate(body_len);
20+
let mut body_buf = Vec::new();
21+
let _body_len = response.body().read_to_end(&mut body_buf).await?;
2422

2523
let val: serde_json::Value = serde_json::from_slice(&body_buf)?;
2624
let body_url = val

src/http/response.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ pub struct IncomingBody {
101101
// How many bytes have we already read from the buf?
102102
buf_offset: usize,
103103

104-
// IMPORTANT: the order of these fields here matters. `incoming_body` must
105-
// be dropped before `body_stream`.
104+
// IMPORTANT: the order of these fields here matters. `body_stream` must
105+
// be dropped before `_incoming_body`.
106106
body_stream: InputStream,
107107
_incoming_body: WasiIncomingBody,
108108
}
@@ -117,12 +117,16 @@ impl AsyncRead for IncomingBody {
117117
Reactor::current().wait_for(pollable).await;
118118

119119
// Read the bytes from the body stream
120-
let buf = self.body_stream.read(CHUNK_SIZE).map_err(|err| match err {
121-
StreamError::LastOperationFailed(err) => {
122-
std::io::Error::other(format!("{}", err.to_debug_string()))
120+
let buf = match self.body_stream.read(CHUNK_SIZE) {
121+
Ok(buf) => buf,
122+
Err(StreamError::Closed) => return Ok(0),
123+
Err(StreamError::LastOperationFailed(err)) => {
124+
return Err(std::io::Error::other(format!(
125+
"last operation failed: {}",
126+
err.to_debug_string()
127+
)))
123128
}
124-
StreamError::Closed => std::io::Error::other("Connection closed"),
125-
})?;
129+
};
126130
self.buf.insert(buf)
127131
}
128132
};

src/io/copy.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,6 @@ where
1212
if bytes_read == 0 {
1313
break 'read Ok(());
1414
}
15-
let mut slice = &buf[0..bytes_read];
16-
17-
'write: loop {
18-
let bytes_written = writer.write(slice).await?;
19-
slice = &slice[bytes_written..];
20-
if slice.is_empty() {
21-
break 'write;
22-
}
23-
}
15+
writer.write_all(&buf[0..bytes_read]).await?;
2416
}
2517
}

src/io/read.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,27 @@
11
use crate::io;
22

3+
const CHUNK_SIZE: usize = 2048;
4+
35
/// Read bytes from a source.
46
pub trait AsyncRead {
57
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
8+
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
9+
// total bytes written to buf
10+
let mut n = 0;
11+
12+
loop {
13+
// grow buf if empty
14+
if buf.len() == n {
15+
buf.resize(n + CHUNK_SIZE, 0u8);
16+
}
17+
18+
let len = self.read(&mut buf[n..]).await?;
19+
if len == 0 {
20+
buf.truncate(n);
21+
return Ok(n);
22+
}
23+
24+
n += len;
25+
}
26+
}
627
}

src/io/write.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,15 @@ pub trait AsyncWrite {
55
// Required methods
66
async fn write(&mut self, buf: &[u8]) -> io::Result<usize>;
77
async fn flush(&mut self) -> io::Result<()>;
8+
9+
async fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
10+
let mut to_write = &buf[0..];
11+
loop {
12+
let bytes_written = self.write(to_write).await?;
13+
to_write = &to_write[bytes_written..];
14+
if to_write.is_empty() {
15+
return Ok(());
16+
}
17+
}
18+
}
819
}

src/net/tcp_stream.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ impl TcpStream {
3131
impl AsyncRead for TcpStream {
3232
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
3333
Reactor::current().wait_for(self.input.subscribe()).await;
34-
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
34+
let slice = match self.input.read(buf.len() as u64) {
35+
Ok(slice) => slice,
36+
Err(StreamError::Closed) => return Ok(0),
37+
Err(e) => return Err(to_io_err(e)),
38+
};
3539
let bytes_read = slice.len();
3640
buf[..bytes_read].clone_from_slice(&slice);
3741
Ok(bytes_read)
@@ -41,7 +45,11 @@ impl AsyncRead for TcpStream {
4145
impl AsyncRead for &TcpStream {
4246
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
4347
Reactor::current().wait_for(self.input.subscribe()).await;
44-
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
48+
let slice = match self.input.read(buf.len() as u64) {
49+
Ok(slice) => slice,
50+
Err(StreamError::Closed) => return Ok(0),
51+
Err(e) => return Err(to_io_err(e)),
52+
};
4553
let bytes_read = slice.len();
4654
buf[..bytes_read].clone_from_slice(&slice);
4755
Ok(bytes_read)

0 commit comments

Comments
 (0)