Skip to content

Workaround for unexpected stream break #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions src/http/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,24 @@ impl AsyncRead for IncomingBody {
let buf = match &mut self.buf {
Some(ref mut buf) => buf,
None => {
// Wait for an event to be ready
let pollable = self.body_stream.subscribe();
Reactor::current().wait_for(pollable).await;

// Read the bytes from the body stream
let buf = match self.body_stream.read(CHUNK_SIZE) {
Ok(buf) => buf,
Err(StreamError::Closed) => return Ok(0),
Err(StreamError::LastOperationFailed(err)) => {
return Err(std::io::Error::other(format!(
"last operation failed: {}",
err.to_debug_string()
)))
// workaround for unexpected stream break. https://github.com/bytecodealliance/wasmtime/issues/9667
let reactor = Reactor::current();
let buf = loop {
reactor.wait_for(self.body_stream.subscribe()).await;
match self.body_stream.read(CHUNK_SIZE) {
Ok(buf) => {
if buf.is_empty() {
continue;
}
break buf;
}
Err(StreamError::Closed) => return Ok(0),
Err(StreamError::LastOperationFailed(err)) => {
return Err(std::io::Error::other(format!(
"last operation failed: {}",
err.to_debug_string()
)))
}
}
};
self.buf.insert(buf)
Expand Down
38 changes: 28 additions & 10 deletions src/net/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,20 @@ impl TcpStream {

impl AsyncRead for TcpStream {
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
Reactor::current().wait_for(self.input.subscribe()).await;
let slice = match self.input.read(buf.len() as u64) {
Ok(slice) => slice,
Err(StreamError::Closed) => return Ok(0),
Err(e) => return Err(to_io_err(e)),
// workaround for unexpected stream break. https://github.com/bytecodealliance/wasmtime/issues/9667
let reactor = Reactor::current();
let slice = loop {
reactor.wait_for(self.input.subscribe()).await;
match self.input.read(buf.len() as u64) {
Ok(slice) => {
if slice.is_empty() {
continue;
}
break slice;
}
Err(StreamError::Closed) => return Ok(0),
Err(e) => return Err(to_io_err(e)),
};
};
let bytes_read = slice.len();
buf[..bytes_read].clone_from_slice(&slice);
Expand All @@ -44,11 +53,20 @@ impl AsyncRead for TcpStream {

impl AsyncRead for &TcpStream {
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
Reactor::current().wait_for(self.input.subscribe()).await;
let slice = match self.input.read(buf.len() as u64) {
Ok(slice) => slice,
Err(StreamError::Closed) => return Ok(0),
Err(e) => return Err(to_io_err(e)),
// workaround for unexpected stream break. https://github.com/bytecodealliance/wasmtime/issues/9667
let reactor = Reactor::current();
let slice = loop {
reactor.wait_for(self.input.subscribe()).await;
match self.input.read(buf.len() as u64) {
Ok(slice) => {
if slice.is_empty() {
continue;
}
break slice;
}
Err(StreamError::Closed) => return Ok(0),
Err(e) => return Err(to_io_err(e)),
};
};
let bytes_read = slice.len();
buf[..bytes_read].clone_from_slice(&slice);
Expand Down
18 changes: 10 additions & 8 deletions test-programs/artifacts/tests/tcp_echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,18 @@ fn tcp_echo_server() -> Result<()> {

const MESSAGE: &[u8] = b"hello, echoserver!\n";

tcpstream.write_all(MESSAGE).context("write to socket")?;
println!("wrote to echo server");
let n = 2;
for _ in 0..n {
tcpstream.write_all(MESSAGE).context("write to socket")?;
println!("wrote to echo server");

let mut readback = Vec::new();
tcpstream
.read_to_end(&mut readback)
.context("read from socket")?;
let mut buf = [0; 1024];
let n = tcpstream.read(&mut buf).context("read from socket")?;
let readback = &buf[..n];

println!("read from wasm server");
assert_eq!(MESSAGE, readback);
println!("read from wasm server");
assert_eq!(MESSAGE, readback);
}

if wasmtime_thread.is_finished() {
wasmtime_thread.join().expect("wasmtime panicked")?;
Expand Down