Skip to content

Commit badef50

Browse files
committed
feat(hydro_lang): add API for safely broadcasting a stream across several keys
1 parent d5430c3 commit badef50

File tree

2 files changed

+53
-14
lines changed

2 files changed

+53
-14
lines changed

hydro_lang/src/live_collections/keyed_singleton.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,54 @@ where
759759
}
760760

761761
impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
762+
/// Creates a [`KeyedStream`] with the same set of keys as `self`, but with the `other` stream
763+
/// used as the values for *each* key.
764+
///
765+
/// This is helpful when "broadcasting" a set of values so that all the keys have the same
766+
/// values. For example, it can be used to send the same set of elements to several cluster
767+
/// members, if the membership information is available as a [`KeyedSingleton`].
768+
///
769+
/// # Example
770+
/// ```rust
771+
/// # use hydro_lang::prelude::*;
772+
/// # use futures::StreamExt;
773+
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
774+
/// # let tick = process.tick();
775+
/// let keyed_singleton = // { 1: (), 2: () }
776+
/// # process
777+
/// # .source_iter(q!(vec![(1, ()), (2, ())]))
778+
/// # .into_keyed()
779+
/// # .batch(&tick, nondet!(/** test */))
780+
/// # .first();
781+
/// let stream = // [ "a", "b" ]
782+
/// # process
783+
/// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
784+
/// # .batch(&tick, nondet!(/** test */));
785+
/// keyed_singleton.flat_map_identical(stream)
786+
/// # .entries().all_ticks()
787+
/// # }, |mut stream| async move {
788+
/// // { 1: ["a", "b" ], 2: ["a", "b"] }
789+
/// # let mut results = Vec::new();
790+
/// # for _ in 0..4 {
791+
/// # results.push(stream.next().await.unwrap());
792+
/// # }
793+
/// # results.sort();
794+
/// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
795+
/// # }));
796+
/// ```
797+
pub fn flat_map_identical<T2, O2: Ordering, R: Retries>(
798+
self,
799+
other: Stream<T2, Tick<L>, Bounded, O2, R>,
800+
) -> KeyedStream<K, T2, Tick<L>, Bounded, O2, R>
801+
where
802+
K: Clone,
803+
T2: Clone,
804+
{
805+
self.keys().weaken_retries().cross_product_nested_loop(other).into_keyed().assume_ordering(
806+
nondet!(/** keyed stream does not depend on ordering of keys, cross_product_nested_loop preserves order of values */)
807+
)
808+
}
809+
762810
/// Asynchronously yields this keyed singleton outside the tick, which will
763811
/// be asynchronously updated with the latest set of entries inside the tick.
764812
///

hydro_lang/src/live_collections/stream/networking.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::location::dynamic::DynLocation;
1818
use crate::location::external_process::ExternalBincodeStream;
1919
use crate::location::tick::NoAtomic;
2020
use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21-
use crate::nondet::{NonDet, nondet};
21+
use crate::nondet::NonDet;
2222
use crate::staging_util::get_this_crate;
2323

2424
// same as the one in `hydro_std`, but internal use only
@@ -185,15 +185,10 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>
185185
{
186186
let ids = track_membership(self.location.source_cluster_members(other));
187187
let join_tick = self.location.tick();
188-
let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
188+
let current_members = ids.snapshot(&join_tick, nondet_membership);
189189

190190
current_members
191-
.weaker_retries()
192-
.assume_ordering::<TotalOrder>(
193-
nondet!(/** we send to each member independently, order does not matter */),
194-
)
195-
.cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
196-
.weaken_ordering::<O>()
191+
.flat_map_identical(self.batch(&join_tick, nondet_membership))
197192
.all_ticks()
198193
.demux_bincode(other)
199194
}
@@ -511,14 +506,10 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>
511506
{
512507
let ids = track_membership(self.location.source_cluster_members(other));
513508
let join_tick = self.location.tick();
514-
let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
509+
let current_members = ids.snapshot(&join_tick, nondet_membership);
515510

516511
current_members
517-
.weaker_retries()
518-
.assume_ordering::<TotalOrder>(
519-
nondet!(/** we send to each member independently, order does not matter */),
520-
)
521-
.cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
512+
.flat_map_identical(self.batch(&join_tick, nondet_membership))
522513
.all_ticks()
523514
.demux_bincode(other)
524515
}

0 commit comments

Comments
 (0)