Skip to content

Commit 845c2a5

Browse files
committed
docs: improve multiset_delta() docs
1 parent 7e65a08 commit 845c2a5

File tree

3 files changed

+64
-26
lines changed

3 files changed

+64
-26
lines changed

hydroflow/tests/surface_multiset_delta.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use multiplatform_test::multiplatform_test;
44

55
#[multiplatform_test]
66
pub fn test_multiset_delta() {
7-
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<u32>();
8-
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<u32>();
7+
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<char>();
8+
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<char>();
99

1010
let mut flow = hydroflow_syntax! {
1111
source_stream(input_recv)
@@ -14,19 +14,31 @@ pub fn test_multiset_delta() {
1414
};
1515
assert_graphvis_snapshots!(flow);
1616

17-
input_send.send(3).unwrap();
18-
input_send.send(4).unwrap();
19-
input_send.send(3).unwrap();
17+
input_send.send('a').unwrap();
18+
input_send.send('b').unwrap();
19+
input_send.send('a').unwrap();
2020
flow.run_tick();
21-
assert_eq!(&[3, 4, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
21+
// 'a', 'b', 'a'
22+
assert_eq!(&['a', 'b', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));
2223

23-
input_send.send(3).unwrap();
24-
input_send.send(5).unwrap();
25-
input_send.send(3).unwrap();
26-
input_send.send(3).unwrap();
24+
input_send.send('a').unwrap();
25+
input_send.send('c').unwrap();
26+
input_send.send('a').unwrap();
27+
input_send.send('a').unwrap();
28+
flow.run_tick();
29+
// 'c', 'a'
30+
// First two 'a's are removed due to previous tick.
31+
assert_eq!(&['c', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));
32+
33+
input_send.send('b').unwrap();
34+
input_send.send('c').unwrap();
35+
input_send.send('a').unwrap();
36+
input_send.send('a').unwrap();
37+
input_send.send('a').unwrap();
38+
input_send.send('a').unwrap();
2739
flow.run_tick();
28-
// First two "3"s are removed due to previous tick.
29-
assert_eq!(&[5, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
40+
// 3 'a's and the 'c' are removed due to previous tick.
41+
assert_eq!(&['b', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));
3042
}
3143

3244
#[multiplatform_test]

hydroflow_lang/src/graph/ops/multiset_delta.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,54 @@ use super::{
55
WriteContextArgs, RANGE_0, RANGE_1,
66
};
77

8-
// TODO(mingwei): more doc
9-
/// Multiset delta from the previous tick.
8+
/// The multiset inverse of [`persist()`](#persist).
9+
///
10+
/// > 1 input stream of `T`, 1 output stream of `T`, where `T: Eq + Hash`
11+
///
12+
/// For set semantics, [`unique()`](#unique) can be thought of as a "delta" operator, the inverse
13+
/// of [`persist()`](#persist). In `persist`, new items come in, and all items are repeatedly
14+
/// released out. Conversely, `unique` take repeated items in, and only releases the new ones out.
15+
///
16+
/// This operator does a similar inversion but for multiset semantics, with some caveats. When it
17+
/// receives duplicate items, instead of ignoring them, it "subtracts" them from the items received
18+
/// in the previous tick: i.e. if we received `k` copies of an item in the previous tick, and we
19+
/// receive `l > k` copies in the current tick, we output `l - k` copies of the item.
20+
/// However unlike `unique`, this count is only maintained for the previous tick, not over all time.
21+
///
22+
/// In the example below, in the second tick two 'a's are removed because two 'a's were received in
23+
/// the previous tick. The third 'a' is released though.
1024
///
1125
/// ```rustbook
12-
/// let (input_send, input_recv) = hydroflow::util::unbounded_channel::<u32>();
26+
/// let (input_send, input_recv) = hydroflow::util::unbounded_channel::<char>();
1327
/// let mut flow = hydroflow::hydroflow_syntax! {
1428
/// source_stream(input_recv)
1529
/// -> multiset_delta()
1630
/// -> for_each(|n| println!("{}", n));
1731
/// };
1832
///
19-
/// input_send.send(3).unwrap();
20-
/// input_send.send(4).unwrap();
21-
/// input_send.send(3).unwrap();
33+
/// input_send.send('a').unwrap();
34+
/// input_send.send('b').unwrap();
35+
/// input_send.send('a').unwrap();
36+
/// flow.run_tick();
37+
/// // 'a', 'b', 'a'
38+
///
39+
/// input_send.send('a').unwrap();
40+
/// input_send.send('c').unwrap();
41+
/// input_send.send('a').unwrap();
42+
/// input_send.send('a').unwrap();
2243
/// flow.run_tick();
23-
/// // 3, 4,
44+
/// // 'c', 'a'
45+
/// // First two 'a's are removed due to previous tick.
2446
///
25-
/// input_send.send(3).unwrap();
26-
/// input_send.send(5).unwrap();
27-
/// input_send.send(3).unwrap();
28-
/// input_send.send(3).unwrap();
47+
/// input_send.send('b').unwrap();
48+
/// input_send.send('c').unwrap();
49+
/// input_send.send('a').unwrap();
50+
/// input_send.send('a').unwrap();
51+
/// input_send.send('a').unwrap();
52+
/// input_send.send('a').unwrap();
2953
/// flow.run_tick();
30-
/// // 5, 3
31-
/// // First two "3"s are removed due to previous tick.
54+
/// // 'b', 'a'
55+
/// // 3 'a's and the 'c' are removed due to previous tick.
3256
/// ```
3357
pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints {
3458
name: "multiset_delta",

hydroflow_lang/src/graph/ops/persist.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use super::{
77
use crate::diagnostic::{Diagnostic, Level};
88
use crate::graph::{FlowProps, LatticeFlowType};
99

10-
/// Stores each item as it passes through, and replays all item every tick.
10+
/// Stores each item as it passes through, and replays all items every tick.
11+
///
12+
/// > 1 input stream, 1 output stream
1113
///
1214
/// ```hydroflow
1315
/// // Normally `source_iter(...)` only emits once, but `persist()` will replay the `"hello"`

0 commit comments

Comments
 (0)