diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index 5601a112205..23eb1b06281 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -24,6 +24,7 @@ futures-core = "0.3.0" [dev-dependencies] tokio = { version = "1.2.0", path = "../tokio", features = ["full"] } futures-util = "0.3.0" +futures-test = "0.3.5" [package.metadata.docs.rs] all-features = true diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index c3317092775..bd47432dac5 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -23,6 +23,7 @@ use tokio::sync::mpsc; use tokio::time::{self, Duration, Instant, Sleep}; use tokio_stream::wrappers::UnboundedReceiverStream; +use core::panic; use futures_core::Stream; use std::collections::VecDeque; use std::fmt; @@ -59,11 +60,13 @@ pub struct Builder { enum Action { Read(Vec), Write(Vec), + Shutdown, Wait(Duration), // Wrapped in Arc so that Builder can be cloned and Send. // Mock is not cloned as does not need to check Rc for ref counts. ReadError(Option>), WriteError(Option>), + ShutdownError(Option>), } struct Inner { @@ -75,6 +78,13 @@ struct Inner { name: String, } +enum Shutdown { + ShouldSuccess, + ShouldError(io::Error), + NeedWait, + NoActions, +} + impl Builder { /// Return a new, empty `Builder`. pub fn new() -> Self { @@ -129,6 +139,25 @@ impl Builder { self } + /// Sequence a shutdown operation. + /// + /// The next operation in the mock's script will be to expect a + /// [`AsyncWrite::poll_shutdown`] call. + pub fn shutdown(&mut self) -> &mut Self { + self.actions.push_back(Action::Shutdown); + self + } + + /// Sequence a shutdown operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a + /// [`AsyncWrite::poll_shutdown`] call that returns `error`. + pub fn shutdown_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::ShutdownError(error)); + self + } + /// Set name of the mock IO object to include in panic messages and debug output pub fn name(&mut self, name: impl Into) -> &mut Self { self.name = name.into(); @@ -197,6 +226,10 @@ impl Inner { let rx = UnboundedReceiverStream::new(rx); + // Actually, we should deny any write action after the shutdown action. + // However, since we currently doesn't check the write action after the error + // like BrokenPipe error, we ignore this case to keep the behavior consistent. + let inner = Inner { actions, sleep: None, @@ -235,10 +268,15 @@ impl Inner { let err = Arc::try_unwrap(err).expect("There are no other references."); Err(err) } - Some(_) => { + Some(&mut Action::Write(_)) + | Some(&mut Action::WriteError(_)) + | Some(&mut Action::Wait(_)) => { // Either waiting or expecting a write Err(io::ErrorKind::WouldBlock.into()) } + Some(&mut Action::Shutdown) | Some(&mut Action::ShutdownError(_)) => { + panic!("unexpected read, expect poll_shutdown"); + } None => Ok(()), } } @@ -280,10 +318,11 @@ impl Inner { Action::Wait(..) | Action::WriteError(..) => { break; } - _ => {} + Action::Read(_) | Action::ReadError(_) => (), + Action::Shutdown | Action::ShutdownError(_) => { + panic!("unexpected write, expect poll_shutdown"); + } } - - // TODO: remove write } Ok(ret) @@ -296,6 +335,25 @@ impl Inner { } } + fn shutdown(&mut self) -> Shutdown { + match self.action() { + Some(&mut Action::Shutdown) => Shutdown::ShouldSuccess, + Some(&mut Action::ShutdownError(ref mut err)) => { + let err = err.take().expect("Should have been removed from actions."); + let err = Arc::try_unwrap(err).expect("There are no other references."); + Shutdown::ShouldError(err) + } + Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) => { + panic!("unexpected poll_shutdown, expect read"); + } + Some(&mut Action::Write(_)) | Some(&mut Action::WriteError(_)) => { + panic!("unexpected poll_shutdown, expect write"); + } + Some(&mut Action::Wait(_)) => Shutdown::NeedWait, + None => Shutdown::NoActions, + } + } + fn action(&mut self) -> Option<&mut Action> { loop { if self.actions.is_empty() { @@ -332,6 +390,12 @@ impl Inner { break; } } + Action::Shutdown => break, + Action::ShutdownError(ref mut error) => { + if error.is_some() { + break; + } + } } let _action = self.actions.pop_front(); @@ -441,6 +505,7 @@ impl AsyncWrite for Mock { panic!("unexpected WouldBlock {}", self.pmsg()); } } + Ok(0) if buf.is_empty() => return Poll::Ready(Ok(0)), Ok(0) => { // TODO: Is this correct? if !self.inner.actions.is_empty() { @@ -470,8 +535,55 @@ impl AsyncWrite for Mock { Poll::Ready(Ok(())) } - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + loop { + if let Some(ref mut sleep) = self.inner.sleep { + ready!(Pin::new(sleep).poll(cx)); + } + + // If a sleep is set, it has already fired + self.inner.sleep = None; + + match self.inner.shutdown() { + Shutdown::ShouldSuccess => { + assert!(matches!( + self.inner.actions.pop_front(), + Some(Action::Shutdown) + )); + self.maybe_wakeup_reader(); + return Poll::Ready(Ok(())); + } + Shutdown::ShouldError(e) => { + assert!(matches!( + self.inner.actions.pop_front(), + Some(Action::ShutdownError(_)) + )); + self.maybe_wakeup_reader(); + return Poll::Ready(Err(e)); + } + Shutdown::NeedWait => { + if let Some(rem) = self.inner.remaining_wait() { + let until = Instant::now() + rem; + self.inner.sleep = Some(Box::pin(time::sleep_until(until))); + } else { + panic!( + "unexpected poll_shutdown, expect read or write {}", + self.pmsg() + ); + } + } + Shutdown::NoActions => { + if let Some(action) = ready!(self.inner.poll_action(cx)) { + self.inner.actions.push_back(action); + } else { + panic!( + "unexpected poll_shutdown, but actually no more sequenced actions {}", + self.pmsg() + ); + } + } + } + } } } @@ -494,7 +606,11 @@ impl Drop for Mock { "There is still data left to write. {}", self.pmsg() ), - _ => (), + Action::Shutdown => panic!("AsyncWrite::poll_shutdown was not called. {}", self.pmsg()), + Action::ReadError(_) | Action::WriteError(_) | Action::Wait(_) => (), + // Since the existing implementation ignores the read/write error, so we also ignore the + // shutdown error here to keep the behavior consistent. + Action::ShutdownError(_) => (), }); } } diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index effac9a51fa..8885a2a2daf 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -1,9 +1,15 @@ #![warn(rust_2018_idioms)] +use futures_test::task::{noop_context, panic_waker}; +use futures_util::pin_mut; +use std::future::Future; use std::io; +use std::task::{Context, Poll}; +use tokio::io::AsyncWrite; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::time::{Duration, Instant}; use tokio_test::io::Builder; +use tokio_test::{assert_pending, assert_ready}; #[tokio::test] async fn read() { @@ -170,3 +176,125 @@ async fn multiple_wait() { start.elapsed().as_millis() ); } + +// No matter which usecase, it doesn't make sense for a read +// hang forever. However, currently, if there is no sequenced read +// action, it will hang forever. +// +// Since we want be aware of the fixing of this bug, +// no matter intentionally or unintentionally, +// we add this test to catch the behavior change. +// +// It looks like fixing it is not hard, but not sure the downstream +// impact, which might be a breaking change due to the +// `Mock::inner::read_wait` field, so we keep it as is for now. +// +// TODO: fix this bug +#[test] +fn should_hang_forever_on_read_but_no_sequenced_read_action() { + let mut mock = Builder::new() + .write_error(io::Error::new(io::ErrorKind::Other, "cruel")) + .build(); + + let mut buf = [0; 1]; + let read_exact_fut = mock.read(&mut buf); + pin_mut!(read_exact_fut); + assert_pending!(read_exact_fut.poll(&mut Context::from_waker(&panic_waker()))); +} + +// The `Mock` is expected to always panic if there is an unconsumed error action, +// rather than silently ignoring it. However, +// currently it only panics on unconsumed read/write actions, +// not on error actions. Fixing this requires a breaking change. +// +// This test verifies that it does not panic yet, +// to prevent accidentally introducing the breaking change prematurely. +// +// TODO: fix this bug in the next major release +#[test] +fn do_not_panic_unconsumed_error() { + let _mock = Builder::new() + .read_error(io::Error::new(io::ErrorKind::Other, "cruel")) + .build(); +} + +// The `Mock` must never panic, even if cloned multiple times. +// However, at present, cloning the builder under certain +// conditions causes a panic. +// +// Fixing this would require making `Mock` non-`Clone`, +// which is a breaking change. +// +// Since we want be aware of the fixing of this bug, +// no matter intentionally or unintentionally, +// we add this test to catch the behavior change. +// +// TODO: fix this bug in the next major release +#[tokio::test] +#[should_panic = "There are no other references.: Custom { kind: Other, error: \"cruel\" }"] +async fn should_panic_if_clone_the_builder_with_error_action() { + let mut builder = Builder::new(); + builder.write_error(io::Error::new(io::ErrorKind::Other, "cruel")); + let mut builder2 = builder.clone(); + + let mut mock = builder.build(); + let _mock2 = builder2.build(); + + // this write_all will panic due to unwrapping the error from `Arc` + mock.write_all(b"hello").await.unwrap(); + unreachable!(); +} + +#[tokio::test] +async fn should_not_hang_forever_on_zero_length_write() { + let mock = Builder::new().write(b"write").build(); + pin_mut!(mock); + match mock.as_mut().poll_write(&mut noop_context(), &[0u8; 0]) { + // drain the remaining write action to avoid panic at drop of the `mock` + Poll::Ready(Ok(0)) => mock.write_all(b"write").await.unwrap(), + Poll::Ready(Ok(n)) => panic!("expected to write 0 bytes, wrote {n} bytes instead"), + Poll::Ready(Err(e)) => panic!("expected to write 0 bytes, got error {e} instead"), + Poll::Pending => panic!("expected to write 0 bytes immediately, but pending instead"), + } +} + +#[tokio::test] +async fn shutdown() { + let mut mock = Builder::new().shutdown().build(); + mock.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn shutdown_error() { + let error = io::Error::new(io::ErrorKind::Other, "cruel"); + let mut mock = Builder::new().shutdown_error(error).build(); + let err = mock.shutdown().await.unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::Other); + assert_eq!("cruel", format!("{err}")); +} + +#[tokio::test(start_paused = true)] +async fn shutdown_wait() { + const WAIT: Duration = Duration::from_secs(1); + + let mock = Builder::new().wait(WAIT).shutdown().build(); + pin_mut!(mock); + + assert_pending!(mock.as_mut().poll_shutdown(&mut noop_context())); + + tokio::time::advance(WAIT).await; + let _ = assert_ready!(mock.as_mut().poll_shutdown(&mut noop_context())); +} + +#[test] +#[should_panic = "AsyncWrite::poll_shutdown was not called. (1 actions remain)"] +fn should_panic_on_leftover_shutdown_action() { + let _mock = Builder::new().shutdown().build(); +} + +#[test] +fn should_not_panic_on_leftover_shutdown_error_action() { + let _mock = Builder::new() + .shutdown_error(io::Error::new(io::ErrorKind::Other, "cruel")) + .build(); +}