diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 69bb88ffb..a74d3b29d 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -1113,7 +1113,10 @@ impl Unpin for Receiver {} impl FusedStream for Receiver { fn is_terminated(&self) -> bool { - self.inner.is_none() + match &self.inner { + Some(inner) => decode_state(inner.state.load(SeqCst)).is_closed(), + None => true, + } } } @@ -1307,7 +1310,10 @@ impl UnboundedReceiver { impl FusedStream for UnboundedReceiver { fn is_terminated(&self) -> bool { - self.inner.is_none() + match &self.inner { + Some(inner) => decode_state(inner.state.load(SeqCst)).is_closed(), + None => true, + } } } diff --git a/futures-channel/tests/mpsc-close.rs b/futures-channel/tests/mpsc-close.rs index be5d08c32..18d6917ad 100644 --- a/futures-channel/tests/mpsc-close.rs +++ b/futures-channel/tests/mpsc-close.rs @@ -5,6 +5,7 @@ use futures::sink::SinkExt; use futures::stream::StreamExt; use futures::task::{Context, Poll}; use futures_channel::mpsc::TryRecvError; +use futures_core::FusedStream; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::thread; @@ -65,6 +66,7 @@ fn multiple_senders_disconnect() { // dropping the final sender will close the channel drop(tx4); + assert!(rx.is_terminated()); assert_eq!(block_on(rx.next()), None); } } @@ -84,6 +86,7 @@ fn multiple_senders_close_channel() { let err = block_on(tx2.send(5)).unwrap_err(); assert!(err.is_disconnected()); + assert!(rx.is_terminated()); assert_eq!(block_on(rx.next()), None); } @@ -100,6 +103,7 @@ fn multiple_senders_close_channel() { let err = block_on(tx2.send(5)).unwrap_err(); assert!(err.is_disconnected()); + assert!(rx.is_terminated()); assert_eq!(block_on(rx.next()), None); } } @@ -283,6 +287,7 @@ fn unbounded_try_next_after_none() { let (tx, mut rx) = mpsc::unbounded::(); // Drop the sender, close the channel. drop(tx); + assert!(rx.is_terminated()); // Receive the end of channel. assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); // None received, check we can call `try_next` again. @@ -295,6 +300,7 @@ fn bounded_try_next_after_none() { let (tx, mut rx) = mpsc::channel::(17); // Drop the sender, close the channel. drop(tx); + assert!(rx.is_terminated()); // Receive the end of channel. assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); // None received, check we can call `try_next` again. @@ -310,6 +316,7 @@ fn unbounded_try_recv_after_none() { // Drop the sender, close the channel. drop(tx); + assert!(rx.is_terminated()); // Receive the end of channel. assert_eq!(Err(TryRecvError::Closed), rx.try_recv()); // Closed received, check we can call `try_next` again. @@ -325,6 +332,7 @@ fn bounded_try_recv_after_none() { // Drop the sender, close the channel. drop(tx); + assert!(rx.is_terminated()); // Receive the end of channel. assert_eq!(Err(TryRecvError::Closed), rx.try_recv()); // Closed received, check we can call `try_next` again.