
Commit a848e50

feat(hydro_lang)!: require explicit failure policy for TCP channels
Currently, we only offer `fail_stop` as a policy, which matches the implicit default behavior that TCP channels have guaranteed thus far.
1 parent: 724f0ca
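
For readers skimming the diff, the call-site change is the same everywhere. A minimal before/after sketch, reusing the `numbers` stream and `process2` destination from the `processes.md` hunk below (names are taken from that example, not new API):

```rust
// Before this commit: TCP channels implicitly behaved as fail-stop.
// let on_p2: Stream<_, Process<()>, _> = numbers.send(&process2, TCP.bincode());

// After this commit: the failure policy must be stated explicitly on the
// channel; `fail_stop` is the only policy offered so far.
let on_p2: Stream<_, Process<()>, _> = numbers.send(&process2, TCP.fail_stop().bincode());
```

The same `TCP.fail_stop()` prefix is applied to the `demux` and `broadcast` call sites in the documentation changes below.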

30 files changed (+331, -189 lines)

docs/docs/hydro/learn/quickstart/partitioned-counter.mdx

Lines changed: 4 additions & 4 deletions
@@ -83,7 +83,7 @@ let sharded_increment_requests = increment_requests
 key.hash(&mut hasher);
 MemberId::from_raw_id(hasher.finish() as u32 % 5)
 }))
-.demux(shard_servers, TCP.bincode());
+.demux(shard_servers, TCP.fail_stop().bincode());
 `)}</CodeBlock>

 The `prefix_key` method adds a new key to the front of the keyed stream. Here, you compute a `MemberId` by hashing the key name and taking modulo 5 (assuming 5 shards). The result is a stream keyed by `(MemberId, client_id, key_name)`.
@@ -109,15 +109,15 @@ let sharded_increment_requests = increment_requests
 key.hash(&mut hasher);
 MemberId::from_raw_id(hasher.finish() as u32 % 5)
 }))
-.demux(shard_servers, TCP.bincode());
+.demux(shard_servers, TCP.fail_stop().bincode());

 let sharded_get_requests = get_requests
 .prefix_key(q!(|(_client, key)| {
 let mut hasher = DefaultHasher::new();
 key.hash(&mut hasher);
 MemberId::from_raw_id(hasher.finish() as u32 % 5)
 }))
-.demux(shard_servers, TCP.bincode());
+.demux(shard_servers, TCP.fail_stop().bincode());
 `), [7, 15])}</CodeBlock>

 After `demux`, the stream is now located at the cluster. The stream returned by a demux operator preserves the remaining keys after the `MemberId` is consumed for routing. In this case, we are left with a `KeyedStream<u32, String, Cluster<'a, CounterShard>>` - a stream keyed by client ID and key name, located on the cluster.
@@ -143,7 +143,7 @@ Finally, you need to send the responses back to the leader process:
 <Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/partitioned_counter.rs">src/partitioned_counter.rs</Link>
 } showLineNumbers={41}>{getLines(partitionedCounterSrc, 41, 44, `
 let increment_ack = sharded_increment_ack
-.send(leader, TCP.bincode())
+.send(leader, TCP.fail_stop().bincode())
 .drop_key_prefix();
 `)}</CodeBlock>

docs/docs/hydro/reference/live-collections/streams.md

Lines changed: 6 additions & 6 deletions
@@ -24,7 +24,7 @@ let numbers: Stream<_, Process<_>, Bounded> = process
 .source_iter(q!(vec![1, 2, 3]))
 .map(q!(|x| x + 1));
 // 2, 3, 4
-# numbers.send(&p_out, TCP.bincode())
+# numbers.send(&p_out, TCP.fail_stop().bincode())
 # }, |mut stream| async move {
 # for w in 2..=4 {
 # assert_eq!(stream.next().await, Some(w));
@@ -41,9 +41,9 @@ Streams also can be sent over the network to participate in distributed programs
 let p1 = flow.process::<()>();
 let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
 let p2 = flow.process::<()>();
-let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
+let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
 // 1, 2, 3
-# on_p2.send(&p_out, TCP.bincode())
+# on_p2.send(&p_out, TCP.fail_stop().bincode())
 # }, |mut stream| async move {
 # for w in 1..=3 {
 # assert_eq!(stream.next().await, Some(w));
@@ -67,7 +67,7 @@ let numbers: Stream<_, Cluster<_>, Bounded, TotalOrder> =
 workers.source_iter(q!(vec![1, 2, 3]));
 let process: Process<()> = flow.process::<()>();
 let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> =
-numbers.send(&process, TCP.bincode()).values();
+numbers.send(&process, TCP.fail_stop().bincode()).values();
 ```

 The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
@@ -82,7 +82,7 @@ let process: Process<()> = flow.process::<()>();
 let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
 .source_iter(q!(vec!["hello", "world"]))
 .map(q!(|x| x.to_string()))
-.send(&process, TCP.bincode())
+.send(&process, TCP.fail_stop().bincode())
 .values();

 let words_concat = all_words
@@ -109,7 +109,7 @@ To perform an aggregation with an unordered stream, you must add a **property an
 # let all_words: Stream<_, Process<_>, _, hydro_lang::live_collections::stream::NoOrder> = workers
 # .source_iter(q!(vec!["hello", "world"]))
 # .map(q!(|x| x.to_string()))
-# .send(&process, TCP.bincode())
+# .send(&process, TCP.fail_stop().bincode())
 # .values();
 let words_count = all_words
 .fold(q!(|| 0), q!(|acc, _| *acc += 1, commutative = ManualProof(/* increment is commutative */)));

docs/docs/hydro/reference/locations/clusters.md

Lines changed: 8 additions & 8 deletions
@@ -34,7 +34,7 @@ When sending a live collection from a cluster to another location, **each** memb
 # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
 # let workers: Cluster<()> = flow.cluster::<()>();
 let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
-numbers.send(&process, TCP.bincode()) // KeyedStream<MemberId<()>, i32, ...>
+numbers.send(&process, TCP.fail_stop().bincode()) // KeyedStream<MemberId<()>, i32, ...>
 # .entries()
 # }, |mut stream| async move {
 // if there are 4 members in the cluster, we should receive 4 elements
@@ -58,7 +58,7 @@ If you do not need to know _which_ member of the cluster the data came from, you
 # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
 # let workers: Cluster<()> = flow.cluster::<()>();
 let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
-numbers.send(&process, TCP.bincode()).values()
+numbers.send(&process, TCP.fail_stop().bincode()).values()
 # }, |mut stream| async move {
 // if there are 4 members in the cluster, we should receive 4 elements
 // 1, 1, 1, 1
@@ -84,8 +84,8 @@ In the reverse direction, when sending a stream _to_ a cluster, the sender must
 let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
 let on_worker: Stream<_, Cluster<_>, _> = numbers
 .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
-.demux(&workers, TCP.bincode());
-on_worker.send(&p2, TCP.bincode())
+.demux(&workers, TCP.fail_stop().bincode());
+on_worker.send(&p2, TCP.fail_stop().bincode())
 # .entries()
 // if there are 4 members in the cluster, we should receive 4 elements
 // { MemberId::<Worker>(0): [0], MemberId::<Worker>(1): [1], MemberId::<Worker>(2): [2], MemberId::<Worker>(3): [3] }
@@ -109,8 +109,8 @@ A common pattern in distributed systems is to broadcast data to all members of a
 # let p1 = flow.process::<()>();
 # let workers: Cluster<()> = flow.cluster::<()>();
 let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
-let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
-on_worker.send(&p2, TCP.bincode())
+let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
+on_worker.send(&p2, TCP.fail_stop().bincode())
 # .entries()
 // if there are 4 members in the cluster, we should receive 4 elements
 // { MemberId::<Worker>(0): [123], MemberId::<Worker>(1): [123], MemberId::<Worker>(2): [123, MemberId::<Worker>(3): [123] }
@@ -135,7 +135,7 @@ let workers: Cluster<()> = flow.cluster::<()>();
 # // do nothing on each worker
 # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
 let cluster_members = p1.source_cluster_members(&workers);
-# cluster_members.entries().send(&p2, TCP.bincode())
+# cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
 // if there are 4 members in the cluster, we would see a join event for each
 // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
 # }, |mut stream| async move {
@@ -162,7 +162,7 @@ let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
 self_id_stream
 .filter(q!(|x| x.get_raw_id() % 2 == 0))
 .map(q!(|x| format!("hello from {}", x.get_raw_id())))
-.send(&process, TCP.bincode())
+.send(&process, TCP.fail_stop().bincode())
 .values()
 // if there are 4 members in the cluster, we should receive 2 elements
 // "hello from 0", "hello from 2"

docs/docs/hydro/reference/locations/processes.md

Lines changed: 1 addition & 1 deletion
@@ -39,5 +39,5 @@ Because a process represents a single machine, it is straightforward to send dat
 # let leader: Process<Leader> = flow.process::<Leader>();
 let numbers = leader.source_iter(q!(vec![1, 2, 3, 4]));
 let process2: Process<()> = flow.process::<()>();
-let on_p2: Stream<_, Process<()>, _> = numbers.send(&process2, TCP.bincode());
+let on_p2: Stream<_, Process<()>, _> = numbers.send(&process2, TCP.fail_stop().bincode());
 ```

0 commit comments