Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,10 @@ impl<T> Unpin for Receiver<T> {}

impl<T> FusedStream for Receiver<T> {
fn is_terminated(&self) -> bool {
self.inner.is_none()
match &self.inner {
Some(inner) => decode_state(inner.state.load(SeqCst)).is_closed(),
None => true,
}
}
}

Expand Down Expand Up @@ -1307,7 +1310,10 @@ impl<T> UnboundedReceiver<T> {

impl<T> FusedStream for UnboundedReceiver<T> {
fn is_terminated(&self) -> bool {
self.inner.is_none()
match &self.inner {
Some(inner) => decode_state(inner.state.load(SeqCst)).is_closed(),
None => true,
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions futures-channel/tests/mpsc-close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::task::{Context, Poll};
use futures_channel::mpsc::TryRecvError;
use futures_core::FusedStream;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::thread;
Expand Down Expand Up @@ -65,6 +66,7 @@ fn multiple_senders_disconnect() {

// dropping the final sender will close the channel
drop(tx4);
assert!(rx.is_terminated());
assert_eq!(block_on(rx.next()), None);
}
}
Expand All @@ -84,6 +86,7 @@ fn multiple_senders_close_channel() {
let err = block_on(tx2.send(5)).unwrap_err();
assert!(err.is_disconnected());

assert!(rx.is_terminated());
assert_eq!(block_on(rx.next()), None);
}

Expand All @@ -100,6 +103,7 @@ fn multiple_senders_close_channel() {
let err = block_on(tx2.send(5)).unwrap_err();
assert!(err.is_disconnected());

assert!(rx.is_terminated());
assert_eq!(block_on(rx.next()), None);
}
}
Expand Down Expand Up @@ -283,6 +287,7 @@ fn unbounded_try_next_after_none() {
let (tx, mut rx) = mpsc::unbounded::<String>();
// Drop the sender, close the channel.
drop(tx);
assert!(rx.is_terminated());
// Receive the end of channel.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
// None received, check we can call `try_next` again.
Expand All @@ -295,6 +300,7 @@ fn bounded_try_next_after_none() {
let (tx, mut rx) = mpsc::channel::<String>(17);
// Drop the sender, close the channel.
drop(tx);
assert!(rx.is_terminated());
// Receive the end of channel.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
// None received, check we can call `try_next` again.
Expand All @@ -310,6 +316,7 @@ fn unbounded_try_recv_after_none() {

// Drop the sender, close the channel.
drop(tx);
assert!(rx.is_terminated());
// Receive the end of channel.
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
// Closed received, check we can call `try_next` again.
Expand All @@ -325,6 +332,7 @@ fn bounded_try_recv_after_none() {

// Drop the sender, close the channel.
drop(tx);
assert!(rx.is_terminated());
// Receive the end of channel.
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
// Closed received, check we can call `try_next` again.
Expand Down
Loading