Merged
10 changes: 5 additions & 5 deletions docs/docs/hydro/learn/quickstart/partitioned-counter.mdx
@@ -68,17 +68,17 @@ The `prefix_key` method adds a new key to the front of the keyed stream. Here, y

:::

- Now you'll use `demux_bincode` to send data from the leader to the cluster. This is where Hydro fundamentally differs from traditional distributed systems frameworks. In most frameworks, you write separate programs for each service (one for the leader, one for the shards), then configure external message brokers or RPC systems to connect them. You have to manually serialize messages, manage network connections, and coordinate deployment.
+ Now you'll use `demux` to send data from the leader to the cluster using the configured network protocol and serialization format. This is where Hydro fundamentally differs from traditional distributed systems frameworks. In most frameworks, you write separate programs for each service (one for the leader, one for the shards), then configure external message brokers or RPC systems to connect them. You have to manually serialize messages, manage network connections, and coordinate deployment.

- In Hydro, you write a **single Rust function** that describes the entire distributed service. When you call `demux_bincode`, you're performing network communication right in the middle of your function - but it feels like ordinary Rust code. The Hydro compiler automatically generates the network code, handles serialization, and deploys the right code to each machine. You can reason about your entire distributed system in one place, with full type safety and IDE support.
+ In Hydro, you write a **single Rust function** that describes the entire distributed service. When you call `demux`, you're performing network communication right in the middle of your function - but it feels like ordinary Rust code. The Hydro compiler automatically generates the network code, handles serialization, and deploys the right code to each machine. You can reason about your entire distributed system in one place, with full type safety and IDE support.

- The `demux_bincode` method sends each element to the cluster member specified by the first component of the key:
+ The `demux` method sends each element to the cluster member specified by the first component of the key:

<CodeBlock language="rust" title={
<Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/partitioned_counter.rs">src/partitioned_counter.rs</Link>
} showLineNumbers={20}>{highlightLines(getLines(partitionedCounterSrc, 20, 34), [7, 15])}</CodeBlock>

- After `demux_bincode`, the stream is now located at the cluster. The stream returned by a demux operator preserves the remaining keys after the `MemberId` is consumed for routing. In this case, we are left with a `KeyedStream<u32, String, Cluster<'a, CounterShard>>` - a stream keyed by client ID and key name, located on the cluster.
+ After `demux`, the stream is now located at the cluster. The stream returned by a demux operator preserves the remaining keys after the `MemberId` is consumed for routing. In this case, we are left with a `KeyedStream<u32, String, Cluster<'a, CounterShard>>` - a stream keyed by client ID and key name, located on the cluster.
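Conceptually, the routing step consumes the `MemberId` prefix and leaves the remaining keys untouched. The shape of that transformation can be sketched in plain Rust (hypothetical stand-in types, not the real Hydro APIs):

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the types involved (not the real hydro_lang types).
type MemberId = u32; // routing key, consumed by the demux step
type ClientId = u32;
type KeyName = String;

// Input elements are keyed by (MemberId, (ClientId, KeyName)); the output keeps
// only the remaining keys per member, mirroring how routing consumes `MemberId`.
fn route(
    items: Vec<((MemberId, (ClientId, KeyName)), u64)>,
) -> HashMap<MemberId, Vec<((ClientId, KeyName), u64)>> {
    let mut shards: HashMap<MemberId, Vec<((ClientId, KeyName), u64)>> = HashMap::new();
    for ((member, rest), value) in items {
        shards.entry(member).or_default().push((rest, value));
    }
    shards
}
```

Each bucket corresponds to what one cluster member receives: a stream keyed by the leftover `(ClientId, KeyName)` pair.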

## Running the Counter on the Cluster

@@ -96,7 +96,7 @@ Finally, you need to send the responses back to the leader process:
<Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/partitioned_counter.rs">src/partitioned_counter.rs</Link>
} showLineNumbers={41}>{getLines(partitionedCounterSrc, 41, 44)}</CodeBlock>

- The `send_bincode` method sends data from the cluster to the leader process. When data moves from a cluster to a process, it arrives as a keyed stream with `MemberId` as the first key. The `drop_key_prefix` method removes this `MemberId` key, leaving just the original keys (client ID and response data).
+ The `send` method sends data from the cluster to the leader process. When data moves from a cluster to a process, it arrives as a keyed stream with `MemberId` as the first key. The `drop_key_prefix` method removes this `MemberId` key, leaving just the original keys (client ID and response data).

This is a standard pattern for building a partitioned service in Hydro: prefix a key for routing, demux to the cluster, process on each shard, send back, drop the routing key. You will find similar code in key-value stores and even consensus protocols.
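The whole round trip can be sketched in ordinary Rust (hypothetical names and a stand-in shard function, not Hydro's actual operators), to show how the routing key appears and disappears:

```rust
use std::collections::HashMap;

// A plain-Rust sketch of the pattern: 1. prefix a routing key, 2. partition by
// it ("demux"), 3. process per shard, 4. gather responses, 5. drop the routing key.
fn partitioned_count(
    requests: Vec<(u32, String)>, // (client id, key name)
    num_shards: u32,
) -> HashMap<(u32, String), u64> {
    // 1-2. prefix a shard id and partition by it.
    let mut shards: HashMap<u32, Vec<(u32, String)>> = HashMap::new();
    for (client, key) in requests {
        let shard = (key.len() as u32) % num_shards; // stand-in for a real hash
        shards.entry(shard).or_default().push((client, key));
    }
    // 3. each shard counts its local requests independently.
    let mut responses: Vec<(u32, ((u32, String), u64))> = Vec::new();
    for (shard, reqs) in shards {
        let mut counts: HashMap<(u32, String), u64> = HashMap::new();
        for req in reqs {
            *counts.entry(req).or_insert(0) += 1;
        }
        for (k, v) in counts {
            responses.push((shard, (k, v)));
        }
    }
    // 4-5. the leader gathers responses and drops the routing key
    // (analogous to `drop_key_prefix`).
    responses.into_iter().map(|(_shard, kv)| kv).collect()
}
```

Because the shard id is derived from the key, every occurrence of a given key lands on the same shard, so the per-shard counts merge cleanly at the leader.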

14 changes: 7 additions & 7 deletions docs/docs/hydro/reference/live-collections/streams.md
@@ -24,7 +24,7 @@ let numbers: Stream<_, Process<_>, Unbounded> = process
.source_iter(q!(vec![1, 2, 3]))
.map(q!(|x| x + 1));
// 2, 3, 4
- # numbers.send_bincode(&p_out)
+ # numbers.send(&p_out, TCP.bincode())
# }, |mut stream| async move {
# for w in 2..=4 {
# assert_eq!(stream.next().await, Some(w));
@@ -41,9 +41,9 @@ Streams also can be sent over the network to participate in distributed programs
let p1 = flow.process::<()>();
let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
let p2 = flow.process::<()>();
- let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
+ let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
// 1, 2, 3
- # on_p2.send_bincode(&p_out)
+ # on_p2.send(&p_out, TCP.bincode())
# }, |mut stream| async move {
# for w in 1..=3 {
# assert_eq!(stream.next().await, Some(w));
@@ -67,7 +67,7 @@ let numbers: Stream<_, Cluster<_>, Unbounded, TotalOrder> =
workers.source_iter(q!(vec![1, 2, 3]));
let process: Process<()> = flow.process::<()>();
let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> =
-     numbers.send_bincode(&process).values();
+     numbers.send(&process, TCP.bincode()).values();
```

The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
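One way to picture this guarantee is a small type-state sketch in plain Rust (hypothetical types, not the real `hydro_lang` definitions), where the ordering is a marker type parameter and order-sensitive methods are only implemented for `TotalOrder`:

```rust
use std::marker::PhantomData;

// Marker types standing in for the ordering guarantees (hypothetical sketch).
#[allow(dead_code)]
struct TotalOrder;
#[allow(dead_code)]
struct NoOrder;

struct MyStream<T, Order> {
    items: Vec<T>,
    _order: PhantomData<Order>,
}

impl<T, Order> MyStream<T, Order> {
    // `map` is order-agnostic, so it is available for every ordering.
    fn map<U>(self, f: impl Fn(T) -> U) -> MyStream<U, Order> {
        MyStream {
            items: self.items.into_iter().map(f).collect(),
            _order: PhantomData,
        }
    }
}

impl<T: Clone> MyStream<T, TotalOrder> {
    // `last` is only meaningful when the order is deterministic,
    // so it is only defined for `TotalOrder` streams.
    fn last(&self) -> Option<T> {
        self.items.last().cloned()
    }
}
```

Calling `.last()` on a `MyStream<_, NoOrder>` would fail to compile, which mirrors how misusing an order-sensitive API on a `NoOrder` stream is rejected at compile time.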
@@ -82,7 +82,7 @@ let process: Process<()> = flow.process::<()>();
let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
.source_iter(q!(vec!["hello", "world"]))
.map(q!(|x| x.to_string()))
-     .send_bincode(&process)
+     .send(&process, TCP.bincode())
.values();

let words_concat = all_words
@@ -92,7 +92,7 @@ let words_concat = all_words

:::tip

- We use `values()` here to drop the member IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.
+ We use `values()` here to drop the member IDs which are included in `send`. See [Clusters](../locations/clusters.md) for more details.

Running an aggregation (`fold`, `reduce`) converts a `Stream` into a `Singleton`, as we see in the type signature here. The `Singleton` type is still "live" in the sense of a [Live Collection](./index.md), so updates to the `Stream` input cause updates to the `Singleton` output. See [Singletons and Optionals](./singletons-optionals.md) for more information.
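A quick plain-Rust illustration (not using Hydro APIs) of why some aggregations tolerate arbitrary arrival order while others do not: counting gives the same answer for any order of elements, whereas concatenation does not, which is the intuition behind requiring `fold_commutative` on unordered streams.

```rust
// Commutative aggregation: the result is independent of arrival order.
fn count(items: &[&str]) -> u64 {
    items.iter().fold(0, |acc, _| acc + 1)
}

// Non-commutative aggregation: the result depends on arrival order.
fn concat(items: &[&str]) -> String {
    items.iter().fold(String::new(), |acc, x| acc + x)
}
```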

@@ -109,7 +109,7 @@ To perform an aggregation with an unordered stream, you must use [`fold_commutat
# let all_words: Stream<_, Process<_>, _, hydro_lang::live_collections::stream::NoOrder> = workers
# .source_iter(q!(vec!["hello", "world"]))
# .map(q!(|x| x.to_string()))
- # .send_bincode(&process)
+ # .send(&process, TCP.bincode())
# .values();
let words_count = all_words
.fold_commutative(q!(|| 0), q!(|acc, x| *acc += 1));
24 changes: 12 additions & 12 deletions docs/docs/hydro/reference/locations/clusters.md
@@ -26,15 +26,15 @@ let numbers = workers.source_iter(q!(vec![1, 2, 3, 4]));
```

## Networking
- When sending a live collection from a cluster to another location, **each** member of the cluster will send its local collection. On the receiver side, these collections will be joined together into a **keyed stream** with `ID` keys and groups of `Data` values, where the ID uniquely identifies which member of the cluster the data came from. For example, you can send a stream from the worker cluster to another process using the `send_bincode` method:
+ When sending a live collection from a cluster to another location, **each** member of the cluster will send its local collection. On the receiver side, these collections will be joined together into a **keyed stream** with `ID` keys and groups of `Data` values, where the ID uniquely identifies which member of the cluster the data came from. For example, you can send a stream from the worker cluster to another process using the `send` method:

```rust
# use hydro_lang::prelude::*;
# use futures::StreamExt;
# tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
- numbers.send_bincode(&process) // KeyedStream<MemberId<()>, i32, ...>
+ numbers.send(&process, TCP.bincode()) // KeyedStream<MemberId<()>, i32, ...>
# .entries()
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
@@ -58,7 +58,7 @@ If you do not need to know _which_ member of the cluster the data came from, you
# tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
- numbers.send_bincode(&process).values()
+ numbers.send(&process, TCP.bincode()).values()
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
@@ -73,7 +73,7 @@

:::
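The receiving side can be sketched in plain Rust (hypothetical stand-ins, not Hydro's implementation): elements arrive tagged with the sending member's id, are grouped into a keyed collection, and a `.values()`-style access discards the grouping:

```rust
use std::collections::BTreeMap;

// Group received (member id, value) pairs into a keyed collection,
// mirroring how the receiver sees a keyed stream of per-member groups.
fn group_by_member(received: Vec<(u32, i32)>) -> BTreeMap<u32, Vec<i32>> {
    let mut keyed: BTreeMap<u32, Vec<i32>> = BTreeMap::new();
    for (member, value) in received {
        keyed.entry(member).or_insert_with(Vec::new).push(value);
    }
    keyed
}

// Analogous to `.values()`: drop the member ids and interleave the groups.
fn values_only(keyed: BTreeMap<u32, Vec<i32>>) -> Vec<i32> {
    keyed.into_values().flatten().collect()
}
```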

- In the reverse direction, when sending a stream _to_ a cluster, the sender must prepare `(ID, Data)` tuples, where the ID uniquely identifies which member of the cluster the data is intended for. Then, we can send a stream from a process to the worker cluster using the `demux_bincode` method:
+ In the reverse direction, when sending a stream _to_ a cluster, the sender must prepare `(ID, Data)` tuples, where the ID uniquely identifies which member of the cluster the data is intended for. Then, we can send a stream from a process to the worker cluster using the `demux` method:

```rust
# use hydro_lang::prelude::*;
@@ -84,8 +84,8 @@ In the reverse direction, when sending a stream _to_ a cluster, the sender must
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
.map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
-     .demux_bincode(&workers);
- on_worker.send_bincode(&p2)
+     .demux(&workers, TCP.bincode());
+ on_worker.send(&p2, TCP.bincode())
# .entries()
// if there are 4 members in the cluster, we should receive 4 elements
// { MemberId::<Worker>(0): [0], MemberId::<Worker>(1): [1], MemberId::<Worker>(2): [2], MemberId::<Worker>(3): [3] }
@@ -100,7 +100,7 @@
```

## Broadcasting and Membership Lists
- A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using `broadcast_bincode`, which takes in a stream of **only data elements** and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:
+ A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using `broadcast`, which takes in a stream of **only data elements** and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:

```rust
# use hydro_lang::prelude::*;
@@ -109,8 +109,8 @@
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
- let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
- on_worker.send_bincode(&p2)
+ let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
+ on_worker.send(&p2, TCP.bincode())
# .entries()
// if there are 4 members in the cluster, we should receive 4 elements
// { MemberId::<Worker>(0): [123], MemberId::<Worker>(1): [123], MemberId::<Worker>(2): [123], MemberId::<Worker>(3): [123] }
@@ -124,7 +124,7 @@
# }));
```

- This API requires a [non-determinism guard](../live-collections/determinism.md#unsafe-operations-in-hydro), because the set of cluster members may asynchronously change over time. Depending on when we are notified of membership changes, we will broadcast to different members. Under the hood, the `broadcast_bincode` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `source_cluster_members` method to get a stream of membership events (cluster members joining or leaving):
+ This API requires a [non-determinism guard](../live-collections/determinism.md#unsafe-operations-in-hydro), because the set of cluster members may asynchronously change over time. Depending on when we are notified of membership changes, we will broadcast to different members. Under the hood, the `broadcast` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `source_cluster_members` method to get a stream of membership events (cluster members joining or leaving):
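Conceptually, a broadcast pairs every message with the membership list the sender currently observes, producing one `(recipient, message)` pair per member; this is why a changing membership makes the output non-deterministic. A plain-Rust sketch of that idea (not the actual implementation):

```rust
// Pair each message with every member in the currently-observed membership
// list. If the observed list changes between messages, the output changes too.
fn broadcast_pairs(members: &[u32], messages: &[i32]) -> Vec<(u32, i32)> {
    let mut out = Vec::new();
    for &msg in messages {
        for &member in members {
            out.push((member, msg));
        }
    }
    out
}
```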

```rust
# use hydro_lang::prelude::*;
@@ -135,7 +135,7 @@ let workers: Cluster<()> = flow.cluster::<()>();
# // do nothing on each worker
# workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
let cluster_members = p1.source_cluster_members(&workers);
- # cluster_members.entries().send_bincode(&p2)
+ # cluster_members.entries().send(&p2, TCP.bincode())
// if there are 4 members in the cluster, we would see a join event for each
// { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
# }, |mut stream| async move {
@@ -162,7 +162,7 @@ let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
self_id_stream
.filter(q!(|x| x.get_raw_id() % 2 == 0))
.map(q!(|x| format!("hello from {}", x.get_raw_id())))
-     .send_bincode(&process)
+     .send(&process, TCP.bincode())
.values()
// if there are 4 members in the cluster, we should receive 2 elements
// "hello from 0", "hello from 2"
2 changes: 1 addition & 1 deletion docs/docs/hydro/reference/locations/index.md
@@ -3,7 +3,7 @@ Hydro is a **global**, **distributed** programming model. This means that the da

Each [live collection](pathname:///rustdoc/hydro_lang/live_collections/) has a type parameter `L` which will always be a type that implements the `Location` trait (e.g. [`Process`](./processes.md) and [`Cluster`](./clusters.md), documented in this section). Computation has to happen at a single place, so Hydro APIs that consume multiple live collections will require all inputs to have the same location type. Moreover, most Hydro APIs that transform live collections will emit a new live collection output with the same location type as the input.

- To create distributed programs, Hydro provides a variety of APIs to _move_ live collections between locations via network send/receive. For example, `Stream`s can be sent from one process to another process using `.send_bincode(&loc2)` (which uses [bincode](https://docs.rs/bincode/latest/bincode/) as a serialization format). The sections for each location type ([`Process`](./processes.md), [`Cluster`](./clusters.md)) discuss the networking APIs in further detail.
+ To create distributed programs, Hydro provides a variety of APIs to _move_ live collections between locations via network send/receive. For example, `Stream`s can be sent from one process to another process using `.send(&loc2, ...)`. The sections for each location type ([`Process`](./processes.md), [`Cluster`](./clusters.md)) discuss the networking APIs in further detail.

## Creating Locations
Locations can be created by calling the appropriate method on the global `FlowBuilder` (e.g. `flow.process()` or `flow.cluster()`). These methods will return a handle to the location that can be used to create live collections and run computations.
4 changes: 2 additions & 2 deletions docs/docs/hydro/reference/locations/processes.md
@@ -30,7 +30,7 @@ let numbers = leader.source_iter(q!(vec![1, 2, 3, 4]));
```

## Networking
- Because a process represents a single machine, it is straightforward to send data to and from a process. For example, we can send a stream of integers from the leader process to another process using the `send_bincode` method (which uses [bincode](https://docs.rs/bincode/latest/bincode/) as a serialization format). This automatically sets up network senders and receivers on the two processes.
+ Because a process represents a single machine, it is straightforward to send data to and from a process. For example, we can send a stream of integers from the leader process to another process using the `send` method (which can be configured to use a particular network protocol and serialization format). This automatically sets up network senders and receivers on the two processes.
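As a rough mental model of what the generated networking does, the sender encodes each element into framed bytes and the receiver decodes them back. The sketch below uses hand-rolled length-prefixed framing purely for illustration; Hydro's real protocols and serialization codecs differ.

```rust
use std::convert::TryInto;

// Encode each i32 as a 4-byte length prefix followed by its little-endian bytes.
fn encode(values: &[i32]) -> Vec<u8> {
    let mut wire = Vec::new();
    for v in values {
        wire.extend_from_slice(&4u32.to_le_bytes()); // length prefix
        wire.extend_from_slice(&v.to_le_bytes()); // payload
    }
    wire
}

// Decode the framed byte stream back into values on the receiving side.
fn decode(mut wire: &[u8]) -> Vec<i32> {
    let mut out = Vec::new();
    while wire.len() >= 8 {
        let len = u32::from_le_bytes(wire[0..4].try_into().unwrap()) as usize;
        let payload = &wire[4..4 + len];
        out.push(i32::from_le_bytes(payload.try_into().unwrap()));
        wire = &wire[4 + len..];
    }
    out
}
```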

```rust,no_run
# use hydro_lang::prelude::*;
@@ -39,5 +39,5 @@ Because a process represents a single machine, it is straightforward to send dat
# let leader: Process<Leader> = flow.process::<Leader>();
let numbers = leader.source_iter(q!(vec![1, 2, 3, 4]));
let process2: Process<()> = flow.process::<()>();
- let on_p2: Stream<_, Process<()>, _> = numbers.send_bincode(&process2);
+ let on_p2: Stream<_, Process<()>, _> = numbers.send(&process2, TCP.bincode());
```
6 changes: 3 additions & 3 deletions docs/src/pages/index.js
@@ -123,11 +123,11 @@ export default function Home() {
<CodeBlock
language="rust">
{`fn reduce_from_cluster(
-     data: Stream<usize, Cluster<Worker>, Unbounded>,
+     data: Stream<usize, Cluster<Worker>>,
leader: &Process<Leader>
- ) -> Singleton<usize, Process<Leader>, Unbounded> {
+ ) -> Singleton<usize, Process<Leader>> {
data
-     .send_bincode(leader)
+     .send(leader, TCP.bincode())
// Stream<(MemberId<Worker>, usize), Process<Leader>, ..., NoOrder>
.map(q!(|v| v.1)) // drop the ID
.fold_commutative(q!(0), q!(|acc, v| *acc += v))
3 changes: 3 additions & 0 deletions hydro_lang/src/lib.rs
@@ -52,6 +52,7 @@ pub mod prelude {
pub use crate::live_collections::sliced::sliced;
pub use crate::live_collections::stream::Stream;
pub use crate::location::{Cluster, External, Location as _, Process, Tick};
+ pub use crate::networking::TCP;
pub use crate::nondet::{NonDet, nondet};

/// A macro to set up a Hydro crate.
@@ -81,6 +82,8 @@ pub mod live_collections;

pub mod location;

+ pub mod networking;

pub mod telemetry;

pub mod tests;