Skip to content

Commit 584ebf1

Browse files
authored
feat(hydro_lang): add flat_map / flatten to KeyedStream (#2127)
1 parent b59e30d commit 584ebf1

File tree

1 file changed

+155
-0
lines changed
  • hydro_lang/src/live_collections/keyed_stream

1 file changed

+155
-0
lines changed

hydro_lang/src/live_collections/keyed_stream/mod.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,161 @@ impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
412412
}
413413
}
414414

415+
/// For each value `v` in each group, transform `v` using `f` and then treat the
416+
/// result as an [`Iterator`] to produce values one by one within the same group.
417+
/// The implementation for [`Iterator`] for the output type `I` must produce items
418+
/// in a **deterministic** order.
419+
///
420+
/// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
421+
/// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
422+
///
423+
/// # Example
424+
/// ```rust
425+
/// # use hydro_lang::prelude::*;
426+
/// # use futures::StreamExt;
427+
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
428+
/// process
429+
/// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
430+
/// .into_keyed()
431+
/// .flat_map_ordered(q!(|x| x))
432+
/// # .entries()
433+
/// # }, |mut stream| async move {
434+
/// // { 1: [2, 3, 4], 2: [5, 6] }
435+
/// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
436+
/// # assert_eq!(stream.next().await.unwrap(), w);
437+
/// # }
438+
/// # }));
439+
/// ```
440+
pub fn flat_map_ordered<U, I, F>(
441+
self,
442+
f: impl IntoQuotedMut<'a, F, L> + Copy,
443+
) -> KeyedStream<K, U, L, B, O, R>
444+
where
445+
I: IntoIterator<Item = U>,
446+
F: Fn(V) -> I + 'a,
447+
K: Clone,
448+
{
449+
let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
450+
KeyedStream {
451+
underlying: self.underlying.flat_map_ordered(q!({
452+
let orig = f;
453+
move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
454+
})),
455+
_phantom_order: Default::default(),
456+
}
457+
}
458+
459+
/// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
460+
/// for the output type `I` to produce items in any order.
461+
///
462+
/// # Example
463+
/// ```rust
464+
/// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
465+
/// # use futures::StreamExt;
466+
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
467+
/// process
468+
/// .source_iter(q!(vec![
469+
/// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
470+
/// (2, std::collections::HashSet::from_iter(vec![4, 5]))
471+
/// ]))
472+
/// .into_keyed()
473+
/// .flat_map_unordered(q!(|x| x))
474+
/// # .entries()
475+
/// # }, |mut stream| async move {
476+
/// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
477+
/// # let mut results = Vec::new();
478+
/// # for _ in 0..4 {
479+
/// # results.push(stream.next().await.unwrap());
480+
/// # }
481+
/// # results.sort();
482+
/// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
483+
/// # }));
484+
/// ```
485+
pub fn flat_map_unordered<U, I, F>(
486+
self,
487+
f: impl IntoQuotedMut<'a, F, L> + Copy,
488+
) -> KeyedStream<K, U, L, B, NoOrder, R>
489+
where
490+
I: IntoIterator<Item = U>,
491+
F: Fn(V) -> I + 'a,
492+
K: Clone,
493+
{
494+
let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
495+
KeyedStream {
496+
underlying: self.underlying.flat_map_unordered(q!({
497+
let orig = f;
498+
move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
499+
})),
500+
_phantom_order: Default::default(),
501+
}
502+
}
503+
504+
/// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
505+
/// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
506+
/// items in a **deterministic** order.
507+
///
508+
/// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
509+
/// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
510+
///
511+
/// # Example
512+
/// ```rust
513+
/// # use hydro_lang::prelude::*;
514+
/// # use futures::StreamExt;
515+
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
516+
/// process
517+
/// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
518+
/// .into_keyed()
519+
/// .flatten_ordered()
520+
/// # .entries()
521+
/// # }, |mut stream| async move {
522+
/// // { 1: [2, 3, 4], 2: [5, 6] }
523+
/// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
524+
/// # assert_eq!(stream.next().await.unwrap(), w);
525+
/// # }
526+
/// # }));
527+
/// ```
528+
pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
529+
where
530+
V: IntoIterator<Item = U>,
531+
K: Clone,
532+
{
533+
self.flat_map_ordered(q!(|d| d))
534+
}
535+
536+
/// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
537+
/// for the value type `V` to produce items in any order.
538+
///
539+
/// # Example
540+
/// ```rust
541+
/// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
542+
/// # use futures::StreamExt;
543+
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
544+
/// process
545+
/// .source_iter(q!(vec![
546+
/// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
547+
/// (2, std::collections::HashSet::from_iter(vec![4, 5]))
548+
/// ]))
549+
/// .into_keyed()
550+
/// .flatten_unordered()
551+
/// # .entries()
552+
/// # }, |mut stream| async move {
553+
/// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
554+
/// # let mut results = Vec::new();
555+
/// # for _ in 0..4 {
556+
/// # results.push(stream.next().await.unwrap());
557+
/// # }
558+
/// # results.sort();
559+
/// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
560+
/// # }));
561+
/// ```
562+
pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
563+
where
564+
V: IntoIterator<Item = U>,
565+
K: Clone,
566+
{
567+
self.flat_map_unordered(q!(|d| d))
568+
}
569+
415570
/// An operator which allows you to "inspect" each element of a stream without
416571
/// modifying it. The closure `f` is called on a reference to each value. This is
417572
/// mainly useful for debugging, and should not be used to generate side-effects.

0 commit comments

Comments
 (0)