Skip to content

Commit aa1e7e2

Browse files
authored
stream: impl FromStream for BTreeSet (#7954)
1 parent 961757f commit aa1e7e2

File tree

2 files changed

+43
-0
lines changed

2 files changed

+43
-0
lines changed

tokio-stream/src/stream_ext/collect.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use core::mem;
66
use core::pin::Pin;
77
use core::task::{ready, Context, Poll};
88
use pin_project_lite::pin_project;
9+
use std::collections::BTreeSet;
910

1011
// Do not export this struct until `FromStream` can be unsealed.
1112
pin_project! {
@@ -135,6 +136,25 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> {
135136
}
136137
}
137138

139+
impl<T: Ord> FromStream<T> for BTreeSet<T> {}
140+
141+
impl<T: Ord> sealed::FromStreamPriv<T> for BTreeSet<T> {
142+
type InternalCollection = BTreeSet<T>;
143+
144+
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BTreeSet<T> {
145+
BTreeSet::new()
146+
}
147+
148+
fn extend(_: sealed::Internal, collection: &mut BTreeSet<T>, item: T) -> bool {
149+
collection.insert(item);
150+
true
151+
}
152+
153+
fn finalize(_: sealed::Internal, collection: &mut BTreeSet<T>) -> BTreeSet<T> {
154+
mem::take(collection)
155+
}
156+
}
157+
138158
impl<T> FromStream<T> for Box<[T]> {}
139159

140160
impl<T> sealed::FromStreamPriv<T> for Box<[T]> {

tokio-stream/tests/stream_collect.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::BTreeSet;
2+
13
use tokio_stream::{self as stream, StreamExt};
24
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
35

@@ -61,6 +63,27 @@ async fn collect_vec_items() {
6163
assert_eq!(vec![1, 2], coll);
6264
}
6365

66+
#[tokio::test]
67+
async fn collect_btreeset_items() {
68+
let (tx, rx) = mpsc::unbounded_channel_stream();
69+
let mut fut = task::spawn(rx.collect::<BTreeSet<i32>>());
70+
71+
assert_pending!(fut.poll());
72+
73+
tx.send(2).unwrap();
74+
assert!(fut.is_woken());
75+
assert_pending!(fut.poll());
76+
77+
tx.send(1).unwrap();
78+
assert!(fut.is_woken());
79+
assert_pending!(fut.poll());
80+
81+
drop(tx);
82+
assert!(fut.is_woken());
83+
let coll = assert_ready!(fut.poll());
84+
assert_eq!(BTreeSet::from([1, 2]), coll);
85+
}
86+
6487
#[tokio::test]
6588
async fn collect_string_items() {
6689
let (tx, rx) = mpsc::unbounded_channel_stream();

0 commit comments

Comments
 (0)