Skip to content

Commit 1436c05

Browse files
authored
Stream::size_hint for mpsc channels (#2660)
1 parent 769632c commit 1436c05

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed

futures-channel/src/mpsc/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,6 +1072,14 @@ impl<T> Stream for Receiver<T> {
10721072
}
10731073
}
10741074
}
1075+
1076+
fn size_hint(&self) -> (usize, Option<usize>) {
1077+
if let Some(inner) = &self.inner {
1078+
decode_state(inner.state.load(SeqCst)).size_hint()
1079+
} else {
1080+
(0, Some(0))
1081+
}
1082+
}
10751083
}
10761084

10771085
impl<T> Drop for Receiver<T> {
@@ -1216,6 +1224,14 @@ impl<T> Stream for UnboundedReceiver<T> {
12161224
}
12171225
}
12181226
}
1227+
1228+
fn size_hint(&self) -> (usize, Option<usize>) {
1229+
if let Some(inner) = &self.inner {
1230+
decode_state(inner.state.load(SeqCst)).size_hint()
1231+
} else {
1232+
(0, Some(0))
1233+
}
1234+
}
12191235
}
12201236

12211237
impl<T> Drop for UnboundedReceiver<T> {
@@ -1306,6 +1322,14 @@ impl State {
13061322
fn is_closed(&self) -> bool {
13071323
!self.is_open && self.num_messages == 0
13081324
}
1325+
1326+
fn size_hint(&self) -> (usize, Option<usize>) {
1327+
if self.is_open {
1328+
(self.num_messages, None)
1329+
} else {
1330+
(self.num_messages, Some(self.num_messages))
1331+
}
1332+
}
13091333
}
13101334

13111335
/*
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use futures::channel::mpsc;
2+
use futures::stream::Stream;
3+
4+
#[test]
5+
fn unbounded_size_hint() {
6+
let (tx, mut rx) = mpsc::unbounded::<u32>();
7+
assert_eq!((0, None), rx.size_hint());
8+
tx.unbounded_send(1).unwrap();
9+
assert_eq!((1, None), rx.size_hint());
10+
rx.try_next().unwrap().unwrap();
11+
assert_eq!((0, None), rx.size_hint());
12+
tx.unbounded_send(2).unwrap();
13+
tx.unbounded_send(3).unwrap();
14+
assert_eq!((2, None), rx.size_hint());
15+
drop(tx);
16+
assert_eq!((2, Some(2)), rx.size_hint());
17+
rx.try_next().unwrap().unwrap();
18+
assert_eq!((1, Some(1)), rx.size_hint());
19+
rx.try_next().unwrap().unwrap();
20+
assert_eq!((0, Some(0)), rx.size_hint());
21+
}
22+
23+
#[test]
24+
fn channel_size_hint() {
25+
let (mut tx, mut rx) = mpsc::channel::<u32>(10);
26+
assert_eq!((0, None), rx.size_hint());
27+
tx.try_send(1).unwrap();
28+
assert_eq!((1, None), rx.size_hint());
29+
rx.try_next().unwrap().unwrap();
30+
assert_eq!((0, None), rx.size_hint());
31+
tx.try_send(2).unwrap();
32+
tx.try_send(3).unwrap();
33+
assert_eq!((2, None), rx.size_hint());
34+
drop(tx);
35+
assert_eq!((2, Some(2)), rx.size_hint());
36+
rx.try_next().unwrap().unwrap();
37+
assert_eq!((1, Some(1)), rx.size_hint());
38+
rx.try_next().unwrap().unwrap();
39+
assert_eq!((0, Some(0)), rx.size_hint());
40+
}

0 commit comments

Comments
 (0)