Skip to content

Commit f9f5317

Browse files
authored
feat(hydro_lang)!: start adding APIs for various forms of keyed streams "joins", add KVS counter server (#1996)
Also fixes incorrect output metadata for `KeyedStream::fold`, and removes `KeyedSingleton::all_ticks` for now until we a) actually need it and b) can think carefully about its semantics.
1 parent d057072 commit f9f5317

12 files changed

+472
-56
lines changed

hydro_deploy/hydro_deploy_integration/src/multi_connection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,10 @@ impl<
249249
conn.is_some()
250250
});
251251

252+
if me.poll_cursor == me.active_connections.len() {
253+
me.poll_cursor = 0;
254+
}
255+
252256
out
253257
}
254258
}

hydro_lang/src/keyed_singleton.rs

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::hash::Hash;
2+
13
use stageleft::{IntoQuotedMut, QuotedWithContext, q};
24

35
use crate::cycle::{CycleCollection, CycleComplete, ForwardRefMarker};
@@ -6,7 +8,10 @@ use crate::location::{LocationId, NoTick};
68
use crate::manual_expr::ManualExpr;
79
use crate::stream::ExactlyOnce;
810
use crate::unsafety::NonDet;
9-
use crate::{Atomic, Bounded, Location, NoOrder, Singleton, Stream, Tick, Unbounded};
11+
use crate::{
12+
Atomic, Bounded, KeyedStream, Location, NoOrder, Optional, Singleton, Stream, Tick, TotalOrder,
13+
nondet,
14+
};
1015

1116
pub struct KeyedSingleton<K, V, Loc, Bound> {
1217
pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, ExactlyOnce>,
@@ -104,6 +109,77 @@ impl<'a, K, V, L: Location<'a>, B> KeyedSingleton<K, V, L, B> {
104109
}
105110
}
106111

112+
impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
113+
/// Gets the value associated with a specific key from the keyed singleton.
114+
///
115+
/// # Example
116+
/// ```rust
117+
/// # use hydro_lang::*;
118+
/// # use futures::StreamExt;
119+
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
120+
/// let tick = process.tick();
121+
/// let keyed_data = process
122+
/// .source_iter(q!(vec![(1, 2), (2, 3)]))
123+
/// .into_keyed()
124+
/// .batch(&tick, nondet!(/** test */))
125+
/// .fold(q!(|| 0), q!(|acc, x| *acc = x));
126+
/// let key = tick.singleton(q!(1));
127+
/// keyed_data.get(key).all_ticks()
128+
/// # }, |mut stream| async move {
129+
/// // 2
130+
/// # assert_eq!(stream.next().await.unwrap(), 2);
131+
/// # }));
132+
/// ```
133+
pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
134+
self.entries()
135+
.join(key.into_stream().map(q!(|k| (k, ()))))
136+
.map(q!(|(_, (v, _))| v))
137+
.assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
138+
.first()
139+
}
140+
141+
/// Given a keyed stream of lookup requests, where the key is the lookup and the value
142+
/// is some additional metadata, emits a keyed stream of lookup results where the key
143+
/// is the same as before, but the value is a tuple of the lookup result and the metadata
144+
/// of the request.
145+
///
146+
/// # Example
147+
/// ```rust
148+
/// # use hydro_lang::*;
149+
/// # use futures::StreamExt;
150+
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
151+
/// let tick = process.tick();
152+
/// let keyed_data = process
153+
/// .source_iter(q!(vec![(1, 10), (2, 20)]))
154+
/// .into_keyed()
155+
/// .batch(&tick, nondet!(/** test */))
156+
/// .fold(q!(|| 0), q!(|acc, x| *acc = x));
157+
/// let other_data = process
158+
/// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
159+
/// .into_keyed()
160+
/// .batch(&tick, nondet!(/** test */));
161+
/// keyed_data.get_many(other_data).entries().all_ticks()
162+
/// # }, |mut stream| async move {
163+
/// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
164+
/// # let mut results = vec![];
165+
/// # for _ in 0..3 {
166+
/// # results.push(stream.next().await.unwrap());
167+
/// # }
168+
/// # results.sort();
169+
/// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
170+
/// # }));
171+
/// ```
172+
pub fn get_many<O2, R2, V2>(
173+
self,
174+
with: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
175+
) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
176+
self.entries()
177+
.weaker_retries()
178+
.join(with.entries())
179+
.into_keyed()
180+
}
181+
}
182+
107183
impl<'a, K, V, L, B> KeyedSingleton<K, V, L, B>
108184
where
109185
L: Location<'a> + NoTick + NoAtomic,
@@ -151,14 +227,3 @@ where
151227
}
152228
}
153229
}
154-
155-
impl<'a, K, V, L> KeyedSingleton<K, V, Tick<L>, Bounded>
156-
where
157-
L: Location<'a>,
158-
{
159-
pub fn all_ticks(self) -> KeyedSingleton<K, V, L, Unbounded> {
160-
KeyedSingleton {
161-
underlying: self.underlying.all_ticks(),
162-
}
163-
}
164-
}

hydro_lang/src/keyed_stream.rs

Lines changed: 125 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::keyed_singleton::KeyedSingleton;
99
use crate::location::tick::NoAtomic;
1010
use crate::location::{LocationId, NoTick, check_matching_location};
1111
use crate::manual_expr::ManualExpr;
12-
use crate::stream::ExactlyOnce;
12+
use crate::stream::{ExactlyOnce, MinRetries};
1313
use crate::unsafety::NonDet;
1414
use crate::*;
1515

@@ -188,10 +188,54 @@ impl<'a, K, V, L: Location<'a>, B, O, R> KeyedStream<K, V, L, B, O, R> {
188188
}
189189
}
190190

191+
/// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
192+
/// re-grouped even they are tuples; instead they will be grouped under the original key.
193+
///
194+
/// If you do not want to modify the stream and instead only want to view
195+
/// each item use [`KeyedStream::inspect_with_key`] instead.
196+
///
197+
/// # Example
198+
/// ```rust
199+
/// # use hydro_lang::*;
200+
/// # use futures::StreamExt;
201+
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
202+
/// process
203+
/// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
204+
/// .into_keyed()
205+
/// .map_with_key(q!(|(k, v)| k + v))
206+
/// # .entries()
207+
/// # }, |mut stream| async move {
208+
/// // { 1: [3, 4], 2: [6] }
209+
/// # for w in vec![(1, 3), (1, 4), (2, 6)] {
210+
/// # assert_eq!(stream.next().await.unwrap(), w);
211+
/// # }
212+
/// # }));
213+
/// ```
214+
pub fn map_with_key<U, F>(
215+
self,
216+
f: impl IntoQuotedMut<'a, F, L> + Copy,
217+
) -> KeyedStream<K, U, L, B, O, R>
218+
where
219+
F: Fn((K, V)) -> U + 'a,
220+
K: Clone,
221+
{
222+
let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
223+
KeyedStream {
224+
underlying: self.underlying.map(q!({
225+
let orig = f;
226+
move |(k, v)| {
227+
let out = orig((k.clone(), v));
228+
(k, out)
229+
}
230+
})),
231+
_phantom_order: Default::default(),
232+
}
233+
}
234+
191235
/// Creates a stream containing only the elements of each group stream that satisfy a predicate
192236
/// `f`, preserving the order of the elements within the group.
193237
///
194-
/// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
238+
/// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
195239
/// not modify or take ownership of the values. If you need to modify the values while filtering
196240
/// use [`KeyedStream::filter_map`] instead.
197241
///
@@ -226,11 +270,12 @@ impl<'a, K, V, L: Location<'a>, B, O, R> KeyedStream<K, V, L, B, O, R> {
226270
}
227271
}
228272

229-
/// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
230-
/// re-grouped even they are tuples; instead they will be grouped under the original key.
273+
/// Creates a stream containing only the elements of each group stream that satisfy a predicate
274+
/// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
231275
///
232-
/// If you do not want to modify the stream and instead only want to view
233-
/// each item use [`KeyedStream::inspect_with_key`] instead.
276+
/// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
277+
/// not modify or take ownership of the values. If you need to modify the values while filtering
278+
/// use [`KeyedStream::filter_map_with_key`] instead.
234279
///
235280
/// # Example
236281
/// ```rust
@@ -240,32 +285,24 @@ impl<'a, K, V, L: Location<'a>, B, O, R> KeyedStream<K, V, L, B, O, R> {
240285
/// process
241286
/// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
242287
/// .into_keyed()
243-
/// .map_with_key(q!(|(k, v)| k + v))
288+
/// .filter_with_key(q!(|&(k, v)| v - k == 2))
244289
/// # .entries()
245290
/// # }, |mut stream| async move {
246-
/// // { 1: [3, 4], 2: [6] }
247-
/// # for w in vec![(1, 3), (1, 4), (2, 6)] {
291+
/// // { 1: [3], 2: [4] }
292+
/// # for w in vec![(1, 3), (2, 4)] {
248293
/// # assert_eq!(stream.next().await.unwrap(), w);
249294
/// # }
250295
/// # }));
251296
/// ```
252-
pub fn map_with_key<U, F>(
297+
pub fn filter_with_key<F>(
253298
self,
254299
f: impl IntoQuotedMut<'a, F, L> + Copy,
255-
) -> KeyedStream<K, U, L, B, O, R>
300+
) -> KeyedStream<K, V, L, B, O, R>
256301
where
257-
F: Fn((K, V)) -> U + 'a,
258-
K: Clone,
302+
F: Fn(&(K, V)) -> bool + 'a,
259303
{
260-
let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
261304
KeyedStream {
262-
underlying: self.underlying.map(q!({
263-
let orig = f;
264-
move |(k, v)| {
265-
let out = orig((k.clone(), v));
266-
(k, out)
267-
}
268-
})),
305+
underlying: self.underlying.filter(f),
269306
_phantom_order: Default::default(),
270307
}
271308
}
@@ -418,6 +455,41 @@ impl<'a, K, V, L: Location<'a>, B, O, R> KeyedStream<K, V, L, B, O, R> {
418455
}
419456
}
420457

458+
impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O, R> KeyedStream<K, V, L, Unbounded, O, R> {
459+
/// Produces a new keyed stream that "merges" the inputs by interleaving the elements
460+
/// of any overlapping groups. The result has [`NoOrder`] on each group because the
461+
/// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
462+
/// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
463+
///
464+
/// Currently, both input streams must be [`Unbounded`].
465+
///
466+
/// # Example
467+
/// ```rust
468+
/// # use hydro_lang::*;
469+
/// # use futures::StreamExt;
470+
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
471+
/// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
472+
/// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
473+
/// numbers1.interleave(numbers2)
474+
/// # .entries()
475+
/// # }, |mut stream| async move {
476+
/// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
477+
/// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
478+
/// # assert_eq!(stream.next().await.unwrap(), w);
479+
/// # }
480+
/// # }));
481+
/// ```
482+
pub fn interleave<O2, R2: MinRetries<R>>(
483+
self,
484+
other: KeyedStream<K, V, L, Unbounded, O2, R2>,
485+
) -> KeyedStream<K, V, L, Unbounded, NoOrder, R::Min>
486+
where
487+
R: MinRetries<R2, Min = R2::Min>,
488+
{
489+
self.entries().interleave(other.entries()).into_keyed()
490+
}
491+
}
492+
421493
impl<'a, K, V, L, B> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
422494
where
423495
K: Eq + Hash,
@@ -571,7 +643,7 @@ where
571643
init,
572644
acc: comb,
573645
input: Box::new(self.underlying.ir_node.into_inner()),
574-
metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
646+
metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
575647
};
576648

577649
KeyedSingleton {
@@ -1019,6 +1091,37 @@ where
10191091
.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
10201092
.reduce_watermark(other, comb)
10211093
}
1094+
1095+
/// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1096+
/// whose keys are not in the bounded stream.
1097+
///
1098+
/// # Example
1099+
/// ```rust
1100+
/// # use hydro_lang::*;
1101+
/// # use futures::StreamExt;
1102+
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1103+
/// let tick = process.tick();
1104+
/// let keyed_stream = process
1105+
/// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1106+
/// .batch(&tick, nondet!(/** test */))
1107+
/// .into_keyed();
1108+
/// let keys_to_remove = process
1109+
/// .source_iter(q!(vec![1, 2]))
1110+
/// .batch(&tick, nondet!(/** test */));
1111+
/// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1112+
/// # .entries()
1113+
/// # }, |mut stream| async move {
1114+
/// // { 3: ['c'], 4: ['d'] }
1115+
/// # for w in vec![(3, 'c'), (4, 'd')] {
1116+
/// # assert_eq!(stream.next().await.unwrap(), w);
1117+
/// # }
1118+
/// # }));
1119+
pub fn filter_key_not_in<O2, R2>(self, other: Stream<K, L, Bounded, O2, R2>) -> Self {
1120+
KeyedStream {
1121+
underlying: self.entries().anti_join(other),
1122+
_phantom_order: Default::default(),
1123+
}
1124+
}
10221125
}
10231126

10241127
impl<'a, K, V, L, B, O, R> KeyedStream<K, V, L, B, O, R>

hydro_lang/src/rewrites/snapshots/hydro_lang__rewrites__properties__tests__property_optimized.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ expression: built.ir()
4040
),
4141
),
4242
output_type: Some(
43-
(std :: string :: String , ()),
43+
(std :: string :: String , i32),
4444
),
4545
},
4646
},

hydro_lang/src/stream.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,19 @@ where
782782
}
783783
}
784784

785+
impl<'a, T, L, B, O> Stream<T, L, B, O, ExactlyOnce>
786+
where
787+
L: Location<'a>,
788+
{
789+
/// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
790+
/// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
791+
pub fn weaker_retries<R2>(self) -> Stream<T, L, B, O, R2> {
792+
self.assume_retries(
793+
nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
794+
)
795+
}
796+
}
797+
785798
impl<'a, T, L, B, O, R> Stream<&T, L, B, O, R>
786799
where
787800
L: Location<'a>,
@@ -1636,7 +1649,7 @@ where
16361649
/// # assert_eq!(stream.next().await.unwrap(), w);
16371650
/// # }
16381651
/// # }));
1639-
pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2, R>) -> Stream<(K, V1), L, B, O, R>
1652+
pub fn anti_join<O2, R2>(self, n: Stream<K, L, Bounded, O2, R2>) -> Stream<(K, V1), L, B, O, R>
16401653
where
16411654
K: Eq + Hash,
16421655
{

0 commit comments

Comments
 (0)