Skip to content

Conversation

@shadaj
Copy link
Member

@shadaj shadaj commented Aug 21, 2025

No description provided.

@cloudflare-workers-and-pages
Copy link

cloudflare-workers-and-pages bot commented Aug 21, 2025

Deploying hydroflow with  Cloudflare Pages  Cloudflare Pages

Latest commit: 34f0c80
Status: ✅  Deploy successful!
Preview URL: https://2c7d1fe0.hydroflow.pages.dev
Branch Preview URL: https://pr2027.hydroflow.pages.dev

View logs

@shadaj
Copy link
Member Author

shadaj commented Aug 21, 2025

@shadaj shadaj force-pushed the pr2027 branch 2 times, most recently from cb86b05 to 19cd28f Compare August 21, 2025 22:12
@conor-23
Copy link
Contributor

I agree that we both need the ability to say that things happen atomically, but that developers using this hinders our ability to do optimization so we want to discourage this as much as possible. We talk about the analogy of developers making their systems correct by putting locks everywhere. This could end up being used in a similar way if we don't direct people towards the Hydraulic way of building with this primitive.

I think the guarantee this offers by default should be mappable to a classic consistency level or it should be obvious how a developer specifies what consistency level they want this to offer. If we are joining things together across clients, we need an answer to what consistency that joining offers.

Ensuring this atomicity cross-node will be expensive, but we do need a story for it. Should the cross-node version be the same keyword? a align_point_networked() keyword? Would it feel natural to program with both the align_point and align_point_networked keywords?

@shadaj
Copy link
Member Author

shadaj commented Aug 22, 2025

Great points @conor-23. I've updated the RFC to use .local_align as the API, and also added a "possible more explicit design" that will let us throw more errors at compile time to prevent accidental attempts to use alignment points across the network. Also added a brief section on what align points don't guarantee across distribution.

Also agreed that eventually we will likely want a very similar API for cross-node atomicity. I'm thinking along the same lines of a align_point_networked API that would take additional parameters to configure the protocol used for alignment. Though I think implementing such an API is a bit further out...

@shadaj shadaj marked this pull request as ready for review August 25, 2025 20:29
> [!NOTE]
> **More Explicit Design:**
>
> Then, we need to declare _which_ upstream values will be aligned at this point. In our case, we want alignment for the increment stream, so we call `aligned_at` to declare that this stream will be aligned when accessed at that point.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I keep going back and forth on whether I like this more explicit approach. On one hand, it helps developers think even more carefully about what is being aligned instead of a semi-opaque tracing approach. On the other hand, it can lead to frustration if you accidentally forget to add a aligned_at annotation on something you do want to be aligned.

In a world where developers are building everything using the simulator, I'd hope that this concern is mitigated since the simulator would quickly identify misalignment failures. But I am not sure how likely that world is.

@MingweiSamuel
Copy link
Member

The API itself seems clunky, which is maybe fine. Someone like a closure or block feels more natural

I wonder how the actual implementation can be decoupled from ticks in the future; I assume that is desirable

@shadaj
Copy link
Member Author

shadaj commented Aug 27, 2025

Update: added to FAQ in the RFC doc

The API itself seems clunky, which is maybe fine. Someone like a closure or block feels more natural

I think the tendency towards a closure / block arises from a bit of historical mental model. We've been thinking of this in terms of atomic compute sections that execute in a single tick on a single machine, with the effect being that all downstreams will see the same version of data.

The new API is more like providing consistency guarantees when asking for a version of some asynchronously updated data. The necessary atomic region is actually inferred (in fact, there may be methods other than atomic regions for achieving the same result!). As a result, it's not clear where the beginning of a closure / block would need to be.

The other reason for the non-block approach is it's common to have semantically separate pieces of logic that need to share a consistent view of some shared data. For example different request handlers that need to read the same key. Ideally, these separate components can be in different Rust functions, but that means that we can't have a "block" around the portions of each function that deal with the shared data. Instead, the token approach lets us pass in a "token that points to some consistent but non-deterministic version of the data".

For co-incidence and loops, the latter situation is far less common; the entirety of a loop is almost always in a single function. So there I think we will want to move towards a block / closure approach (but that is for a separate RFC).

@shadaj shadaj force-pushed the pr2027 branch 2 times, most recently from 2a7d3b6 to 34f0c80 Compare August 27, 2025 22:40
@cdouglas
Copy link

cdouglas commented Aug 28, 2025

Is it a goal to remove ticks from the API, using alignment as constraints for reordering and scheduling? If so, aligning with cuts in the stream helps with atomicity and grouping, but can one specify ordering constraints without referring to ticks?

My only experience with this is batched incremental join, so grains of salt and all that, but in this example a batch from two streams is aligned with a 1-tick delayed snapshot of these streams. Can alignment include before/after the cut point in the stream, so we don't need to refer to ticks to say which "side" of the consistent view we're referring to?

Returning to the join, it uses the same cut point in dR and dS, but they don't need to be aligned with each other. As the streaming incremental join demonstrates, two of the branches are empty if we pull from either dR or dS, so an implementation could batch (and execute) these in different ticks if it were not constrained to align dR and dS. Something like:

let r_idx = r_stream.local_cut(nondet!(/** ... */));
let r = r_stream.clone()
    .map(q!(...))
    .into_keyed()
    .fold_commutative(...)
    .snapshot_before(&r_idx);
let r_x_ds = r_stream.clone()
    .map(q!(...))
	.batch_after(&r_idx);

So there are no messages between r and r_x_ds- they're aligned- but it's also clear what's included in the view.

If it's an ambition to capture causal or sequential consistency, cuts at scopes other than Process might be meaningful? For example, delivery for reliable broadcast could create a cut over multiple, ordered steams. It's not obvious (to me) how to use alignment without ticks to capture that relationship. Maybe this belongs at another level, though.

@shadaj
Copy link
Member Author

shadaj commented Aug 28, 2025

Is it a goal to remove ticks from the API, using alignment as constraints for reordering and scheduling? If so, aligning with cuts in the stream helps with atomicity and grouping, but can one specify ordering constraints without referring to ticks?

I think ticks will remain in some form (perhaps under a different name), but the goal that ticks should only be used when there is meaningful iterative / logical-time-dependent code. So of the three current uses of ticks, only the first should remain:

  1. Stateful iterative logic that requires manually tracking state across batches of input (i.e. handwritten semi-naive join, quorum counting)
  2. Intentionally observing transient state (to respond to asynchronous requests, inside protocols, etc.)
  3. Ensuring that multiple branches of a shared upstream can be batches / snapshotted in a consistent manner to ensure local consistency (this RFC)

My only experience with this is batched incremental join, so grains of salt and all that, but in this example a batch from two streams is aligned with a 1-tick delayed snapshot of these streams. Can alignment include before/after the cut point in the stream, so we don't need to refer to ticks to say which "side" of the consistent view we're referring to?

So in this design, when we batch a stream, we still provide a tick in addition to the token that ensures it is consistent with respect to other batches / snapshots. So in your example, I think we would do something like this with the propoosed API

let r_stream_align = process.local_align();
let r = r_stream.clone()
    .map(q!(...))
    .into_keyed()
    .fold_commutative(...)
    .snapshot_aligned(&process.tick(), &r_idx)
    .defer_tick(); // get the snapshot from the previous tick
let r_x_ds = r_stream.clone()
    .map(q!(...))
    .batch_aligned(&process.tick(), &r_idx);

I think it's a bit hard to know (for me) how much better / worse this is compared to the existing API for your use case, but the idea is that ticks should be involved whenever there is any logic dealing with consecutive steps of time.

If it's an ambition to capture causal or sequential consistency, cuts at scopes other than Process might be meaningful? For example, delivery for reliable broadcast could create a cut over multiple, ordered steams. It's not obvious (to me) how to use alignment without ticks to capture that relationship. Maybe this belongs at another level, though.

Yeah, I think this is similar to @conor-23 brought up. I think the ambition is to have a similar, but separate API for slices / alignment that span several machines. Would have to be a different API both because the guarantees will be different and also since we will want to let developers configure the mechanism / consistency level they want. But the hope is that it will be a conceptual sibling to local alignment.

@cdouglas
Copy link

the idea is that ticks should be involved whenever there is any logic dealing with consecutive steps of time.

when we batch a stream, we still provide a tick in addition to the token that ensures it is consistent with respect to other batches / snapshots

Got it. This RFC definitely improves on the existing atomic API and I don't want to slow it down. Thinking in ticks instead of dependencies may just be something to get used to.

Would cross-machine alignment be tied to ticks? Or are ticks exclusively local, and there's a different synchronization/heartbeat... token across processes? Alignment is passing something more like a scope to the stream, but even using the same tick it's not aligning across streams? In the incremental join example, it could buffer dR and pull from dS for multiple ticks, and alignment doesn't imply any relationship across them?

This could be read as at each process, create a batch/snapshot that is non-deterministically and independently chosen from the stream whereas attempting a batch/snapshot at a different scope (cluster?), it would require proof that barrier was meaningful? So something like cluster_align would require something underpinning it, like causal reliable broadcast?

Copy link
Collaborator

@jhellerstein jhellerstein left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits

@cloudflare-workers-and-pages
Copy link

cloudflare-workers-and-pages bot commented Sep 9, 2025

Deploying hydro with  Cloudflare Pages  Cloudflare Pages

Latest commit: f56e168
Status: ✅  Deploy successful!
Preview URL: https://8d0b0af2.hydroflow.pages.dev
Branch Preview URL: https://pr2027.hydroflow.pages.dev

View logs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants