diff --git a/api/json-schema/schema.json b/api/json-schema/schema.json index 5da7255dee..b6b493449b 100644 --- a/api/json-schema/schema.json +++ b/api/json-schema/schema.json @@ -21590,7 +21590,7 @@ }, "ordered": { "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Ordered", - "description": "Ordered enables ordered processing for the entire pipeline. When enabled, messages will be processed in order based on their event time. This can be overridden at the vertex level." + "description": "Ordered enables order-preserving processing for the entire pipeline. When enabled, messages will be processed in their arrival order (FIFO within each partition). This can be overridden at the vertex level." }, "sideInputs": { "description": "SideInputs defines the Side Inputs of a pipeline.", diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 2e0a5e682b..f948838e12 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -21576,7 +21576,7 @@ "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.PipelineLimits" }, "ordered": { - "description": "Ordered enables ordered processing for the entire pipeline. When enabled, messages will be processed in order based on their event time. This can be overridden at the vertex level.", + "description": "Ordered enables order-preserving processing for the entire pipeline. When enabled, messages will be processed in their arrival order (FIFO within each partition). This can be overridden at the vertex level.", "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Ordered" }, "sideInputs": { diff --git a/docs/APIs.md b/docs/APIs.md index f980965c2d..a45796612c 100644 --- a/docs/APIs.md +++ b/docs/APIs.md @@ -8105,9 +8105,9 @@ InterStepBuffer configuration specific to this pipeline. (Optional)
-Ordered enables ordered processing for the entire pipeline. When -enabled, messages will be processed in order based on their event time. -This can be overridden at the vertex level. +Ordered enables order-preserving processing for the entire pipeline. +When enabled, messages will be processed in their arrival order (FIFO +within each partition). This can be overridden at the vertex level.
@@ -8552,9 +8552,9 @@ InterStepBuffer configuration specific to this pipeline. (Optional)-Ordered enables ordered processing for the entire pipeline. When -enabled, messages will be processed in order based on their event time. -This can be overridden at the vertex level. +Ordered enables order-preserving processing for the entire pipeline. +When enabled, messages will be processed in their arrival order (FIFO +within each partition). This can be overridden at the vertex level.
diff --git a/docs/user-guide/reference/ordered-processing.md b/docs/user-guide/reference/ordered-processing.md index ab4168df90..2286fde2ec 100644 --- a/docs/user-guide/reference/ordered-processing.md +++ b/docs/user-guide/reference/ordered-processing.md @@ -2,35 +2,55 @@ > **Available from v1.8** -By default, Numaflow improves throughput by distributing work across any available processing unit, which results in -out-of-order processing. However, some workflows require messages to be processed in a deterministic order — for -example, a create-update-delete sequence where you cannot update a record before it has been created. +By default, Numaflow improves throughput by distributing work across any available processing unit, which means +messages may be processed out of the order in which they arrived. However, some workflows require messages to be +processed in their arrival order - for example, a create-update-delete sequence where you cannot update a record +before it has been created. -Ordered processing in Numaflow provides partitioned FIFO semantics: within a partition, the N-th message is processed -only after the (N-1)-th message completes. +The **ordered processing** feature provides **input-order preservation** with partitioned FIFO semantics: within a +partition, the N-th message is processed only after the (N-1)-th message completes. The ordering is based on the +position of each message in the Inter-Step Buffer (ISB), not on event timestamps. + +## What Order Preservation Means + +It is important to distinguish **order preservation** from **timestamp ordering**: + +| Concept | Meaning | +|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Order preservation (this feature)** | Messages are processed in the same order they were written to the ISB - i.e., FIFO. 
Message N is processed only after message N-1 completes, within a partition. | +| **Timestamp ordering** | Messages are sorted by event time so that earlier events are processed before later ones. This feature does **not** provide timestamp ordering. | + +Sources may emit events whose event-time timestamps are not monotonically increasing (e.g., late-arriving data, +multiple producers). Order preservation guarantees that messages are processed in their **arrival order** (the order +the source emitted them into the pipeline), regardless of their event timestamps. + +In short: if your source emits messages A, B, C in that sequence and they are routed to the same partition, order +preservation guarantees they are processed in the sequence A, B, C - even if B has an earlier event timestamp than A. ## How It Works Ordered processing works differently depending on the vertex type: -| Vertex Type | Behavior | -|-------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **Source** | Always ordered by nature — no configuration needed. | +| Vertex Type | Behavior | +|-------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Source** | Always preserves input order - messages are emitted into the pipeline in the order received from the external source. No configuration needed. | | **Map** | Requires `partitions` to be configured. Replicas are fixed to the `partitions` count. Messages are routed to partitions by key hash, so all messages with the same key are processed by the same pod in order. | -| **Reduce** | Already partitioned and ordered — no additional configuration needed. | -| **Sink** | Same as Map — requires `partitions`. Replicas are fixed to the partition count. 
| +| **Reduce** | Already partitioned and order-preserving - no additional configuration needed. | +| **Sink** | Same as Map - requires `partitions`. Replicas are fixed to the partition count. | When ordered processing is enabled for a Map or Sink vertex: + - The number of replicas is automatically fixed to the partition count (one pod per partition). -- Autoscaling is disabled for that vertex — you must not set `scale.min` or `scale.max`. -- Messages are routed to partitions by hashing their keys, ensuring all messages with the same key go to the same pod in - FIFO order. +- Autoscaling is disabled for that vertex - you must not set `scale.min` or `scale.max`. +- Messages are routed to partitions by hashing their keys, ensuring all messages with the same key go to the same pod, + preserving their arrival order. ## Pipeline Specification -Enable ordered processing by setting `ordered.enabled: true` at the pipeline level. For Map and Sink vertices, set -`partitions` to the number of ordered lanes you need — the controller will fix replicas to that count automatically. +Enable order-preserving processing by setting `ordered.enabled: true` at the pipeline level. For Map and Sink +vertices, set `partitions` to the number of ordered lanes you need - the controller will fix replicas to that count +automatically. 
```yaml apiVersion: numaflow.numaproj.io/v1alpha1 @@ -41,10 +61,10 @@ spec: limits: readBatchSize: 1 # recommended for strict ordering ordered: - enabled: true # enable ordered processing pipeline-wide + enabled: true # enable order-preserving processing pipeline-wide vertices: - name: my-source - source: {} # always ordered; no extra config needed + source: {} # always preserves input order; no extra config needed - name: my-map partitions: N # N replicas will be created, one per partition udf: @@ -62,7 +82,7 @@ spec: ## Per-Vertex Override -Ordered processing can also be enabled or disabled at the individual vertex level, which overrides the pipeline-level +Ordered processing can also be enabled or disabled at the individual vertex level, which overrides the pipeline-level setting. This is useful when you want most vertices to run unordered for throughput, but need ordering for specific steps. ```yaml @@ -86,7 +106,7 @@ spec: Setting `partitions` on a vertex that has ordered processing disabled (or overridden to `false`) is valid. The vertex will still have N ISB buffer partitions created, giving you a [multi-partitioned edge](multi-partition.md) for higher -throughput. However, replicas are determined by normal autoscaling — they are not fixed to N — so multiple replicas may +throughput. However, replicas are determined by normal autoscaling - they are not fixed to N - so multiple replicas may read from the same partition, or one replica may handle multiple partitions. There is no ordering guarantee in this case. @@ -94,19 +114,54 @@ case. - **Autoscaling is not supported** for Map and Sink vertices with ordered processing enabled. The replica count is fixed to the partition count. Setting `scale.min` or `scale.max` on such vertices will cause a validation error. -- **Reduce vertices** are already partitioned and ordered by design; the `ordered` setting is ignored for them. -- **Source vertices** are always ordered regardless of the `ordered` setting. 
-- **Key-based routing**: ordering is guaranteed per key. Messages with different keys may still be interleaved across +- **Reduce vertices** are already partitioned and order-preserving by design; the `ordered` setting is ignored for them. +- **Source vertices** always preserve input order regardless of the `ordered` setting. +- **Key-based routing**: ordering is guaranteed per key. Messages with different keys may still be interleaved across partitions. Ensure your UDF or SDK sets meaningful message keys to leverage per-key ordering. -- **`readBatchSize: 1`** is strongly recommended for strict ordering. With a larger batch size, multiple messages may +- **`readBatchSize: 1`** is required for strict ordering. With a larger batch size, multiple messages may be in-flight simultaneously within a single pod. -- **Throughput trade-off**: ordered processing limits parallelism within a partition. Consider the number of partitions +- **Throughput trade-off**: ordered processing limits parallelism within a partition. Consider the number of partitions carefully to balance ordering guarantees with throughput requirements. +- **Join vertices (multiple input edges)**: When a vertex receives messages from multiple upstream vertices + (a [join](join-vertex.md)), order preservation holds independently for each input edge, _but not across edges_. + Messages from different upstream vertices are interleaved in whatever order they arrive at the ISB. If you need a + global ordering across multiple inputs, you must merge them at a single source or use application-level sequencing. +- **Cycles**: When a [cycle](join-vertex.md#cycles) re-injects a message back into an earlier vertex, that message is + appended to the ISB after messages that have already been written. From the user's perspective, the re-injected + message appears later in the processing order than it did in the original stream. 
This means order preservation + relative to the original input sequence is disrupted for cycled messages. + +## Behavior at Join Vertices + +A [join vertex](join-vertex.md) receives messages from two or more upstream vertices. Each input edge has its own set +of ISB partitions, and order preservation applies **per edge**: + +- Messages arriving from upstream vertex A are processed in FIFO order relative to each other. +- Messages arriving from upstream vertex B are processed in FIFO order relative to each other. +- However, messages from A and B are **interleaved** in whatever order they arrive at the join vertex. There is no + cross-edge ordering guarantee. + +The [example below](#example) demonstrates this: sources `in-1` and `in-2` both feed into the `cat` vertex. Within each source's +stream, order is preserved, but `cat` will interleave messages from `in-1` and `in-2` based on arrival time. + +## Behavior in Cycles + +A [cycle](join-vertex.md#cycles) sends a message from a vertex back to itself or to a previous vertex. When a message +is re-injected via a cycle: + +1. The cycled message is appended to the ISB **after** messages that have already been written by the original upstream + source. +2. From the perspective of the receiving vertex, the cycled message appears as a new, later message - not in its + original position. +3. Therefore, order preservation relative to the original input stream is **not maintained** for cycled messages. + +If your use case requires strict ordering and also uses cycles (e.g., retry logic), be aware that retried messages will +be processed after messages that arrived in the interim. Design your application logic accordingly. ## Example -To enable ordered processing, set `ordered.enabled: true` in the pipeline spec. For Map and Sink vertices, also set -`partitions` to the desired number of partitions (which will also be the fixed replica count). 
+To enable order-preserving processing, set `ordered.enabled: true` in the pipeline spec. For Map and Sink vertices, +also set `partitions` to the desired number of partitions (which will also be the fixed replica count). ```yaml apiVersion: numaflow.numaproj.io/v1alpha1 @@ -146,8 +201,10 @@ spec: In the example above: -- `ordered.enabled: true` enables ordered processing pipeline-wide. -- `limits.readBatchSize: 1` is recommended so that each pod processes one message at a time, which is required for - strict in-order guarantees. +- `ordered.enabled: true` enables order-preserving processing pipeline-wide. +- `limits.readBatchSize: 1` is required so that each pod processes one message at a time, which is essential for + strict in-order guarantees. - The `cat` (Map) and `out` (Sink) vertices each have `partitions: 3`, so they will run with exactly 3 replicas. -- Source vertices (`in-1`, `in-2`) are always ordered and require no extra configuration. +- Source vertices (`in-1`, `in-2`) always preserve input order and require no extra configuration. +- Because `in-1` and `in-2` both feed into `cat` (a join), messages from the two sources are interleaved at `cat`. + Order preservation holds within each source's stream, not across the two sources. diff --git a/pkg/apis/numaflow/v1alpha1/generated.proto b/pkg/apis/numaflow/v1alpha1/generated.proto index a668932a24..561d19d0ce 100644 --- a/pkg/apis/numaflow/v1alpha1/generated.proto +++ b/pkg/apis/numaflow/v1alpha1/generated.proto @@ -1346,8 +1346,8 @@ message PipelineSpec { // +optional optional InterStepBuffer interStepBuffer = 9; - // Ordered enables ordered processing for the entire pipeline. - // When enabled, messages will be processed in order based on their event time. + // Ordered enables order-preserving processing for the entire pipeline. + // When enabled, messages will be processed in their arrival order (FIFO within each partition). // This can be overridden at the vertex level. 
// +optional optional Ordered ordered = 10; diff --git a/pkg/apis/numaflow/v1alpha1/pipeline_types.go b/pkg/apis/numaflow/v1alpha1/pipeline_types.go index 70335e9e9b..ff945cee76 100644 --- a/pkg/apis/numaflow/v1alpha1/pipeline_types.go +++ b/pkg/apis/numaflow/v1alpha1/pipeline_types.go @@ -516,8 +516,8 @@ type PipelineSpec struct { // InterStepBuffer configuration specific to this pipeline. // +optional InterStepBuffer *InterStepBuffer `json:"interStepBuffer,omitempty" protobuf:"bytes,9,opt,name=interStepBuffer"` - // Ordered enables ordered processing for the entire pipeline. - // When enabled, messages will be processed in order based on their event time. + // Ordered enables order-preserving processing for the entire pipeline. + // When enabled, messages will be processed in their arrival order (FIFO within each partition). // This can be overridden at the vertex level. // +optional Ordered *Ordered `json:"ordered,omitempty" protobuf:"bytes,10,opt,name=ordered"` diff --git a/pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go b/pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go index ac6038074b..b3ac3ef8ae 100644 --- a/pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go +++ b/pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go @@ -4092,7 +4092,7 @@ func schema_pkg_apis_numaflow_v1alpha1_PipelineSpec(ref common.ReferenceCallback }, "ordered": { SchemaProps: spec.SchemaProps{ - Description: "Ordered enables ordered processing for the entire pipeline. When enabled, messages will be processed in order based on their event time. This can be overridden at the vertex level.", + Description: "Ordered enables order-preserving processing for the entire pipeline. When enabled, messages will be processed in their arrival order (FIFO within each partition). This can be overridden at the vertex level.", Ref: ref("github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1.Ordered"), }, },