Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/docs/hydro/learn/quickstart/partitioned-counter.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ let sharded_increment_requests = increment_requests
key.hash(&mut hasher);
MemberId::from_raw_id(hasher.finish() as u32 % 5)
}))
.demux(shard_servers, TCP.bincode());
.demux(shard_servers, TCP.fail_stop().bincode());
`)}</CodeBlock>

The `prefix_key` method adds a new key to the front of the keyed stream. Here, you compute a `MemberId` by hashing the key name and taking modulo 5 (assuming 5 shards). The result is a stream keyed by `(MemberId, client_id, key_name)`.
Expand All @@ -109,15 +109,15 @@ let sharded_increment_requests = increment_requests
key.hash(&mut hasher);
MemberId::from_raw_id(hasher.finish() as u32 % 5)
}))
.demux(shard_servers, TCP.bincode());
.demux(shard_servers, TCP.fail_stop().bincode());
let sharded_get_requests = get_requests
.prefix_key(q!(|(_client, key)| {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
MemberId::from_raw_id(hasher.finish() as u32 % 5)
}))
.demux(shard_servers, TCP.bincode());
.demux(shard_servers, TCP.fail_stop().bincode());
`), [7, 15])}</CodeBlock>

After `demux`, the stream is now located at the cluster. The stream returned by a demux operator preserves the remaining keys after the `MemberId` is consumed for routing. In this case, we are left with a `KeyedStream<u32, String, Cluster<'a, CounterShard>>` - a stream keyed by client ID and key name, located on the cluster.
Expand All @@ -143,7 +143,7 @@ Finally, you need to send the responses back to the leader process:
<Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/partitioned_counter.rs">src/partitioned_counter.rs</Link>
} showLineNumbers={41}>{getLines(partitionedCounterSrc, 41, 44, `
let increment_ack = sharded_increment_ack
.send(leader, TCP.bincode())
.send(leader, TCP.fail_stop().bincode())
.drop_key_prefix();
`)}</CodeBlock>

Expand Down
12 changes: 6 additions & 6 deletions docs/docs/hydro/reference/live-collections/streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ let numbers: Stream<_, Process<_>, Bounded> = process
.source_iter(q!(vec![1, 2, 3]))
.map(q!(|x| x + 1));
// 2, 3, 4
# numbers.send(&p_out, TCP.bincode())
# numbers.send(&p_out, TCP.fail_stop().bincode())
# }, |mut stream| async move {
# for w in 2..=4 {
# assert_eq!(stream.next().await, Some(w));
Expand All @@ -41,9 +41,9 @@ Streams also can be sent over the network to participate in distributed programs
let p1 = flow.process::<()>();
let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
let p2 = flow.process::<()>();
let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
// 1, 2, 3
# on_p2.send(&p_out, TCP.bincode())
# on_p2.send(&p_out, TCP.fail_stop().bincode())
# }, |mut stream| async move {
# for w in 1..=3 {
# assert_eq!(stream.next().await, Some(w));
Expand All @@ -67,7 +67,7 @@ let numbers: Stream<_, Cluster<_>, Bounded, TotalOrder> =
workers.source_iter(q!(vec![1, 2, 3]));
let process: Process<()> = flow.process::<()>();
let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> =
numbers.send(&process, TCP.bincode()).values();
numbers.send(&process, TCP.fail_stop().bincode()).values();
```

The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
Expand All @@ -82,7 +82,7 @@ let process: Process<()> = flow.process::<()>();
let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
.source_iter(q!(vec!["hello", "world"]))
.map(q!(|x| x.to_string()))
.send(&process, TCP.bincode())
.send(&process, TCP.fail_stop().bincode())
.values();
let words_concat = all_words
Expand All @@ -109,7 +109,7 @@ To perform an aggregation with an unordered stream, you must add a **property an
# let all_words: Stream<_, Process<_>, _, hydro_lang::live_collections::stream::NoOrder> = workers
# .source_iter(q!(vec!["hello", "world"]))
# .map(q!(|x| x.to_string()))
# .send(&process, TCP.bincode())
# .send(&process, TCP.fail_stop().bincode())
# .values();
let words_count = all_words
.fold(q!(|| 0), q!(|acc, _| *acc += 1, commutative = ManualProof(/* increment is commutative */)));
Expand Down
16 changes: 8 additions & 8 deletions docs/docs/hydro/reference/locations/clusters.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ When sending a live collection from a cluster to another location, **each** memb
# tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send(&process, TCP.bincode()) // KeyedStream<MemberId<()>, i32, ...>
numbers.send(&process, TCP.fail_stop().bincode()) // KeyedStream<MemberId<()>, i32, ...>
# .entries()
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
Expand All @@ -58,7 +58,7 @@ If you do not need to know _which_ member of the cluster the data came from, you
# tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send(&process, TCP.bincode()).values()
numbers.send(&process, TCP.fail_stop().bincode()).values()
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
Expand All @@ -84,8 +84,8 @@ In the reverse direction, when sending a stream _to_ a cluster, the sender must
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
.map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
.demux(&workers, TCP.bincode());
on_worker.send(&p2, TCP.bincode())
.demux(&workers, TCP.fail_stop().bincode());
on_worker.send(&p2, TCP.fail_stop().bincode())
# .entries()
// if there are 4 members in the cluster, we should receive 4 elements
// { MemberId::<Worker>(0): [0], MemberId::<Worker>(1): [1], MemberId::<Worker>(2): [2], MemberId::<Worker>(3): [3] }
Expand All @@ -109,8 +109,8 @@ A common pattern in distributed systems is to broadcast data to all members of a
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
on_worker.send(&p2, TCP.bincode())
let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
on_worker.send(&p2, TCP.fail_stop().bincode())
# .entries()
// if there are 4 members in the cluster, we should receive 4 elements
// { MemberId::<Worker>(0): [123], MemberId::<Worker>(1): [123], MemberId::<Worker>(2): [123], MemberId::<Worker>(3): [123] }
Expand All @@ -135,7 +135,7 @@ let workers: Cluster<()> = flow.cluster::<()>();
# // do nothing on each worker
# workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
let cluster_members = p1.source_cluster_members(&workers);
# cluster_members.entries().send(&p2, TCP.bincode())
# cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
// if there are 4 members in the cluster, we would see a join event for each
// { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
# }, |mut stream| async move {
Expand All @@ -162,7 +162,7 @@ let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
self_id_stream
.filter(q!(|x| x.get_raw_id() % 2 == 0))
.map(q!(|x| format!("hello from {}", x.get_raw_id())))
.send(&process, TCP.bincode())
.send(&process, TCP.fail_stop().bincode())
.values()
// if there are 4 members in the cluster, we should receive 2 elements
// "hello from 0", "hello from 2"
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/hydro/reference/locations/processes.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ Because a process represents a single machine, it is straightforward to send dat
# let leader: Process<Leader> = flow.process::<Leader>();
let numbers = leader.source_iter(q!(vec![1, 2, 3, 4]));
let process2: Process<()> = flow.process::<()>();
let on_p2: Stream<_, Process<()>, _> = numbers.send(&process2, TCP.bincode());
let on_p2: Stream<_, Process<()>, _> = numbers.send(&process2, TCP.fail_stop().bincode());
```
Loading
Loading