Skip to content

Commit 93a8049

Browse files
petrosaggtaiki-e
authored andcommitted
futures-util: add StreamExt::count method
Signed-off-by: Petros Angelatos <[email protected]>
1 parent afff1ce commit 93a8049

File tree

2 files changed

+89
-0
lines changed

2 files changed

+89
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use futures_core::future::{FusedFuture, Future};
4+
use futures_core::ready;
5+
use futures_core::stream::{FusedStream, Stream};
6+
use futures_core::task::{Context, Poll};
7+
use pin_project_lite::pin_project;
8+
9+
pin_project! {
10+
/// Future for the [`count`](super::StreamExt::count) method.
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct Count<St> {
13+
#[pin]
14+
stream: St,
15+
count: usize
16+
}
17+
}
18+
19+
impl<St> fmt::Debug for Count<St>
20+
where
21+
St: fmt::Debug,
22+
{
23+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24+
f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish()
25+
}
26+
}
27+
28+
impl<St: Stream> Count<St> {
29+
pub(super) fn new(stream: St) -> Self {
30+
Self { stream, count: 0 }
31+
}
32+
}
33+
34+
impl<St: FusedStream> FusedFuture for Count<St> {
35+
fn is_terminated(&self) -> bool {
36+
self.stream.is_terminated()
37+
}
38+
}
39+
40+
impl<St: Stream> Future for Count<St> {
41+
type Output = usize;
42+
43+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
44+
let mut this = self.project();
45+
46+
Poll::Ready(loop {
47+
match ready!(this.stream.as_mut().poll_next(cx)) {
48+
Some(_) => *this.count += 1,
49+
None => break *this.count,
50+
}
51+
})
52+
}
53+
}

futures-util/src/stream/stream/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ mod concat;
3838
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
3939
pub use self::concat::Concat;
4040

41+
mod count;
42+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
43+
pub use self::count::Count;
44+
4145
mod cycle;
4246
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
4347
pub use self::cycle::Cycle;
@@ -590,6 +594,38 @@ pub trait StreamExt: Stream {
590594
assert_future::<Self::Item, _>(Concat::new(self))
591595
}
592596

597+
/// Drives the stream to completion, counting the number of items.
598+
///
599+
/// # Overflow Behavior
600+
///
601+
/// The method does no guarding against overflows, so counting elements of a
602+
/// stream with more than [`usize::MAX`] elements either produces the wrong
603+
/// result or panics. If debug assertions are enabled, a panic is guaranteed.
604+
///
605+
/// # Panics
606+
///
607+
/// This function might panic if the iterator has more than [`usize::MAX`]
608+
/// elements.
609+
///
610+
/// # Examples
611+
///
612+
/// ```
613+
/// # futures::executor::block_on(async {
614+
/// use futures::stream::{self, StreamExt};
615+
///
616+
/// let stream = stream::iter(1..=10);
617+
/// let count = stream.count().await;
618+
///
619+
/// assert_eq!(count, 10);
620+
/// # });
621+
/// ```
622+
fn count(self) -> Count<Self>
623+
where
624+
Self: Sized,
625+
{
626+
assert_future::<usize, _>(Count::new(self))
627+
}
628+
593629
/// Repeats a stream endlessly.
594630
///
595631
/// The stream never terminates. Note that you likely want to avoid

0 commit comments

Comments
 (0)