Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -21590,7 +21590,7 @@
},
"ordered": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Ordered",
"description": "Ordered enables ordered processing for the entire pipeline. When enabled, messages will be processed in order based on their event time. This can be overridden at the vertex level."
"description": "Ordered enables order-preserving processing for the entire pipeline. When enabled, messages will be processed in their arrival order (FIFO within each partition). This can be overridden at the vertex level."
},
"sideInputs": {
"description": "SideInputs defines the Side Inputs of a pipeline.",
Expand Down
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -21576,7 +21576,7 @@
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.PipelineLimits"
},
"ordered": {
"description": "Ordered enables ordered processing for the entire pipeline. When enabled, messages will be processed in order based on their event time. This can be overridden at the vertex level.",
"description": "Ordered enables order-preserving processing for the entire pipeline. When enabled, messages will be processed in their arrival order (FIFO within each partition). This can be overridden at the vertex level.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Ordered"
},
"sideInputs": {
Expand Down
12 changes: 6 additions & 6 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -8105,9 +8105,9 @@ InterStepBuffer configuration specific to this pipeline.
<em>(Optional)</em>
<p>

Ordered enables ordered processing for the entire pipeline. When
enabled, messages will be processed in order based on their event time.
This can be overridden at the vertex level.
Ordered enables order-preserving processing for the entire pipeline.
When enabled, messages will be processed in their arrival order (FIFO
within each partition). This can be overridden at the vertex level.
</p>

</td>
Expand Down Expand Up @@ -8552,9 +8552,9 @@ InterStepBuffer configuration specific to this pipeline.
<em>(Optional)</em>
<p>

Ordered enables ordered processing for the entire pipeline. When
enabled, messages will be processed in order based on their event time.
This can be overridden at the vertex level.
Ordered enables order-preserving processing for the entire pipeline.
When enabled, messages will be processed in their arrival order (FIFO
within each partition). This can be overridden at the vertex level.
</p>

</td>
Expand Down
117 changes: 87 additions & 30 deletions docs/user-guide/reference/ordered-processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,55 @@

> **Available from v1.8**

By default, Numaflow improves throughput by distributing work across any available processing unit, which results in
out-of-order processing. However, some workflows require messages to be processed in a deterministic order — for
example, a create-update-delete sequence where you cannot update a record before it has been created.
By default, Numaflow improves throughput by distributing work across any available processing unit, which means
messages may be processed out of the order in which they arrived. However, some workflows require messages to be
processed in their arrival order - for example, a create-update-delete sequence where you cannot update a record
before it has been created.

Ordered processing in Numaflow provides partitioned FIFO semantics: within a partition, the N-th message is processed
only after the (N-1)-th message completes.
The **ordered processing** feature provides **input-order preservation** with partitioned FIFO semantics: within a
partition, the N-th message is processed only after the (N-1)-th message completes. The ordering is based on the
position of each message in the Inter-Step Buffer (ISB), not on event timestamps.

## What Order Preservation Means

It is important to distinguish **order preservation** from **timestamp ordering**:

| Concept | Meaning |
|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Order preservation (this feature)** | Messages are processed in the same order they were written to the ISB - i.e., FIFO. Message N is processed only after message N-1 completes, within a partition. |
| **Timestamp ordering** | Messages are sorted by event time so that earlier events are processed before later ones. This feature does **not** provide timestamp ordering. |

Sources may emit events whose event-time timestamps are not monotonically increasing (e.g., late-arriving data,
multiple producers). Order preservation guarantees that messages are processed in their **arrival order** (the order
the source emitted them into the pipeline), regardless of their event timestamps.

In short: if your source emits messages A, B, C in that sequence, order preservation guarantees they are processed in
the sequence A, B, C - even if B has an earlier event timestamp than A.

## How It Works

Ordered processing works differently depending on the vertex type:

| Vertex Type | Behavior |
|-------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Source** | Always ordered by nature — no configuration needed. |
| Vertex Type | Behavior |
|-------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Source** | Always preserves input order - messages are emitted into the pipeline in the order received from the external source. No configuration needed. |
| **Map** | Requires `partitions` to be configured. Replicas are fixed to the `partitions` count. Messages are routed to partitions by key hash, so all messages with the same key are processed by the same pod in order. |
| **Reduce** | Already partitioned and ordered — no additional configuration needed. |
| **Sink** | Same as Map requires `partitions`. Replicas are fixed to the partition count. |
| **Reduce** | Already partitioned and order-preserving - no additional configuration needed. |
| **Sink** | Same as Map - requires `partitions`. Replicas are fixed to the partition count. |

When ordered processing is enabled for a Map or Sink vertex:

- The number of replicas is automatically fixed to the partition count (one pod per partition).
- Autoscaling is disabled for that vertex you must not set `scale.min` or `scale.max`.
- Messages are routed to partitions by hashing their keys, ensuring all messages with the same key go to the same pod in
FIFO order.
- Autoscaling is disabled for that vertex - you must not set `scale.min` or `scale.max`.
- Messages are routed to partitions by hashing their keys, ensuring all messages with the same key go to the same pod,
preserving their arrival order.


## Pipeline Specification

Enable ordered processing by setting `ordered.enabled: true` at the pipeline level. For Map and Sink vertices, set
`partitions` to the number of ordered lanes you need — the controller will fix replicas to that count automatically.
Enable order-preserving processing by setting `ordered.enabled: true` at the pipeline level. For Map and Sink
vertices, set `partitions` to the number of ordered lanes you need - the controller will fix replicas to that count
automatically.

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
Expand All @@ -41,10 +61,10 @@ spec:
limits:
readBatchSize: 1 # recommended for strict ordering
ordered:
enabled: true # enable ordered processing pipeline-wide
enabled: true # enable order-preserving processing pipeline-wide
vertices:
- name: my-source
source: {} # always ordered; no extra config needed
source: {} # always preserves input order; no extra config needed
- name: my-map
partitions: N # N replicas will be created, one per partition
udf:
Expand All @@ -62,7 +82,7 @@ spec:

## Per-Vertex Override

Ordered processing can also be enabled or disabled at the individual vertex level, which overrides the pipeline-level
Ordered processing can also be enabled or disabled at the individual vertex level, which overrides the pipeline-level
setting. This is useful when you want most vertices to run unordered for throughput, but need ordering for specific steps.

```yaml
Expand All @@ -86,27 +106,62 @@ spec:

Setting `partitions` on a vertex that has ordered processing disabled (or overridden to `false`) is valid. The vertex
will still have N ISB buffer partitions created, giving you a [multi-partitioned edge](multi-partition.md) for higher
throughput. However, replicas are determined by normal autoscaling they are not fixed to N so multiple replicas may
throughput. However, replicas are determined by normal autoscaling - they are not fixed to N - so multiple replicas may
read from the same partition, or one replica may handle multiple partitions. There is no ordering guarantee in this
case.

## Caveats and Limitations

- **Autoscaling is not supported** for Map and Sink vertices with ordered processing enabled. The replica count is fixed
to the partition count. Setting `scale.min` or `scale.max` on such vertices will cause a validation error.
- **Reduce vertices** are already partitioned and ordered by design; the `ordered` setting is ignored for them.
- **Source vertices** are always ordered regardless of the `ordered` setting.
- **Key-based routing**: ordering is guaranteed per key. Messages with different keys may still be interleaved across
- **Reduce vertices** are already partitioned and order-preserving by design; the `ordered` setting is ignored for them.
- **Source vertices** always preserve input order regardless of the `ordered` setting.
- **Key-based routing**: ordering is guaranteed per key. Messages with different keys may still be interleaved across
partitions. Ensure your UDF or SDK sets meaningful message keys to leverage per-key ordering.
- **`readBatchSize: 1`** is strongly recommended for strict ordering. With a larger batch size, multiple messages may
- **`readBatchSize: 1`** is required for strict ordering. With a larger batch size, multiple messages may
be in-flight simultaneously within a single pod.
- **Throughput trade-off**: ordered processing limits parallelism within a partition. Consider the number of partitions
- **Throughput trade-off**: ordered processing limits parallelism within a partition. Consider the number of partitions
carefully to balance ordering guarantees with throughput requirements.
- **Join vertices (multiple input edges)**: When a vertex receives messages from multiple upstream vertices
(a [join](join-vertex.md)), order preservation holds independently for each input edge, _but not across edges_.
Messages from different upstream vertices are interleaved in whatever order they arrive at the ISB. If you need a
global ordering across multiple inputs, you must merge them at a single source or use application-level sequencing.
- **Cycles**: When a [cycle](join-vertex.md#cycles) re-injects a message back into an earlier vertex, that message is
appended to the ISB after messages that have already been written. From the user's perspective, the re-injected
message appears later in the processing order than it did in the original stream. This means order preservation
relative to the original input sequence is disrupted for cycled messages.

## Behavior at Join Vertices

A [join vertex](join-vertex.md) receives messages from two or more upstream vertices. Each input edge has its own set
of ISB partitions, and order preservation applies **per edge**:

- Messages arriving from upstream vertex A are processed in FIFO order relative to each other.
- Messages arriving from upstream vertex B are processed in FIFO order relative to each other.
- However, messages from A and B are **interleaved** in whatever order they arrive at the join vertex. There is no
cross-edge ordering guarantee.

The [example below](#example) demonstrates this: sources `in-1` and `in-2` both feed into the `cat` vertex. Within each source's
stream, order is preserved, but `cat` will interleave messages from `in-1` and `in-2` based on arrival time.

## Behavior in Cycles

A [cycle](join-vertex.md#cycles) sends a message from a vertex back to itself or to a previous vertex. When a message
is re-injected via a cycle:

1. The cycled message is appended to the ISB **after** messages that have already been written by the original upstream
source.
2. From the perspective of the receiving vertex, the cycled message appears as a new, later message - not in its
original position.
3. Therefore, order preservation relative to the original input stream is **not maintained** for cycled messages.

If your use case requires strict ordering and also uses cycles (e.g., retry logic), be aware that retried messages will
be processed after messages that arrived in the interim. Design your application logic accordingly.

## Example

To enable ordered processing, set `ordered.enabled: true` in the pipeline spec. For Map and Sink vertices, also set
`partitions` to the desired number of partitions (which will also be the fixed replica count).
To enable order-preserving processing, set `ordered.enabled: true` in the pipeline spec. For Map and Sink vertices,
also set `partitions` to the desired number of partitions (which will also be the fixed replica count).

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
Expand Down Expand Up @@ -146,8 +201,10 @@ spec:

In the example above:

- `ordered.enabled: true` enables ordered processing pipeline-wide.
- `limits.readBatchSize: 1` is recommended so that each pod processes one message at a time, which is required for
strict in-order guarantees.
- `ordered.enabled: true` enables order-preserving processing pipeline-wide.
- `limits.readBatchSize: 1` is required so that each pod processes one message at a time, which is essential for
strict in-order guarantees.
- The `cat` (Map) and `out` (Sink) vertices each have `partitions: 3`, so they will run with exactly 3 replicas.
- Source vertices (`in-1`, `in-2`) are always ordered and require no extra configuration.
- Source vertices (`in-1`, `in-2`) always preserve input order and require no extra configuration.
- Because `in-1` and `in-2` both feed into `cat` (a join), messages from the two sources are interleaved at `cat`.
Order preservation holds within each source's stream, not across the two sources.
4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,8 @@ type PipelineSpec struct {
// InterStepBuffer configuration specific to this pipeline.
// +optional
InterStepBuffer *InterStepBuffer `json:"interStepBuffer,omitempty" protobuf:"bytes,9,opt,name=interStepBuffer"`
// Ordered enables ordered processing for the entire pipeline.
// When enabled, messages will be processed in order based on their event time.
// Ordered enables order-preserving processing for the entire pipeline.
// When enabled, messages will be processed in their arrival order (FIFO within each partition).
// This can be overridden at the vertex level.
// +optional
Ordered *Ordered `json:"ordered,omitempty" protobuf:"bytes,10,opt,name=ordered"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading