Skip to content

Commit d680b2e

Browse files
doc: improve ordered doc (#3304)
Signed-off-by: Vigith Maurice <vigith@gmail.com> Co-authored-by: Vaibhav Kant Tiwari <vaibhav.tiwari33@gmail.com>
1 parent 7cee4a6 commit d680b2e

File tree

7 files changed

+100
-43
lines changed

7 files changed

+100
-43
lines changed

api/json-schema/schema.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21590,7 +21590,7 @@
2159021590
},
2159121591
"ordered": {
2159221592
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Ordered",
21593-
"description": "Ordered enables ordered processing for the entire pipeline. When enabled, messages will be processed in order based on their event time. This can be overridden at the vertex level."
21593+
"description": "Ordered enables order-preserving processing for the entire pipeline. When enabled, messages will be processed in their arrival order (FIFO within each partition). This can be overridden at the vertex level."
2159421594
},
2159521595
"sideInputs": {
2159621596
"description": "SideInputs defines the Side Inputs of a pipeline.",

api/openapi-spec/swagger.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21576,7 +21576,7 @@
2157621576
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.PipelineLimits"
2157721577
},
2157821578
"ordered": {
21579-
"description": "Ordered enables ordered processing for the entire pipeline. When enabled, messages will be processed in order based on their event time. This can be overridden at the vertex level.",
21579+
"description": "Ordered enables order-preserving processing for the entire pipeline. When enabled, messages will be processed in their arrival order (FIFO within each partition). This can be overridden at the vertex level.",
2158021580
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Ordered"
2158121581
},
2158221582
"sideInputs": {

docs/APIs.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8105,9 +8105,9 @@ InterStepBuffer configuration specific to this pipeline.
81058105
<em>(Optional)</em>
81068106
<p>
81078107

8108-
Ordered enables ordered processing for the entire pipeline. When
8109-
enabled, messages will be processed in order based on their event time.
8110-
This can be overridden at the vertex level.
8108+
Ordered enables order-preserving processing for the entire pipeline.
8109+
When enabled, messages will be processed in their arrival order (FIFO
8110+
within each partition). This can be overridden at the vertex level.
81118111
</p>
81128112

81138113
</td>
@@ -8552,9 +8552,9 @@ InterStepBuffer configuration specific to this pipeline.
85528552
<em>(Optional)</em>
85538553
<p>
85548554

8555-
Ordered enables ordered processing for the entire pipeline. When
8556-
enabled, messages will be processed in order based on their event time.
8557-
This can be overridden at the vertex level.
8555+
Ordered enables order-preserving processing for the entire pipeline.
8556+
When enabled, messages will be processed in their arrival order (FIFO
8557+
within each partition). This can be overridden at the vertex level.
85588558
</p>
85598559

85608560
</td>

docs/user-guide/reference/ordered-processing.md

Lines changed: 87 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,55 @@
22

33
> **Available from v1.8**
44
5-
By default, Numaflow improves throughput by distributing work across any available processing unit, which results in
6-
out-of-order processing. However, some workflows require messages to be processed in a deterministic order — for
7-
example, a create-update-delete sequence where you cannot update a record before it has been created.
5+
By default, Numaflow improves throughput by distributing work across any available processing unit, which means
6+
messages may be processed out of the order in which they arrived. However, some workflows require messages to be
7+
processed in their arrival order - for example, a create-update-delete sequence where you cannot update a record
8+
before it has been created.
89

9-
Ordered processing in Numaflow provides partitioned FIFO semantics: within a partition, the N-th message is processed
10-
only after the (N-1)-th message completes.
10+
The **ordered processing** feature provides **input-order preservation** with partitioned FIFO semantics: within a
11+
partition, the N-th message is processed only after the (N-1)-th message completes. The ordering is based on the
12+
position of each message in the Inter-Step Buffer (ISB), not on event timestamps.
13+
14+
## What Order Preservation Means
15+
16+
It is important to distinguish **order preservation** from **timestamp ordering**:
17+
18+
| Concept | Meaning |
19+
|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
20+
| **Order preservation (this feature)** | Messages are processed in the same order they were written to the ISB - i.e., FIFO. Message N is processed only after message N-1 completes, within a partition. |
21+
| **Timestamp ordering** | Messages are sorted by event time so that earlier events are processed before later ones. This feature does **not** provide timestamp ordering. |
22+
23+
Sources may emit events whose event-time timestamps are not monotonically increasing (e.g., late-arriving data,
24+
multiple producers). Order preservation guarantees that messages are processed in their **arrival order** (the order
25+
the source emitted them into the pipeline), regardless of their event timestamps.
26+
27+
In short: if your source emits messages A, B, C in that sequence, order preservation guarantees they are processed in
28+
the sequence A, B, C - even if B has an earlier event timestamp than A.
1129

1230
## How It Works
1331

1432
Ordered processing works differently depending on the vertex type:
1533

16-
| Vertex Type | Behavior |
17-
|-------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
18-
| **Source** | Always ordered by nature — no configuration needed. |
34+
| Vertex Type | Behavior |
35+
|-------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
36+
| **Source** | Always preserves input order - messages are emitted into the pipeline in the order received from the external source. No configuration needed. |
1937
| **Map** | Requires `partitions` to be configured. Replicas are fixed to the `partitions` count. Messages are routed to partitions by key hash, so all messages with the same key are processed by the same pod in order. |
20-
| **Reduce** | Already partitioned and ordered — no additional configuration needed. |
21-
| **Sink** | Same as Map requires `partitions`. Replicas are fixed to the partition count. |
38+
| **Reduce** | Already partitioned and order-preserving - no additional configuration needed. |
39+
| **Sink** | Same as Map - requires `partitions`. Replicas are fixed to the partition count. |
2240

2341
When ordered processing is enabled for a Map or Sink vertex:
42+
2443
- The number of replicas is automatically fixed to the partition count (one pod per partition).
25-
- Autoscaling is disabled for that vertex you must not set `scale.min` or `scale.max`.
26-
- Messages are routed to partitions by hashing their keys, ensuring all messages with the same key go to the same pod in
27-
FIFO order.
44+
- Autoscaling is disabled for that vertex - you must not set `scale.min` or `scale.max`.
45+
- Messages are routed to partitions by hashing their keys, ensuring all messages with the same key go to the same pod,
46+
preserving their arrival order.
2847

2948

3049
## Pipeline Specification
3150

32-
Enable ordered processing by setting `ordered.enabled: true` at the pipeline level. For Map and Sink vertices, set
33-
`partitions` to the number of ordered lanes you need — the controller will fix replicas to that count automatically.
51+
Enable order-preserving processing by setting `ordered.enabled: true` at the pipeline level. For Map and Sink
52+
vertices, set `partitions` to the number of ordered lanes you need - the controller will fix replicas to that count
53+
automatically.
3454

3555
```yaml
3656
apiVersion: numaflow.numaproj.io/v1alpha1
@@ -41,10 +61,10 @@ spec:
4161
limits:
4262
readBatchSize: 1 # recommended for strict ordering
4363
ordered:
44-
enabled: true # enable ordered processing pipeline-wide
64+
enabled: true # enable order-preserving processing pipeline-wide
4565
vertices:
4666
- name: my-source
47-
source: {} # always ordered; no extra config needed
67+
source: {} # always preserves input order; no extra config needed
4868
- name: my-map
4969
partitions: N # N replicas will be created, one per partition
5070
udf:
@@ -62,7 +82,7 @@ spec:
6282
6383
## Per-Vertex Override
6484
65-
Ordered processing can also be enabled or disabled at the individual vertex level, which overrides the pipeline-level
85+
Ordered processing can also be enabled or disabled at the individual vertex level, which overrides the pipeline-level
6686
setting. This is useful when you want most vertices to run unordered for throughput, but need ordering for specific steps.
6787
6888
```yaml
@@ -86,27 +106,62 @@ spec:
86106

87107
Setting `partitions` on a vertex that has ordered processing disabled (or overridden to `false`) is valid. The vertex
88108
will still have N ISB buffer partitions created, giving you a [multi-partitioned edge](multi-partition.md) for higher
89-
throughput. However, replicas are determined by normal autoscaling they are not fixed to N so multiple replicas may
109+
throughput. However, replicas are determined by normal autoscaling - they are not fixed to N - so multiple replicas may
90110
read from the same partition, or one replica may handle multiple partitions. There is no ordering guarantee in this
91111
case.
92112

93113
## Caveats and Limitations
94114

95115
- **Autoscaling is not supported** for Map and Sink vertices with ordered processing enabled. The replica count is fixed
96116
to the partition count. Setting `scale.min` or `scale.max` on such vertices will cause a validation error.
97-
- **Reduce vertices** are already partitioned and ordered by design; the `ordered` setting is ignored for them.
98-
- **Source vertices** are always ordered regardless of the `ordered` setting.
99-
- **Key-based routing**: ordering is guaranteed per key. Messages with different keys may still be interleaved across
117+
- **Reduce vertices** are already partitioned and order-preserving by design; the `ordered` setting is ignored for them.
118+
- **Source vertices** always preserve input order regardless of the `ordered` setting.
119+
- **Key-based routing**: ordering is guaranteed per key. Messages with different keys may still be interleaved across
100120
partitions. Ensure your UDF or SDK sets meaningful message keys to leverage per-key ordering.
101-
- **`readBatchSize: 1`** is strongly recommended for strict ordering. With a larger batch size, multiple messages may
121+
- **`readBatchSize: 1`** is required for strict ordering. With a larger batch size, multiple messages may
102122
be in-flight simultaneously within a single pod.
103-
- **Throughput trade-off**: ordered processing limits parallelism within a partition. Consider the number of partitions
123+
- **Throughput trade-off**: ordered processing limits parallelism within a partition. Consider the number of partitions
104124
carefully to balance ordering guarantees with throughput requirements.
125+
- **Join vertices (multiple input edges)**: When a vertex receives messages from multiple upstream vertices
126+
(a [join](join-vertex.md)), order preservation holds independently for each input edge, _but not across edges_.
127+
Messages from different upstream vertices are interleaved in whatever order they arrive at the ISB. If you need a
128+
global ordering across multiple inputs, you must merge them at a single source or use application-level sequencing.
129+
- **Cycles**: When a [cycle](join-vertex.md#cycles) re-injects a message back into an earlier vertex, that message is
130+
appended to the ISB after messages that have already been written. From the user's perspective, the re-injected
131+
message appears later in the processing order than it did in the original stream. This means order preservation
132+
relative to the original input sequence is disrupted for cycled messages.
133+
134+
## Behavior at Join Vertices
135+
136+
A [join vertex](join-vertex.md) receives messages from two or more upstream vertices. Each input edge has its own set
137+
of ISB partitions, and order preservation applies **per edge**:
138+
139+
- Messages arriving from upstream vertex A are processed in FIFO order relative to each other.
140+
- Messages arriving from upstream vertex B are processed in FIFO order relative to each other.
141+
- However, messages from A and B are **interleaved** in whatever order they arrive at the join vertex. There is no
142+
cross-edge ordering guarantee.
143+
144+
The [example below](#example) demonstrates this: sources `in-1` and `in-2` both feed into the `cat` vertex. Within each source's
145+
stream, order is preserved, but `cat` will interleave messages from `in-1` and `in-2` based on arrival time.
146+
147+
## Behavior in Cycles
148+
149+
A [cycle](join-vertex.md#cycles) sends a message from a vertex back to itself or to a previous vertex. When a message
150+
is re-injected via a cycle:
151+
152+
1. The cycled message is appended to the ISB **after** messages that have already been written by the original upstream
153+
source.
154+
2. From the perspective of the receiving vertex, the cycled message appears as a new, later message - not in its
155+
original position.
156+
3. Therefore, order preservation relative to the original input stream is **not maintained** for cycled messages.
157+
158+
If your use case requires strict ordering and also uses cycles (e.g., retry logic), be aware that retried messages will
159+
be processed after messages that arrived in the interim. Design your application logic accordingly.
105160

106161
## Example
107162

108-
To enable ordered processing, set `ordered.enabled: true` in the pipeline spec. For Map and Sink vertices, also set
109-
`partitions` to the desired number of partitions (which will also be the fixed replica count).
163+
To enable order-preserving processing, set `ordered.enabled: true` in the pipeline spec. For Map and Sink vertices,
164+
also set `partitions` to the desired number of partitions (which will also be the fixed replica count).
110165

111166
```yaml
112167
apiVersion: numaflow.numaproj.io/v1alpha1
@@ -146,8 +201,10 @@ spec:
146201

147202
In the example above:
148203

149-
- `ordered.enabled: true` enables ordered processing pipeline-wide.
150-
- `limits.readBatchSize: 1` is recommended so that each pod processes one message at a time, which is required for
151-
strict in-order guarantees.
204+
- `ordered.enabled: true` enables order-preserving processing pipeline-wide.
205+
- `limits.readBatchSize: 1` is required so that each pod processes one message at a time, which is essential for
206+
strict in-order guarantees.
152207
- The `cat` (Map) and `out` (Sink) vertices each have `partitions: 3`, so they will run with exactly 3 replicas.
153-
- Source vertices (`in-1`, `in-2`) are always ordered and require no extra configuration.
208+
- Source vertices (`in-1`, `in-2`) always preserve input order and require no extra configuration.
209+
- Because `in-1` and `in-2` both feed into `cat` (a join), messages from the two sources are interleaved at `cat`.
210+
Order preservation holds within each source's stream, not across the two sources.

pkg/apis/numaflow/v1alpha1/generated.proto

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/numaflow/v1alpha1/pipeline_types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,8 +516,8 @@ type PipelineSpec struct {
516516
// InterStepBuffer configuration specific to this pipeline.
517517
// +optional
518518
InterStepBuffer *InterStepBuffer `json:"interStepBuffer,omitempty" protobuf:"bytes,9,opt,name=interStepBuffer"`
519-
// Ordered enables ordered processing for the entire pipeline.
520-
// When enabled, messages will be processed in order based on their event time.
519+
// Ordered enables order-preserving processing for the entire pipeline.
520+
// When enabled, messages will be processed in their arrival order (FIFO within each partition).
521521
// This can be overridden at the vertex level.
522522
// +optional
523523
Ordered *Ordered `json:"ordered,omitempty" protobuf:"bytes,10,opt,name=ordered"`

pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)