From 1ed045fc26025a5872ee2f862b38054da2c7fe9a Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Mon, 22 Sep 2025 12:52:40 -0700 Subject: [PATCH] feat(hydro_lang): add API for safely broadcasting a stream across several keys --- hydro_lang/src/live_collections/stream/mod.rs | 49 +++++++++++++++++++ .../src/live_collections/stream/networking.rs | 23 +++------ 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/hydro_lang/src/live_collections/stream/mod.rs b/hydro_lang/src/live_collections/stream/mod.rs index 9053e4bfc4fc..997ed399f53f 100644 --- a/hydro_lang/src/live_collections/stream/mod.rs +++ b/hydro_lang/src/live_collections/stream/mod.rs @@ -12,6 +12,7 @@ use syn::parse_quote; use tokio::time::Instant; use super::boundedness::{Bounded, Boundedness, Unbounded}; +use super::keyed_singleton::KeyedSingleton; use super::keyed_stream::KeyedStream; use super::optional::Optional; use super::singleton::Singleton; @@ -1783,6 +1784,54 @@ where }, ) } + + /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in + /// `self` used as the values for *each* key. + /// + /// This is helpful when "broadcasting" a set of values so that all the keys have the same + /// values. For example, it can be used to send the same set of elements to several cluster + /// members, if the membership information is available as a [`KeyedSingleton`]. + /// + /// # Example + /// ```rust + /// # use hydro_lang::prelude::*; + /// # use futures::StreamExt; + /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| { + /// # let tick = process.tick(); + /// let keyed_singleton = // { 1: (), 2: () } + /// # process + /// # .source_iter(q!(vec![(1, ()), (2, ())])) + /// # .into_keyed() + /// # .batch(&tick, nondet!(/** test */)) + /// # .first(); + /// let stream = // [ "a", "b" ] + /// # process + /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()])) + /// # .batch(&tick, nondet!(/** test */)); + /// stream.repeat_with_keys(keyed_singleton) + /// # .entries().all_ticks() + /// # }, |mut stream| async move { + /// // { 1: ["a", "b" ], 2: ["a", "b"] } + /// # let mut results = Vec::new(); + /// # for _ in 0..4 { + /// # results.push(stream.next().await.unwrap()); + /// # } + /// # results.sort(); + /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]); + /// # })); + /// ``` + pub fn repeat_with_keys( + self, + keys: KeyedSingleton, + ) -> KeyedStream + where + K: Clone, + T: Clone, + { + keys.keys().weaken_retries().cross_product_nested_loop(self).into_keyed().assume_ordering( + nondet!(/** keyed stream does not depend on ordering of keys, cross_product_nested_loop preserves order of values */) + ) + } } impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R> diff --git a/hydro_lang/src/live_collections/stream/networking.rs b/hydro_lang/src/live_collections/stream/networking.rs index 76846f709c32..60d720d0581a 100644 --- a/hydro_lang/src/live_collections/stream/networking.rs +++ b/hydro_lang/src/live_collections/stream/networking.rs @@ -18,7 +18,7 @@ use crate::location::dynamic::DynLocation; use crate::location::external_process::ExternalBincodeStream; use crate::location::tick::NoAtomic; use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process}; -use crate::nondet::{NonDet, nondet}; +use crate::nondet::NonDet; use crate::staging_util::get_this_crate; // same as the one in `hydro_std`, but internal use only @@ -185,15 +185,10 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream { let ids = track_membership(self.location.source_cluster_members(other)); let join_tick = self.location.tick(); - let current_members = ids.snapshot(&join_tick, nondet_membership).keys(); + let current_members = ids.snapshot(&join_tick, nondet_membership); - current_members - .weaker_retries() - .assume_ordering::( - nondet!(/** we send to each member independently, order does not matter */), - ) - .cross_product_nested_loop(self.batch(&join_tick, nondet_membership)) - .weaken_ordering::() + self.batch(&join_tick, nondet_membership) + .repeat_with_keys(current_members) .all_ticks() .demux_bincode(other) } @@ -511,14 +506,10 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream { let ids = track_membership(self.location.source_cluster_members(other)); let join_tick = self.location.tick(); - let current_members = ids.snapshot(&join_tick, nondet_membership).keys(); + let current_members = ids.snapshot(&join_tick, nondet_membership); - current_members - .weaker_retries() - .assume_ordering::( - nondet!(/** we send to each member independently, order does not matter */), - ) - .cross_product_nested_loop(self.batch(&join_tick, nondet_membership)) + self.batch(&join_tick, nondet_membership) + .repeat_with_keys(current_members) .all_ticks() .demux_bincode(other) }