Skip to content

Commit bdbabf8

Browse files
committed
feat(hydro_lang): add API for safely broadcasting a stream across several keys
1 parent 584ebf1 commit bdbabf8

File tree

2 files changed

+56
-16
lines changed

2 files changed

+56
-16
lines changed

hydro_lang/src/live_collections/stream/mod.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use super::boundedness::{Bounded, Boundedness, Unbounded};
1515
use super::keyed_stream::KeyedStream;
1616
use super::optional::Optional;
1717
use super::singleton::Singleton;
18+
use super::keyed_singleton::KeyedSingleton;
1819
use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
1920
#[cfg(stageleft_runtime)]
2021
use crate::forward_handle::{CycleCollection, ReceiverComplete};
@@ -1783,6 +1784,54 @@ where
17831784
},
17841785
)
17851786
}
1787+
1788+
/// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1789+
/// `self` used as the values for *each* key.
1790+
///
1791+
/// This is helpful when "broadcasting" a set of values so that all the keys have the same
1792+
/// values. For example, it can be used to send the same set of elements to several cluster
1793+
/// members, if the membership information is available as a [`KeyedSingleton`].
1794+
///
1795+
/// # Example
1796+
/// ```rust
1797+
/// # use hydro_lang::prelude::*;
1798+
/// # use futures::StreamExt;
1799+
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1800+
/// # let tick = process.tick();
1801+
/// let keyed_singleton = // { 1: (), 2: () }
1802+
/// # process
1803+
/// # .source_iter(q!(vec![(1, ()), (2, ())]))
1804+
/// # .into_keyed()
1805+
/// # .batch(&tick, nondet!(/** test */))
1806+
/// # .first();
1807+
/// let stream = // [ "a", "b" ]
1808+
/// # process
1809+
/// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1810+
/// # .batch(&tick, nondet!(/** test */));
1811+
/// stream.repeat_with_keys(keyed_singleton)
1812+
/// # .entries().all_ticks()
1813+
/// # }, |mut stream| async move {
1814+
/// // { 1: ["a", "b" ], 2: ["a", "b"] }
1815+
/// # let mut results = Vec::new();
1816+
/// # for _ in 0..4 {
1817+
/// # results.push(stream.next().await.unwrap());
1818+
/// # }
1819+
/// # results.sort();
1820+
/// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1821+
/// # }));
1822+
/// ```
1823+
pub fn repeat_with_keys<K, V2>(
1824+
self,
1825+
keys: KeyedSingleton<K, V2, L, Bounded>,
1826+
) -> KeyedStream<K, T, L, Bounded, O, R>
1827+
where
1828+
K: Clone,
1829+
T: Clone,
1830+
{
1831+
keys.keys().weaken_retries().cross_product_nested_loop(self).into_keyed().assume_ordering(
1832+
nondet!(/** keyed stream does not depend on ordering of keys, cross_product_nested_loop preserves order of values */)
1833+
)
1834+
}
17861835
}
17871836

17881837
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>

hydro_lang/src/live_collections/stream/networking.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::location::dynamic::DynLocation;
1818
use crate::location::external_process::ExternalBincodeStream;
1919
use crate::location::tick::NoAtomic;
2020
use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21-
use crate::nondet::{NonDet, nondet};
21+
use crate::nondet::NonDet;
2222
use crate::staging_util::get_this_crate;
2323

2424
// same as the one in `hydro_std`, but internal use only
@@ -185,15 +185,10 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>
185185
{
186186
let ids = track_membership(self.location.source_cluster_members(other));
187187
let join_tick = self.location.tick();
188-
let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
188+
let current_members = ids.snapshot(&join_tick, nondet_membership);
189189

190-
current_members
191-
.weaker_retries()
192-
.assume_ordering::<TotalOrder>(
193-
nondet!(/** we send to each member independently, order does not matter */),
194-
)
195-
.cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
196-
.weaken_ordering::<O>()
190+
self.batch(&join_tick, nondet_membership)
191+
.repeat_with_keys(current_members)
197192
.all_ticks()
198193
.demux_bincode(other)
199194
}
@@ -511,14 +506,10 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>
511506
{
512507
let ids = track_membership(self.location.source_cluster_members(other));
513508
let join_tick = self.location.tick();
514-
let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
509+
let current_members = ids.snapshot(&join_tick, nondet_membership);
515510

516-
current_members
517-
.weaker_retries()
518-
.assume_ordering::<TotalOrder>(
519-
nondet!(/** we send to each member independently, order does not matter */),
520-
)
521-
.cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
511+
self.batch(&join_tick, nondet_membership)
512+
.repeat_with_keys(current_members)
522513
.all_ticks()
523514
.demux_bincode(other)
524515
}

0 commit comments

Comments
 (0)