Commit 3d6ed72
feat(flowcontrol): Implement the FlowRegistry
This commit introduces the complete, concrete implementation of the `FlowRegistry`, the stateful control plane for the flow control system. It is responsible for managing the lifecycle of all flows, queues, and shards.

Key features of this implementation include:

- An actor-based, serialized event loop that processes all state changes to ensure correctness and eliminate race conditions in the control plane.
- A robust garbage collection system for idle flows and drained components, using a "Trust but Verify" pattern to safely handle races between the data path and control plane.
- A well-defined component lifecycle (Active, Draining, Drained) with atomic state transitions and exactly-once edge signaling.
- A sharded architecture where the `FlowRegistry` orchestrates the `registryShard` data plane slices.
1 parent fd39cac commit 3d6ed72
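
As context for the commit message: the actor pattern it describes serializes every control-plane mutation through a single event-loop goroutine, so state transitions cannot race. Below is a minimal, self-contained Go sketch of that pattern; the `registryActor` and `event` names are illustrative stand-ins, not the committed types.

// Hypothetical sketch of the serialized event-loop ("actor") pattern named in
// the commit message; the committed implementation differs in its types and
// event set.
package main

import "fmt"

// event is one unit of control-plane work. The result is reported back on a
// per-event channel so callers can block until the loop has applied it.
type event struct {
	apply func() error
	done  chan error
}

// registryActor owns all mutable control-plane state. Only its run goroutine
// touches that state, so no locks are needed and transitions cannot race.
type registryActor struct {
	events chan event
}

func newRegistryActor() *registryActor {
	a := &registryActor{events: make(chan event, 64)}
	go a.run()
	return a
}

// run is the single goroutine that serializes every state change.
func (a *registryActor) run() {
	for ev := range a.events {
		ev.done <- ev.apply()
	}
}

// submit enqueues a state mutation and waits for the loop to process it.
func (a *registryActor) submit(apply func() error) error {
	ev := event{apply: apply, done: make(chan error, 1)}
	a.events <- ev
	return <-ev.done
}

func main() {
	a := newRegistryActor()
	err := a.submit(func() error {
		fmt.Println("register flow: state is mutated only on the loop goroutine")
		return nil
	})
	fmt.Println("submit returned:", err)
}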

File tree

18 files changed: +5425 -866 lines changed


pkg/epp/flowcontrol/contracts/doc.go

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
+// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
+// interfaces represent the "ports" through which the engine communicates with external systems and pluggable logic.
+//
+// The two primary contracts defined here are:
+//
+//   - `FlowRegistry`: The interface for the stateful control plane that manages the lifecycle of all flows, queues,
+//     and policies.
+//
+//   - `SaturationDetector`: The interface for a component that provides real-time load signals, allowing the dispatch
+//     engine to react to backend saturation.
+package contracts
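
To make the "ports" framing concrete, a consumer such as the controller engine would be constructed against these interfaces only. The following is a hypothetical wiring sketch; the real `controller` package is not part of this diff, and the field and constructor names are illustrative.

// Hypothetical wiring of the two "ports" from this package; these names are
// illustrative only.
package controller

import (
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
)

// FlowController depends only on the contract interfaces, so any conforming
// registry or detector implementation can be swapped in (the "adapters").
type FlowController struct {
	registry contracts.FlowRegistry       // port: stateful control plane
	detector contracts.SaturationDetector // port: real-time load signals
}

// NewFlowController accepts any implementations of the ports.
func NewFlowController(r contracts.FlowRegistry, d contracts.SaturationDetector) *FlowController {
	return &FlowController{registry: r, detector: d}
}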

pkg/epp/flowcontrol/contracts/errors.go

Lines changed: 7 additions & 3 deletions
@@ -20,16 +20,20 @@ import "errors"
 
 // Registry Errors
 var (
-	// ErrFlowInstanceNotFound indicates that a requested flow instance (a `ManagedQueue`) does not exist in the registry
-	// shard, either because the flow is not registered or the specific instance (e.g., a draining queue at a particular
-	// priority) is not present.
+	// ErrFlowInstanceNotFound indicates that a requested flow instance (a `ManagedQueue`) does not exist.
 	ErrFlowInstanceNotFound = errors.New("flow instance not found")
 
+	// ErrFlowIDEmpty indicates that a flow specification was provided with an empty flow ID.
+	ErrFlowIDEmpty = errors.New("flow ID cannot be empty")
+
 	// ErrPriorityBandNotFound indicates that a requested priority band does not exist in the registry because it was not
 	// part of the initial configuration.
 	ErrPriorityBandNotFound = errors.New("priority band not found")
 
 	// ErrPolicyQueueIncompatible indicates that a selected policy is not compatible with the capabilities of the queue it
 	// is intended to operate on. For example, a policy requiring priority-based peeking is used with a simple FIFO queue.
 	ErrPolicyQueueIncompatible = errors.New("policy is not compatible with queue capabilities")
+
+	// ErrInvalidShardCount indicates that an invalid shard count was provided (e.g., zero).
+	ErrInvalidShardCount = errors.New("invalid shard count")
 )
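
These sentinel errors are meant to be wrapped by implementations and tested with `errors.Is` on the caller side. A brief, hypothetical caller-side example (the `handleRegistration` helper is illustrative):

// Hypothetical caller-side handling of the wrapped sentinel errors above.
package main

import (
	"errors"
	"fmt"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
)

func handleRegistration(err error) {
	switch {
	case err == nil:
		fmt.Println("flow registered")
	case errors.Is(err, contracts.ErrFlowIDEmpty):
		fmt.Println("reject: flow ID must be set")
	case errors.Is(err, contracts.ErrPriorityBandNotFound):
		fmt.Println("reject: priority level is not configured")
	default:
		fmt.Printf("internal registry error: %v\n", err)
	}
}

func main() {
	// Wrapped errors still match via errors.Is.
	handleRegistration(fmt.Errorf("registering flow %q: %w", "", contracts.ErrFlowIDEmpty))
}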

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 156 additions & 14 deletions
@@ -14,18 +14,147 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
-// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
-// interfaces represent the "ports" through which the engine communicates.
-//
-// This package contains the primary service contracts for the Flow Registry, which acts as the control plane for all
-// flow state and configuration.
 package contracts
 
 import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
 )
 
+// FlowRegistry is the complete interface for the global control plane, composed of administrative functions and the
+// ability to provide shard accessors. A concrete implementation of this interface is the single source of truth for
+// all flow control state and configuration.
+//
+// # Conformance
+//
+// All methods defined in this interface (including those embedded) MUST be goroutine-safe. Implementations are
+// expected to perform complex updates (e.g., `RegisterOrUpdateFlow`, `UpdateShardCount`) atomically to preserve
+// system invariants.
+//
+// # Invariants
+//
+// Concrete implementations of FlowRegistry MUST uphold the following invariants across all operations:
+//  1. Shard Consistency: All configured priority bands and logical flows must be represented on every Active
+//     internal shard. Plugin instance types (e.g., the specific `framework.SafeQueue` implementation or policy
+//     plugins) must be consistent for a given flow or band across all shards.
+//  2. Flow Instance Uniqueness per Band: For any given logical flow, there can be a maximum of one `ManagedQueue`
+//     instance per priority band. An instance can be either Active or Draining.
+//  3. Single Active Instance per Flow: For any given logical flow, there can be a maximum of one Active
+//     `ManagedQueue` instance across all priority bands. All other instances for that flow must be in a Draining
+//     state.
+//  4. Capacity Partitioning Consistency: Global and per-band capacity limits are uniformly partitioned across all
+//     active shards. The sum of the capacity limits allocated to each shard must not exceed the globally configured
+//     limits.
+//
+// # Flow Lifecycle States
+//
+//   - Registered: A logical flow is Registered when it is known to the `FlowRegistry`. It has exactly one Active
+//     instance across all priority bands and zero or more Draining instances.
+//   - Active: A specific instance of a flow within a priority band is Active if it is the designated target for all
+//     new enqueues for that logical flow.
+//   - Draining: A flow instance is Draining if it no longer accepts new enqueues but still contains items that are
+//     eligible for dispatch. This occurs after a priority change.
+//   - Garbage Collected (Unregistered): A logical flow is automatically unregistered and garbage collected by the
+//     system when it has been idle for a configurable period. A flow is considered idle if its active queue instance
+//     has been empty on all active shards for the timeout duration. Once unregistered, it has no active instances,
+//     though draining instances from previous priority levels may still exist until their queues are also empty.
+//
+// # Shard Garbage Collection
+//
+// When a shard is decommissioned via `UpdateShardCount`, the `FlowRegistry` must ensure a graceful shutdown. It must
+// mark the shard as inactive to prevent new enqueues, allow the `FlowController` to continue draining its queues, and
+// only delete the shard's state after the associated worker has fully terminated and all queues are empty.
+type FlowRegistry interface {
+	FlowRegistryAdmin
+	ShardProvider
+}
+
+// FlowRegistryAdmin defines the administrative interface for the global control plane. This interface is intended
+// for external systems to configure flows, manage system parallelism, and query aggregated statistics for
+// observability.
+//
+// # Design Rationale for Dynamic Update Strategies
+//
+// The `FlowRegistryAdmin` contract specifies precise behaviors for handling dynamic updates. These strategies were
+// chosen to prioritize system stability, correctness, and minimal disruption:
+//
+//   - Graceful Draining (for Priority/Shard Lifecycle Changes): For operations that change a flow's priority or
+//     decommission a shard, the affected queue instances are marked as inactive but are not immediately deleted.
+//     They enter a Draining state where they no longer accept new requests but are still processed for dispatch.
+//     This ensures that requests already accepted by the system are processed to completion. Crucially, requests in
+//     a draining queue continue to be dispatched according to the priority level and policies they were enqueued
+//     with, ensuring consistency.
+//
+//   - Atomic Queue Migration (Future Design for Incompatible Intra-Flow Policy Changes): When an intra-flow policy
+//     is updated to one that is incompatible with the existing queue data structure, the designed future behavior is
+//     a full "drain and re-enqueue" migration. This more disruptive operation is necessary to guarantee correctness.
+//     A simpler "graceful drain" (creating a second instance of the same flow in the same priority band) is not used
+//     because it would violate the system's "one flow instance per band" invariant. This invariant is critical
+//     because it ensures that inter-flow policies operate on a clean set of distinct flows, that stateful intra-flow
+//     policies have a single authoritative view of their flow's state, and that lookups are unambiguous. Note: this
+//     atomic migration is a future design consideration and is not implemented in the current version.
+//
+//   - Self-Balancing on Shard Scale-Up: When new shards are added via `UpdateShardCount`, the framework relies on
+//     the `FlowController`'s request distribution logic (e.g., a "Join the Shortest Queue by Bytes" (JSQ-Bytes)
+//     strategy) to naturally funnel *new* requests to the less-loaded shards. This design choice strategically
+//     avoids the complexity of actively migrating or rebalancing existing items that are already queued on other
+//     shards, promoting system stability during scaling events.
+type FlowRegistryAdmin interface {
+	// RegisterOrUpdateFlow handles the registration of a new flow or the update of an existing flow's specification.
+	// This method orchestrates complex state transitions atomically across all managed shards.
+	//
+	// # Dynamic Update Behaviors
+	//
+	//   - Priority Changes: If a flow's priority level changes, its current active `ManagedQueue` instance is marked
+	//     as inactive to drain existing requests. A new instance is activated at the new priority level. If a flow
+	//     is updated to a priority level where an instance is already draining (e.g., during a rapid rollback), that
+	//     draining instance is re-activated.
+	//
+	// # Returns
+	//
+	//   - nil on success.
+	//   - An error wrapping `ErrFlowIDEmpty` if `spec.ID` is empty.
+	//   - An error wrapping `ErrPriorityBandNotFound` if `spec.Priority` refers to an unconfigured priority level.
+	//   - Other errors if internal creation or activation of policy or queue instances fails.
+	RegisterOrUpdateFlow(spec types.FlowSpecification) error
+
+	// UpdateShardCount dynamically adjusts the number of internal state shards, triggering a state rebalance.
+	//
+	// # Dynamic Update Behaviors
+	//
+	//   - On Increase: New, empty state shards are initialized with all registered flows. The
+	//     `controller.FlowController`'s request distribution logic will naturally balance load to these new shards
+	//     over time.
+	//   - On Decrease: A specified number of existing shards are marked as inactive. They stop accepting new
+	//     requests but continue to drain existing items. They are fully removed only after their queues are empty.
+	//
+	// The implementation MUST atomically re-partition capacity allocations across all active shards when the count
+	// changes.
+	UpdateShardCount(n uint) error
+
+	// Stats returns globally aggregated statistics for the entire `FlowRegistry`.
+	Stats() AggregateStats
+
+	// ShardStats returns a slice of statistics, one for each internal shard. This provides visibility for debugging
+	// and monitoring per-shard behavior (e.g., identifying hot or stuck shards).
+	ShardStats() []ShardStats
+}
+
+// ShardProvider defines a minimal interface for consumers that need to discover and iterate over available shards.
+//
+// A "shard" is an internal, parallel execution unit that allows the `FlowController`'s core dispatch logic to be
+// parallelized. Consumers of this interface, such as a request distributor, MUST check `RegistryShard.IsActive()`
+// before routing new work to a shard to ensure they do not send requests to a shard that is gracefully draining.
+type ShardProvider interface {
+	// Shards returns a slice of accessors, one for each internal state shard.
+	//
+	// A "shard" is an internal, parallel execution unit that allows the `controller.FlowController`'s core dispatch
+	// logic to be parallelized, preventing a CPU bottleneck at high request rates. The `FlowRegistry`'s state is
+	// sharded to support this parallelism by reducing lock contention.
+	//
+	// The returned slice includes accessors for both active and draining shards. Consumers MUST use `IsActive()` to
+	// determine if new work should be routed to a shard. Callers should not modify the returned slice.
+	Shards() []RegistryShard
+}
+
 // RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its
 // specific slice (shard) of the `FlowRegistry`'s state. It provides the necessary methods for a worker to perform its
 // dispatch operations by accessing queues and policies in a concurrent-safe manner.
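
A hypothetical consumer of the `ShardProvider` contract, showing the mandated `IsActive()` check together with the JSQ-Bytes selection idea mentioned above. Only `Shards()` and `IsActive()` come from the contract; the `byteSize` callback is an assumed stand-in for a per-shard queued-bytes signal.

// Hypothetical request-distribution sketch over the ShardProvider contract.
package distributor

import (
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
)

// pickActiveShard routes new work only to active shards, choosing the one with
// the fewest queued bytes ("Join the Shortest Queue by Bytes", JSQ-Bytes).
func pickActiveShard(p contracts.ShardProvider, byteSize func(contracts.RegistryShard) uint64) contracts.RegistryShard {
	var best contracts.RegistryShard
	for _, s := range p.Shards() {
		if !s.IsActive() { // never route new work to a draining shard
			continue
		}
		if best == nil || byteSize(s) < byteSize(best) {
			best = s
		}
	}
	return best // nil if no shard is currently active
}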
@@ -80,14 +209,14 @@ type RegistryShard interface {
 }
 
 // ManagedQueue defines the interface for a flow's queue instance on a specific shard.
-// It wraps an underlying `framework.SafeQueue`, augmenting it with lifecycle validation against the `FlowRegistry` and
-// integrating atomic statistics updates.
+// It acts as a stateful decorator around an underlying `framework.SafeQueue`, augmenting it with lifecycle validation
+// against the `FlowRegistry` and integrating atomic statistics updates.
 //
 // # Conformance
 //
-//   - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
-//   - The `Add()` method MUST reject new items if the queue has been marked as "draining" by the `FlowRegistry`,
-//     ensuring that lifecycle changes are respected even by consumers holding a stale pointer to the queue.
+//   - All methods defined by this interface and the `framework.SafeQueue` it wraps MUST be goroutine-safe.
+//   - The `Add()` method MUST reject new items if the queue has been marked as Draining by the `FlowRegistry`,
+//     ensuring that lifecycle changes are respected even by consumers holding a stale pointer to the queue.
 //   - All mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST atomically update relevant statistics
 //     (e.g., queue length, byte size).
 type ManagedQueue interface {
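
The decorator behavior this hunk requires (reject `Add()` once Draining, keep statistics in sync with every mutation) can be illustrated with a self-contained sketch. Every name below is a stand-in rather than the real flowcontrol API, and the inner queue is deliberately trivial (the real `framework.SafeQueue` is goroutine-safe).

// Hypothetical, self-contained sketch of the ManagedQueue decorator behavior.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errDraining = errors.New("queue is draining; not accepting new items")

type item struct{ bytes uint64 }

// innerQueue stands in for the wrapped `framework.SafeQueue`.
type innerQueue interface {
	Add(it item) error
}

// sliceQueue is a trivial, non-concurrent stand-in implementation.
type sliceQueue struct{ items []item }

func (q *sliceQueue) Add(it item) error { q.items = append(q.items, it); return nil }

// managedQueue decorates an inner queue with lifecycle validation and
// atomically maintained statistics, mirroring the contract above.
type managedQueue struct {
	inner    innerQueue
	draining atomic.Bool   // set by the registry on a lifecycle transition
	byteSize atomic.Uint64 // shard-visible statistics
	length   atomic.Int64
}

// Add rejects new items once the registry has marked the instance Draining,
// so even callers holding a stale pointer observe the lifecycle change.
func (mq *managedQueue) Add(it item) error {
	if mq.draining.Load() {
		return errDraining
	}
	if err := mq.inner.Add(it); err != nil {
		return err
	}
	mq.byteSize.Add(it.bytes) // statistics updated together with the mutation
	mq.length.Add(1)
	return nil
}

func main() {
	mq := &managedQueue{inner: &sliceQueue{}}
	fmt.Println(mq.Add(item{bytes: 128})) // <nil>
	mq.draining.Store(true)               // registry marks the instance Draining
	fmt.Println(mq.Add(item{bytes: 64}))  // rejected: queue is draining
}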
@@ -100,6 +229,18 @@ type ManagedQueue interface {
 	FlowQueueAccessor() framework.FlowQueueAccessor
 }
 
+// AggregateStats holds globally aggregated statistics for the entire `FlowRegistry`.
+type AggregateStats struct {
+	// TotalCapacityBytes is the globally configured maximum total byte size limit across all priority bands and shards.
+	TotalCapacityBytes uint64
+	// TotalByteSize is the total byte size of all items currently queued across the entire system.
+	TotalByteSize uint64
+	// TotalLen is the total number of items currently queued across the entire system.
+	TotalLen uint64
+	// PerPriorityBandStats maps each configured priority level to its globally aggregated statistics.
+	PerPriorityBandStats map[uint]PriorityBandStats
+}
+
 // ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
 type ShardStats struct {
 	// TotalCapacityBytes is the optional, maximum total byte size limit aggregated across all priority bands within this
@@ -112,6 +253,7 @@ type ShardStats struct {
 	// TotalLen is the total number of items currently queued across all priority bands within this shard.
 	TotalLen uint64
 	// PerPriorityBandStats maps each configured priority level to its statistics within this shard.
+	// The capacity values within represent this shard's partition of the global band capacity.
 	// The key is the numerical priority level.
 	// All configured priority levels are guaranteed to be represented.
 	PerPriorityBandStats map[uint]PriorityBandStats
@@ -138,9 +280,9 @@ type PriorityBandStats struct {
 	Priority uint
 	// PriorityName is an optional, human-readable name for the priority level (e.g., "Critical", "Sheddable").
 	PriorityName string
-	// CapacityBytes is the configured maximum total byte size for this priority band, aggregated across all items in
-	// all flow queues within this band. If scoped to a shard, its value represents the configured band limit for the
-	// `FlowRegistry` partitioned for this shard.
+	// CapacityBytes is the configured maximum total byte size for this priority band.
+	// When viewed via `AggregateStats`, this is the global limit. When viewed via `ShardStats`, this is the
+	// partitioned value for that specific shard.
 	// The `controller.FlowController` enforces this limit.
 	// A default non-zero value is guaranteed if not configured.
 	CapacityBytes uint64
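
Finally, the relationship between `ShardStats` and `AggregateStats` implied by the capacity-partitioning invariant: summing per-shard statistics reproduces the global view, with shard capacity partitions summing back to the global limits. A hypothetical aggregation helper under those assumptions (`aggregate` is illustrative, not the committed code):

// Hypothetical aggregation of per-shard statistics into the global view.
package registry

import (
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
)

func aggregate(shards []contracts.ShardStats) contracts.AggregateStats {
	agg := contracts.AggregateStats{
		PerPriorityBandStats: make(map[uint]contracts.PriorityBandStats),
	}
	for _, s := range shards {
		agg.TotalCapacityBytes += s.TotalCapacityBytes
		agg.TotalByteSize += s.TotalByteSize
		agg.TotalLen += s.TotalLen
		for prio, band := range s.PerPriorityBandStats {
			got := agg.PerPriorityBandStats[prio]
			got.Priority = prio
			got.PriorityName = band.PriorityName
			got.CapacityBytes += band.CapacityBytes // shard partitions sum to the global band limit
			// The band's queued byte/length counters would accumulate here too
			// (those fields fall outside the hunks shown in this diff).
			agg.PerPriorityBandStats[prio] = got
		}
	}
	return agg
}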
