
Commit 708e835

feat(hydro_lang): make network channels configurable with a generic Stream::send API (#2400)
This eliminates the hardcoding of networking APIs to specific serialization formats, transport protocols, etc. Includes similar refactors across `demux` / `round_robin` / etc APIs.
1 parent 149e3e7 commit 708e835
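
The design idea in the commit message (one generic `send` that takes the transport and serialization format as an argument, instead of one method per format) can be sketched in plain Rust without Hydro. This is only an illustration under stated assumptions: `Codec`, `Transport`, `NetworkConfig`, `InMemory`, `BigEndian`, and `with_codec` are hypothetical names invented for this sketch and are not Hydro's actual traits or types.

```rust
// Hypothetical illustration only: none of these names are Hydro APIs.

/// A serialization format: turns values into bytes and back.
trait Codec<T> {
    fn encode(&self, value: &T) -> Vec<u8>;
    fn decode(&self, bytes: &[u8]) -> Option<T>;
}

/// A transport: moves opaque byte frames from sender to receiver.
trait Transport {
    fn deliver(&self, frames: Vec<Vec<u8>>) -> Vec<Vec<u8>>;
}

/// Toy codec for `u32` using big-endian bytes (a stand-in for a real format).
struct BigEndian;

impl Codec<u32> for BigEndian {
    fn encode(&self, value: &u32) -> Vec<u8> {
        value.to_be_bytes().to_vec()
    }

    fn decode(&self, bytes: &[u8]) -> Option<u32> {
        if bytes.len() != 4 {
            return None; // reject malformed frames
        }
        Some(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
    }
}

/// Toy transport that hands frames over in memory, unchanged and in order.
struct InMemory;

impl Transport for InMemory {
    fn deliver(&self, frames: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
        frames
    }
}

/// Pairs a transport with a codec, both chosen by the caller.
struct NetworkConfig<Tr, C> {
    transport: Tr,
    codec: C,
}

impl InMemory {
    /// Builder-style helper, so a call site reads `InMemory.with_codec(BigEndian)`.
    fn with_codec<C>(self, codec: C) -> NetworkConfig<InMemory, C> {
        NetworkConfig { transport: self, codec }
    }
}

/// One generic `send`: encode each item, deliver the frames, decode on arrival.
fn send<T, Tr, C>(items: Vec<T>, config: &NetworkConfig<Tr, C>) -> Vec<T>
where
    Tr: Transport,
    C: Codec<T>,
{
    let frames: Vec<Vec<u8>> = items.iter().map(|v| config.codec.encode(v)).collect();
    config
        .transport
        .deliver(frames)
        .into_iter()
        .filter_map(|frame| config.codec.decode(&frame))
        .collect()
}
```

With this shape, a call such as `send(vec![1u32, 2, 3], &InMemory.with_codec(BigEndian))` keeps the call site unchanged when a different transport or codec is swapped in, which is roughly the property the refactor appears to be after.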

29 files changed, +906 -134 lines changed

docs/docs/hydro/learn/quickstart/partitioned-counter.mdx

Lines changed: 5 additions & 5 deletions
@@ -68,17 +68,17 @@ The `prefix_key` method adds a new key to the front of the keyed stream. Here, y

 :::

-Now you'll use `demux_bincode` to send data from the leader to the cluster. This is where Hydro fundamentally differs from traditional distributed systems frameworks. In most frameworks, you write separate programs for each service (one for the leader, one for the shards), then configure external message brokers or RPC systems to connect them. You have to manually serialize messages, manage network connections, and coordinate deployment.
+Now you'll use `demux` to send data from the leader to the cluster using the configured network protocol and serialization format. This is where Hydro fundamentally differs from traditional distributed systems frameworks. In most frameworks, you write separate programs for each service (one for the leader, one for the shards), then configure external message brokers or RPC systems to connect them. You have to manually serialize messages, manage network connections, and coordinate deployment.

-In Hydro, you write a **single Rust function** that describes the entire distributed service. When you call `demux_bincode`, you're performing network communication right in the middle of your function - but it feels like ordinary Rust code. The Hydro compiler automatically generates the network code, handles serialization, and deploys the right code to each machine. You can reason about your entire distributed system in one place, with full type safety and IDE support.
+In Hydro, you write a **single Rust function** that describes the entire distributed service. When you call `demux`, you're performing network communication right in the middle of your function - but it feels like ordinary Rust code. The Hydro compiler automatically generates the network code, handles serialization, and deploys the right code to each machine. You can reason about your entire distributed system in one place, with full type safety and IDE support.

-The `demux_bincode` method sends each element to the cluster member specified by the first component of the key:
+The `demux` method sends each element to the cluster member specified by the first component of the key:

 <CodeBlock language="rust" title={
 <Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/partitioned_counter.rs">src/partitioned_counter.rs</Link>
 } showLineNumbers={20}>{highlightLines(getLines(partitionedCounterSrc, 20, 34), [7, 15])}</CodeBlock>

-After `demux_bincode`, the stream is now located at the cluster. The stream returned by a demux operator preserves the remaining keys after the `MemberId` is consumed for routing. In this case, we are left with a `KeyedStream<u32, String, Cluster<'a, CounterShard>>` - a stream keyed by client ID and key name, located on the cluster.
+After `demux`, the stream is now located at the cluster. The stream returned by a demux operator preserves the remaining keys after the `MemberId` is consumed for routing. In this case, we are left with a `KeyedStream<u32, String, Cluster<'a, CounterShard>>` - a stream keyed by client ID and key name, located on the cluster.

 ## Running the Counter on the Cluster

@@ -96,7 +96,7 @@ Finally, you need to send the responses back to the leader process:
 <Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/partitioned_counter.rs">src/partitioned_counter.rs</Link>
 } showLineNumbers={41}>{getLines(partitionedCounterSrc, 41, 44)}</CodeBlock>

-The `send_bincode` method sends data from the cluster to the leader process. When data moves from a cluster to a process, it arrives as a keyed stream with `MemberId` as the first key. The `drop_key_prefix` method removes this `MemberId` key, leaving just the original keys (client ID and response data).
+The `send` method sends data from the cluster to the leader process. When data moves from a cluster to a process, it arrives as a keyed stream with `MemberId` as the first key. The `drop_key_prefix` method removes this `MemberId` key, leaving just the original keys (client ID and response data).

 This is a standard Hydro pattern for building a partitioned service in Hydro: prefix a key for routing, demux to cluster, process, send back, drop the routing key. You will find similar code in key-value stores and even consensus protocols.
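
The five-step pattern named in the tutorial text above (prefix a routing key, demux, process, send back, drop the routing key) can be modeled in ordinary single-process Rust. This is a sketch under stated assumptions, not Hydro code: `MemberId` as a plain `u32`, the byte-sum `shard_for` routing function, and the helper names `demux`, `send_to_leader`, and `run_partitioned_counter` are all hypothetical, and `num_shards` is assumed to be greater than zero.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a cluster member ID.
type MemberId = u32;

/// Hypothetical routing function: sum of the key's bytes modulo the shard count.
fn shard_for(key: &str, num_shards: u32) -> MemberId {
    key.bytes().map(u32::from).sum::<u32>() % num_shards
}

/// Route each `(member, payload)` pair to the inbox of the member named by the key.
fn demux<T>(items: Vec<(MemberId, T)>) -> BTreeMap<MemberId, Vec<T>> {
    let mut inboxes = BTreeMap::new();
    for (member, payload) in items {
        inboxes.entry(member).or_insert_with(Vec::new).push(payload);
    }
    inboxes
}

/// Gather every member's output at the leader, tagged with the sender's ID.
fn send_to_leader<T>(outboxes: BTreeMap<MemberId, Vec<T>>) -> Vec<(MemberId, T)> {
    outboxes
        .into_iter()
        .flat_map(|(member, items)| items.into_iter().map(move |item| (member, item)))
        .collect()
}

/// Requests are `(client_id, key_name)`; responses are `(client_id, key_name, new_count)`.
fn run_partitioned_counter(
    requests: Vec<(u32, String)>,
    num_shards: u32,
) -> Vec<(u32, String, u64)> {
    // 1. Prefix a routing key in front of each request.
    let keyed: Vec<(MemberId, (u32, String))> = requests
        .into_iter()
        .map(|(client, key)| (shard_for(&key, num_shards), (client, key)))
        .collect();

    // 2. Demux: the routing key is consumed, the rest of the request is kept.
    let inboxes = demux(keyed);

    // 3. Process: each shard keeps its own per-key counters.
    let outboxes: BTreeMap<MemberId, Vec<(u32, String, u64)>> = inboxes
        .into_iter()
        .map(|(member, reqs)| {
            let mut counts: BTreeMap<String, u64> = BTreeMap::new();
            let responses: Vec<(u32, String, u64)> = reqs
                .into_iter()
                .map(|(client, key)| {
                    let count = counts.entry(key.clone()).or_insert(0);
                    *count += 1;
                    (client, key, *count)
                })
                .collect();
            (member, responses)
        })
        .collect();

    // 4. Send back to the leader, then 5. drop the member ID that was attached on arrival.
    send_to_leader(outboxes)
        .into_iter()
        .map(|(_member, response)| response)
        .collect()
}
```

Because every request for a given key lands on the same shard, each shard can keep its counters locally without coordinating with the others.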

docs/docs/hydro/reference/live-collections/streams.md

Lines changed: 7 additions & 7 deletions
@@ -24,7 +24,7 @@ let numbers: Stream<_, Process<_>, Unbounded> = process
 .source_iter(q!(vec![1, 2, 3]))
 .map(q!(|x| x + 1));
 // 2, 3, 4
-# numbers.send_bincode(&p_out)
+# numbers.send(&p_out, TCP.bincode())
 # }, |mut stream| async move {
 # for w in 2..=4 {
 # assert_eq!(stream.next().await, Some(w));
@@ -41,9 +41,9 @@ Streams also can be sent over the network to participate in distributed programs
 let p1 = flow.process::<()>();
 let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
 let p2 = flow.process::<()>();
-let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
+let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
 // 1, 2, 3
-# on_p2.send_bincode(&p_out)
+# on_p2.send(&p_out, TCP.bincode())
 # }, |mut stream| async move {
 # for w in 1..=3 {
 # assert_eq!(stream.next().await, Some(w));
@@ -67,7 +67,7 @@ let numbers: Stream<_, Cluster<_>, Unbounded, TotalOrder> =
 workers.source_iter(q!(vec![1, 2, 3]));
 let process: Process<()> = flow.process::<()>();
 let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> =
-numbers.send_bincode(&process).values();
+numbers.send(&process, TCP.bincode()).values();
 ```

 The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
@@ -82,7 +82,7 @@ let process: Process<()> = flow.process::<()>();
 let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
 .source_iter(q!(vec!["hello", "world"]))
 .map(q!(|x| x.to_string()))
-.send_bincode(&process)
+.send(&process, TCP.bincode())
 .values();

 let words_concat = all_words
@@ -92,7 +92,7 @@ let words_concat = all_words

 :::tip

-We use `values()` here to drop the member IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.
+We use `values()` here to drop the member IDs which are included in `send`. See [Clusters](../locations/clusters.md) for more details.

 Running an aggregation (`fold`, `reduce`) converts a `Stream` into a `Singleton`, as we see in the type signature here. The `Singleton` type is still "live" in the sense of a [Live Collection](./index.md), so updates to the `Stream` input cause updates to the `Singleton` output. See [Singletons and Optionals](./singletons-optionals.md) for more information.

@@ -109,7 +109,7 @@ To perform an aggregation with an unordered stream, you must use [`fold_commutat
 # let all_words: Stream<_, Process<_>, _, hydro_lang::live_collections::stream::NoOrder> = workers
 # .source_iter(q!(vec!["hello", "world"]))
 # .map(q!(|x| x.to_string()))
-# .send_bincode(&process)
+# .send(&process, TCP.bincode())
 # .values();
 let words_count = all_words
 .fold_commutative(q!(|| 0), q!(|acc, x| *acc += 1));
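
The reason the docs in this file restrict unordered streams to commutative aggregations can be seen with ordinary slices. The sketch below is plain Rust, not Hydro; `count_words` and `concat_words` are hypothetical helper names. Counting gives the same answer for any arrival order, while concatenation does not.

```rust
/// Counts elements; the result does not depend on arrival order.
fn count_words(words: &[&str]) -> usize {
    words.iter().fold(0, |acc, _| acc + 1)
}

/// Concatenates elements; the result changes if the arrival order changes.
fn concat_words(words: &[&str]) -> String {
    words.iter().fold(String::new(), |mut acc, w| {
        acc.push_str(w);
        acc
    })
}
```

Feeding the same two words in both orders yields equal counts but different concatenations, so only the first is safe when the network may reorder elements.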

docs/docs/hydro/reference/locations/clusters.md

Lines changed: 12 additions & 12 deletions
@@ -26,15 +26,15 @@ let numbers = workers.source_iter(q!(vec![1, 2, 3, 4]));
 ```

 ## Networking
-When sending a live collection from a cluster to another location, **each** member of the cluster will send its local collection. On the receiver side, these collections will be joined together into a **keyed stream** of with `ID` keys and groups of `Data` values where the ID uniquely identifies which member of the cluster the data came from. For example, you can send a stream from the worker cluster to another process using the `send_bincode` method:
+When sending a live collection from a cluster to another location, **each** member of the cluster will send its local collection. On the receiver side, these collections will be joined together into a **keyed stream** of with `ID` keys and groups of `Data` values where the ID uniquely identifies which member of the cluster the data came from. For example, you can send a stream from the worker cluster to another process using the `send` method:

 ```rust
 # use hydro_lang::prelude::*;
 # use futures::StreamExt;
 # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
 # let workers: Cluster<()> = flow.cluster::<()>();
 let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
-numbers.send_bincode(&process) // KeyedStream<MemberId<()>, i32, ...>
+numbers.send(&process, TCP.bincode()) // KeyedStream<MemberId<()>, i32, ...>
 # .entries()
 # }, |mut stream| async move {
 // if there are 4 members in the cluster, we should receive 4 elements
@@ -58,7 +58,7 @@ If you do not need to know _which_ member of the cluster the data came from, you
 # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
 # let workers: Cluster<()> = flow.cluster::<()>();
 let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
-numbers.send_bincode(&process).values()
+numbers.send(&process, TCP.bincode()).values()
 # }, |mut stream| async move {
 // if there are 4 members in the cluster, we should receive 4 elements
 // 1, 1, 1, 1
@@ -73,7 +73,7 @@ numbers.send_bincode(&process).values()

 :::

-In the reverse direction, when sending a stream _to_ a cluster, the sender must prepare `(ID, Data)` tuples, where the ID uniquely identifies which member of the cluster the data is intended for. Then, we can send a stream from a process to the worker cluster using the `demux_bincode` method:
+In the reverse direction, when sending a stream _to_ a cluster, the sender must prepare `(ID, Data)` tuples, where the ID uniquely identifies which member of the cluster the data is intended for. Then, we can send a stream from a process to the worker cluster using the `demux` method:

 ```rust
 # use hydro_lang::prelude::*;
@@ -84,8 +84,8 @@ In the reverse direction, when sending a stream _to_ a cluster, the sender must
 let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
 let on_worker: Stream<_, Cluster<_>, _> = numbers
 .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
-.demux_bincode(&workers);
-on_worker.send_bincode(&p2)
+.demux(&workers, TCP.bincode());
+on_worker.send(&p2, TCP.bincode())
 # .entries()
 // if there are 4 members in the cluster, we should receive 4 elements
 // { MemberId::<Worker>(0): [0], MemberId::<Worker>(1): [1], MemberId::<Worker>(2): [2], MemberId::<Worker>(3): [3] }
@@ -100,7 +100,7 @@ on_worker.send_bincode(&p2)
 ```

 ## Broadcasting and Membership Lists
-A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using `broadcast_bincode`, which takes in a stream of **only data elements** and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:
+A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using `broadcast`, which takes in a stream of **only data elements** and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:

 ```rust
 # use hydro_lang::prelude::*;
@@ -109,8 +109,8 @@ A common pattern in distributed systems is to broadcast data to all members of a
 # let p1 = flow.process::<()>();
 # let workers: Cluster<()> = flow.cluster::<()>();
 let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
-let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
-on_worker.send_bincode(&p2)
+let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
+on_worker.send(&p2, TCP.bincode())
 # .entries()
 // if there are 4 members in the cluster, we should receive 4 elements
 // { MemberId::<Worker>(0): [123], MemberId::<Worker>(1): [123], MemberId::<Worker>(2): [123, MemberId::<Worker>(3): [123] }
@@ -124,7 +124,7 @@ on_worker.send_bincode(&p2)
 # }));
 ```

-This API requires a [non-determinism guard](../live-collections/determinism.md#unsafe-operations-in-hydro), because the set of cluster members may asynchronously change over time. Depending on when we are notified of membership changes, we will broadcast to different members. Under the hood, the `broadcast_bincode` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `source_cluster_members` method to get a stream of membership events (cluster members joining or leaving):
+This API requires a [non-determinism guard](../live-collections/determinism.md#unsafe-operations-in-hydro), because the set of cluster members may asynchronously change over time. Depending on when we are notified of membership changes, we will broadcast to different members. Under the hood, the `broadcast` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `source_cluster_members` method to get a stream of membership events (cluster members joining or leaving):

 ```rust
 # use hydro_lang::prelude::*;
@@ -135,7 +135,7 @@ let workers: Cluster<()> = flow.cluster::<()>();
 # // do nothing on each worker
 # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
 let cluster_members = p1.source_cluster_members(&workers);
-# cluster_members.entries().send_bincode(&p2)
+# cluster_members.entries().send(&p2, TCP.bincode())
 // if there are 4 members in the cluster, we would see a join event for each
 // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
 # }, |mut stream| async move {
@@ -162,7 +162,7 @@ let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
 self_id_stream
 .filter(q!(|x| x.get_raw_id() % 2 == 0))
 .map(q!(|x| format!("hello from {}", x.get_raw_id())))
-.send_bincode(&process)
+.send(&process, TCP.bincode())
 .values()
 // if there are 4 members in the cluster, we should receive 2 elements
 // "hello from 0", "hello from 2"
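
The non-determinism guard on broadcasting, described in this file's docs, comes from the fact that the result depends on which members are known at the time of the broadcast. A minimal plain-Rust model, assuming a hypothetical `MemberId` alias and a hypothetical `broadcast` helper that takes an explicit membership snapshot (neither is Hydro's API):

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a cluster member ID.
type MemberId = u32;

/// Copy every item to each member present in the given membership snapshot.
fn broadcast<T: Clone>(items: &[T], members: &[MemberId]) -> BTreeMap<MemberId, Vec<T>> {
    members.iter().map(|&member| (member, items.to_vec())).collect()
}
```

Calling this with a snapshot of four members delivers to four inboxes, while an earlier snapshot that has only seen two joins delivers to two. The same program can therefore produce different outputs depending on when membership events arrive.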

docs/docs/hydro/reference/locations/index.md

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ Hydro is a **global**, **distributed** programming model. This means that the da

 Each [live collection](pathname:///rustdoc/hydro_lang/live_collections/) has a type parameter `L` which will always be a type that implements the `Location` trait (e.g. [`Process`](./processes.md) and [`Cluster`](./clusters.md), documented in this section). Computation has to happen at a single place, so Hydro APIs that consume multiple live collections will require all inputs to have the same location type. Moreover, most Hydro APIs that transform live collections will emit a new live collection output with the same location type as the input.

-To create distributed programs, Hydro provides a variety of APIs to _move_ live collections between locations via network send/receive. For example, `Stream`s can be sent from one process to another process using `.send_bincode(&loc2)` (which uses [bincode](https://docs.rs/bincode/latest/bincode/) as a serialization format). The sections for each location type ([`Process`](./processes.md), [`Cluster`](./clusters.md)) discuss the networking APIs in further detail.
+To create distributed programs, Hydro provides a variety of APIs to _move_ live collections between locations via network send/receive. For example, `Stream`s can be sent from one process to another process using `.send(&loc2, ...)`. The sections for each location type ([`Process`](./processes.md), [`Cluster`](./clusters.md)) discuss the networking APIs in further detail.

 ## Creating Locations
 Locations can be created by calling the appropriate method on the global `FlowBuilder` (e.g. `flow.process()` or `flow.cluster()`). These methods will return a handle to the location that can be used to create live collections and run computations.

docs/docs/hydro/reference/locations/processes.md

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@ let numbers = leader.source_iter(q!(vec![1, 2, 3, 4]));
 ```

 ## Networking
-Because a process represents a single machine, it is straightforward to send data to and from a process. For example, we can send a stream of integers from the leader process to another process using the `send_bincode` method (which uses [bincode](https://docs.rs/bincode/latest/bincode/) as a serialization format). This automatically sets up network senders and receivers on the two processes.
+Because a process represents a single machine, it is straightforward to send data to and from a process. For example, we can send a stream of integers from the leader process to another process using the `send` method (which can be configured to use a particular network protocol and serialization format). This automatically sets up network senders and receivers on the two processes.

 ```rust,no_run
 # use hydro_lang::prelude::*;
@@ -39,5 +39,5 @@ Because a process represents a single machine, it is straightforward to send dat
 # let leader: Process<Leader> = flow.process::<Leader>();
 let numbers = leader.source_iter(q!(vec![1, 2, 3, 4]));
 let process2: Process<()> = flow.process::<()>();
-let on_p2: Stream<_, Process<()>, _> = numbers.send_bincode(&process2);
+let on_p2: Stream<_, Process<()>, _> = numbers.send(&process2, TCP.bincode());
 ```

docs/src/pages/index.js

Lines changed: 3 additions & 3 deletions
@@ -123,11 +123,11 @@ export default function Home() {
 <CodeBlock
 language="rust">
 {`fn reduce_from_cluster(
-data: Stream<usize, Cluster<Worker>, Unbounded>,
+data: Stream<usize, Cluster<Worker>>,
 leader: &Process<Leader>
-) -> Singleton<usize, Process<Leader>, Unbounded> {
+) -> Singleton<usize, Process<Leader>> {
 data
-.send_bincode(leader)
+.send(leader, TCP.bincode())
 // Stream<(MemberId<Worker>, usize), Process<Leader>, ..., NoOrder>
 .map(q!(|v| v.1)) // drop the ID
 .fold_commutative(q!(0), q!(|acc, v| *acc += v))

hydro_lang/src/lib.rs

Lines changed: 3 additions & 0 deletions
@@ -53,6 +53,7 @@ pub mod prelude {
 pub use crate::live_collections::sliced::sliced;
 pub use crate::live_collections::stream::Stream;
 pub use crate::location::{Cluster, External, Location as _, Process, Tick};
+pub use crate::networking::TCP;
 pub use crate::nondet::{NonDet, nondet};

 /// A macro to set up a Hydro crate.
@@ -82,6 +83,8 @@ pub mod live_collections;

 pub mod location;

+pub mod networking;
+
 pub mod telemetry;

 pub mod tests;
