Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions hydro_lang/src/live_collections/keyed_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,161 @@ impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
}
}

/// For each value `v` in each group, transform `v` using `f` and then treat the
/// result as an [`Iterator`] to produce values one by one within the same group.
/// The implementation for [`Iterator`] for the output type `I` must produce items
/// in a **deterministic** order.
///
/// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
/// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
///
/// # Example
/// ```rust
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// process
/// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
/// .into_keyed()
/// .flat_map_ordered(q!(|x| x))
/// # .entries()
/// # }, |mut stream| async move {
/// // { 1: [2, 3, 4], 2: [5, 6] }
/// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// ```
pub fn flat_map_ordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
where
I: IntoIterator<Item = U>,
F: Fn(V) -> I + 'a,
K: Clone,
{
let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
KeyedStream {
underlying: self.underlying.flat_map_ordered(q!({
let orig = f;
move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
})),
_phantom_order: Default::default(),
}
}

/// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
/// for the output type `I` to produce items in any order.
///
/// # Example
/// ```rust
/// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
/// process
/// .source_iter(q!(vec![
/// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
/// (2, std::collections::HashSet::from_iter(vec![4, 5]))
/// ]))
/// .into_keyed()
/// .flat_map_unordered(q!(|x| x))
/// # .entries()
/// # }, |mut stream| async move {
/// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
/// # let mut results = Vec::new();
/// # for _ in 0..4 {
/// # results.push(stream.next().await.unwrap());
/// # }
/// # results.sort();
/// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
/// # }));
/// ```
pub fn flat_map_unordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, NoOrder, R>
where
I: IntoIterator<Item = U>,
F: Fn(V) -> I + 'a,
K: Clone,
{
let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
KeyedStream {
underlying: self.underlying.flat_map_unordered(q!({
let orig = f;
move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
})),
_phantom_order: Default::default(),
}
}

/// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
/// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
/// items in a **deterministic** order.
///
/// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
/// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
///
/// # Example
/// ```rust
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// process
/// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
/// .into_keyed()
/// .flatten_ordered()
/// # .entries()
/// # }, |mut stream| async move {
/// // { 1: [2, 3, 4], 2: [5, 6] }
/// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// ```
pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
where
V: IntoIterator<Item = U>,
K: Clone,
{
self.flat_map_ordered(q!(|d| d))
}

/// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
/// for the value type `V` to produce items in any order.
///
/// # Example
/// ```rust
/// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
/// process
/// .source_iter(q!(vec![
/// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
/// (2, std::collections::HashSet::from_iter(vec![4, 5]))
/// ]))
/// .into_keyed()
/// .flatten_unordered()
/// # .entries()
/// # }, |mut stream| async move {
/// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
/// # let mut results = Vec::new();
/// # for _ in 0..4 {
/// # results.push(stream.next().await.unwrap());
/// # }
/// # results.sort();
/// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
/// # }));
/// ```
pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
where
V: IntoIterator<Item = U>,
K: Clone,
{
self.flat_map_unordered(q!(|d| d))
}

/// An operator which allows you to "inspect" each element of a stream without
/// modifying it. The closure `f` is called on a reference to each value. This is
/// mainly useful for debugging, and should not be used to generate side-effects.
Expand Down
Loading