Commit 34f429c

feat(flowcontrol): Implement the FlowRegistry
This commit introduces the complete, concrete implementation of the `FlowRegistry`, the stateful control plane for the flow control system. It provides a scalable, concurrent-safe, and robust foundation for managing the lifecycle of all flows, queues, and shards.

The architecture is designed to prioritize data path performance and strict correctness for control plane state transitions, decoupling the two via a sharded, asynchronously signaled design.

Key architectural features include:

- **Serialized Control Plane (Actor Model):** All administrative operations and internal state change events are processed serially by a single background event loop. This fundamental design choice eliminates race conditions for complex, multi-step operations like shard scaling and garbage collection, guaranteeing correctness.
- **Sharded Data Plane:** The registry's state is partitioned across multiple `registryShard` instances. This allows the data path (enqueue/dispatch operations) to scale linearly with the number of workers and CPU cores by using fine-grained, per-priority-band locks to minimize global contention.
- **Asynchronous, Lock-Free Signaling:** A sophisticated, lock-free atomic state machine is used for signaling between the data path and the control plane (e.g., for queue empty/non-empty transitions). This ensures the data path is never blocked by control plane backpressure, guarantees strictly ordered signals, and prevents lost transitions even under high contention.
- **"Trust but Verify" Garbage Collection:** A periodic scanner identifies candidate idle flows using an eventually-consistent cache ("Trust"). Before deletion, it performs a "stop-the-world" live check (locking a single priority band across all shards) to confirm idleness against the ground truth ("Verify"). This provides strong consistency precisely when needed while minimizing data path disruption.
- **Immutable Flow Identity (`FlowKey`):** The `FlowKey` (ID + Priority) is treated as an immutable identifier. To change the priority of traffic, a caller simply registers a new flow. The old flow is gracefully and automatically garbage collected once it becomes idle. This design avoids complex and error-prone state migration logic.
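
The serialized control plane described above can be illustrated with a small, self-contained sketch. This is not the registry's actual event loop, which this page does not show; the `command` and `controlPlane` types and the `do`/`get` helpers are hypothetical names chosen for illustration. The idea is only that a single goroutine owns the state and applies mutations one at a time, so multi-step updates cannot interleave.

```go
package main

import "fmt"

// command is a hypothetical unit of control-plane work; the real registry's
// event types are not shown on this page.
type command struct {
	apply func(state map[string]int) // mutation executed on the loop goroutine
	done  chan struct{}              // closed once the mutation has been applied
}

// controlPlane owns its state exclusively: only run() ever touches it.
type controlPlane struct {
	events chan command
	state  map[string]int
}

func newControlPlane() *controlPlane {
	cp := &controlPlane{events: make(chan command, 16), state: map[string]int{}}
	go cp.run()
	return cp
}

// run applies commands strictly one after another.
func (cp *controlPlane) run() {
	for cmd := range cp.events {
		cmd.apply(cp.state)
		close(cmd.done)
	}
}

// do submits a mutation and blocks until the loop has applied it.
func (cp *controlPlane) do(apply func(state map[string]int)) {
	cmd := command{apply: apply, done: make(chan struct{})}
	cp.events <- cmd
	<-cmd.done
}

// get reads a key by routing the read through the same loop.
func (cp *controlPlane) get(key string) int {
	var v int
	cp.do(func(s map[string]int) { v = s[key] })
	return v
}

func main() {
	cp := newControlPlane()
	cp.do(func(s map[string]int) { s["shards"] = 4 })
	cp.do(func(s map[string]int) { s["shards"] *= 2 })
	fmt.Println(cp.get("shards")) // prints 8
}
```

Because every mutation passes through one channel, no lock is needed around `state`; the trade-off is that control-plane operations are processed at the pace of a single goroutine, which is acceptable when the data path does not go through this loop.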
1 parent d1fe78f commit 34f429c

19 files changed

+6006
-1179
lines changed

GEMINI.md

Lines changed: 358 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package contracts defines the boundaries and service interfaces for the Flow Control system.
+//
+// Adhering to a "Ports and Adapters" (Hexagonal) architectural style, these interfaces decouple the core
+// `controller.FlowController` engine from its dependencies. They establish the required behaviors and system invariants
+// that concrete implementations must uphold.
+//
+// The primary contracts are:
+//
+// - `FlowRegistry`: The interface for the stateful control plane that manages the lifecycle of flows, queues, and
+// policies.
+//
+// - `SaturationDetector`: The interface for a component that provides real-time load signals.
+package contracts

pkg/epp/flowcontrol/contracts/errors.go

Lines changed: 13 additions & 8 deletions
@@ -18,18 +18,23 @@ package contracts

 import "errors"

-// Registry Errors
 var (
-	// ErrFlowInstanceNotFound indicates that a requested flow instance (a `ManagedQueue`) does not exist in the registry
-	// shard, either because the flow is not registered or the specific instance (e.g., a draining queue at a particular
-	// priority) is not present.
+	// ErrFlowInstanceNotFound indicates that a requested flow instance (a `ManagedQueue`) does not exist.
 	ErrFlowInstanceNotFound = errors.New("flow instance not found")

-	// ErrPriorityBandNotFound indicates that a requested priority band does not exist in the registry because it was not
-	// part of the initial configuration.
+	// ErrFlowIDEmpty indicates that a flow specification was provided with an empty flow ID.
+	ErrFlowIDEmpty = errors.New("flow ID cannot be empty")
+
+	// ErrPriorityBandNotFound indicates that a requested priority band does not exist in the registry configuration.
 	ErrPriorityBandNotFound = errors.New("priority band not found")

-	// ErrPolicyQueueIncompatible indicates that a selected policy is not compatible with the capabilities of the queue it
-	// is intended to operate on. For example, a policy requiring priority-based peeking is used with a simple FIFO queue.
+	// ErrPolicyQueueIncompatible indicates that a selected policy is not compatible with the capabilities of the queue.
 	ErrPolicyQueueIncompatible = errors.New("policy is not compatible with queue capabilities")
+
+	// ErrInvalidShardCount indicates that an invalid shard count was provided (e.g., zero or negative).
+	ErrInvalidShardCount = errors.New("invalid shard count")
+
+	// ErrMaxFlowsExceeded indicates that a new flow could not be registered because the configured maximum number of
+	// concurrent flows has been reached.
+	ErrMaxFlowsExceeded = errors.New("maximum number of concurrent flows exceeded")
 )

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 123 additions & 49 deletions
@@ -14,23 +14,109 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-// Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
-// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
-// interfaces represent the "ports" through which the engine communicates.
-//
-// This package contains the primary service contracts for the Flow Registry, which acts as the control plane for all
-// flow state and configuration.
 package contracts

 import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
 )

-// RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its
-// specific slice (shard) of the `FlowRegistry's` state. It provides a concurrent-safe view of all flow instances, which
-// are uniquely identified by their composite `types.FlowKey`. It is the primary contract for performing dispatch
-// operations.
+// FlowRegistry is the complete interface for the global control plane. An implementation of this interface is the single
+// source of truth for all flow control state and configuration.
+//
+// # Conformance
+//
+// All methods MUST be goroutine-safe. Implementations are expected to perform complex updates (e.g.,
+// `RegisterOrUpdateFlow`) atomically.
+//
+// # System Invariants
+//
+// Concrete implementations MUST uphold the following invariants:
+// 1. Shard Consistency: All configured priority bands and registered flow instances must exist on every Active shard.
+// Plugin instance types must be consistent for a given flow across all shards.
+// 2. Flow Instance Uniqueness: Each unique `types.FlowKey` (`ID` + `Priority`) corresponds to exactly one managed flow
+// instance.
+// 3. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active
+// shards.
+//
+// # Flow Lifecycle
+//
+// A flow instance (identified by its immutable `FlowKey`) has a simple lifecycle:
+//
+// - Registered: Known to the `FlowRegistry` via `RegisterOrUpdateFlow`.
+// - Idle: Queues are empty across all Active and Draining shards.
+// - Garbage Collected (Unregistered): The registry MUST automatically garbage collect flows after they have remained
+// Idle for a configurable duration.
+//
+// # Shard Lifecycle
+//
+// When a shard is decommissioned, it is marked inactive (`IsActive() == false`) to prevent new enqueues. The shard
+// continues to drain and is deleted only after it is empty.
+type FlowRegistry interface {
+	FlowRegistryAdmin
+	ShardProvider
+}
+
+// FlowRegistryAdmin defines the administrative interface for the global control plane.
+//
+// # Dynamic Update Strategies
+//
+// The contract specifies behaviors for handling dynamic updates, prioritizing stability and correctness:
+//
+// - Immutable Flow Identity (`types.FlowKey`): The system treats the `FlowKey` (`ID` + `Priority`) as the immutable
+// identifier. Changing the priority of traffic requires registering a new `FlowKey`. The old flow instance is
+// automatically garbage collected when Idle. This design eliminates complex priority migration logic.
+//
+// - Graceful Draining (Shard Scale-Down): Decommissioned shards enter a Draining state. They stop accepting new
+// requests but continue to be processed for dispatch until empty.
+//
+// - Self-Balancing (Shard Scale-Up): When new shards are added, the `controller.FlowController`'s distribution logic
+// naturally utilizes them, funneling new requests to the less-loaded shards. Existing queued items are not
+// migrated.
+type FlowRegistryAdmin interface {
+	// RegisterOrUpdateFlow handles the registration of a new flow instance or the update of an existing instance's
+	// specification (for the same `types.FlowKey`). The operation is atomic across all shards.
+	//
+	// Since the `FlowKey` (including `Priority`) is immutable, this method cannot change a flow's priority.
+	// To change priority, the caller should simply register the new `FlowKey`; the old flow instance will be
+	// automatically garbage collected when it becomes Idle.
+	//
+	// Returns errors wrapping:
+	// - `ErrFlowIDEmpty`: if the provided specification has an empty `FlowKey.ID`.
+	// - `ErrPriorityBandNotFound`: if the priority level in the `FlowKey` is not configured in the registry.
+	// - `ErrMaxFlowsExceeded`: if registering this new flow would exceed the configured global maximum.
+	// It may also return internal errors if plugin instantiation fails.
+	RegisterOrUpdateFlow(spec types.FlowSpecification) error
+
+	// UpdateShardCount dynamically adjusts the number of internal state shards.
+	//
+	// The implementation MUST atomically re-partition capacity allocations across all active shards.
+	// Returns an error wrapping `ErrInvalidShardCount` if `n` is not positive.
+	UpdateShardCount(n int) error
+
+	// Stats returns globally aggregated statistics for the entire `FlowRegistry`.
+	Stats() AggregateStats
+
+	// ShardStats returns a slice of statistics, one for each internal shard. This provides visibility for debugging and
+	// monitoring per-shard behavior (e.g., identifying hot or stuck shards).
+	ShardStats() []ShardStats
+}
+
+// ShardProvider defines the interface for discovering available shards.
+//
+// A "shard" is an internal, parallel execution unit that allows the `controller.FlowController`'s core dispatch logic
+// to be parallelized, preventing a CPU bottleneck at high request rates. The `FlowRegistry`'s state is sharded to
+// support this parallelism by reducing lock contention.
+//
+// Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard to avoid sending requests to a
+// Draining shard.
+type ShardProvider interface {
+	// Shards returns a slice of accessors, one for each internal state shard (Active and Draining).
+	Shards() []RegistryShard
+}
+
+// RegistryShard defines the interface for accessing a specific slice (shard) of the `FlowRegistry's` state.
+// It provides a concurrent-safe view for `controller.FlowController` workers.
 //
 // # Conformance
 //
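
The `ShardProvider` contract in the hunk above requires consumers to check `IsActive()` before routing new work, because `Shards()` returns both Active and Draining shards. A minimal sketch of that check follows. The `shard` interface is a reduced, hypothetical stand-in for `RegistryShard`: `IsActive()` comes from the contract text, while `ID()` and the `fakeShard` type are assumptions made so the example can run on its own.

```go
package main

import "fmt"

// shard is a reduced stand-in for contracts.RegistryShard. ID() is assumed
// here for display purposes only.
type shard interface {
	ID() string
	IsActive() bool
}

// fakeShard is a hypothetical test double.
type fakeShard struct {
	id     string
	active bool
}

func (s fakeShard) ID() string     { return s.id }
func (s fakeShard) IsActive() bool { return s.active }

// activeShards keeps only the shards that may receive new enqueues.
// Draining shards are still dispatched from, but must not get new work.
func activeShards(all []shard) []shard {
	out := make([]shard, 0, len(all))
	for _, s := range all {
		if s.IsActive() {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	all := []shard{
		fakeShard{"shard-0", true},
		fakeShard{"shard-1", false}, // draining
		fakeShard{"shard-2", true},
	}
	for _, s := range activeShards(all) {
		fmt.Println(s.ID()) // prints shard-0, then shard-2
	}
}
```

A shard can be decommissioned between the check and the enqueue, so a real consumer would presumably still need to handle rejection at enqueue time; how the controller does that is not shown in this diff.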
@@ -46,12 +132,12 @@ type RegistryShard interface {
 	// ManagedQueue retrieves the managed queue for the given, unique `types.FlowKey`. This is the primary method for
 	// accessing a specific flow's queue for either enqueueing or dispatching requests.
 	//
-	// Returns an error wrapping `ErrPriorityBandNotFound` if the priority specified in the key is not configured, or
+	// Returns an error wrapping `ErrPriorityBandNotFound` if the priority specified in the `key` is not configured, or
 	// `ErrFlowInstanceNotFound` if no instance exists for the given `key`.
 	ManagedQueue(key types.FlowKey) (ManagedQueue, error)

 	// IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard,
-	// identified by its unique `FlowKey`.
+	// identified by its unique `types.FlowKey`.
 	// The registry guarantees that a non-nil default policy (as configured at the priority-band level) is returned if
 	// none is specified for the flow.
 	// Returns an error wrapping `ErrFlowInstanceNotFound` if the flow instance does not exist.
@@ -63,8 +149,8 @@ type RegistryShard interface {
 	InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error)

 	// PriorityBandAccessor retrieves a read-only accessor for a given priority level, providing a view of the band's
-	// state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that
-	// need to inspect and compare multiple flow queues within the same priority band.
+	// state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that need to
+	// inspect and compare multiple flow queues within the same priority band.
 	// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
 	PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error)

@@ -80,24 +166,33 @@ type RegistryShard interface {
 }

 // ManagedQueue defines the interface for a flow's queue instance on a specific shard.
-// It wraps an underlying `framework.SafeQueue`, augmenting it with lifecycle validation against the `FlowRegistry` and
-// integrating atomic statistics updates.
+// It acts as a stateful decorator around an underlying `framework.SafeQueue`.
 //
 // # Conformance
 //
-// - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
-// - All mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST atomically update relevant statistics
-// (e.g., queue length, byte size).
+// - All methods MUST be goroutine-safe.
+// - All mutating methods (`Add()`, `Remove()`, etc.) MUST ensure that the underlying queue state and the statistics
+// (`Len`, `ByteSize`) are updated atomically relative to each other.
 type ManagedQueue interface {
 	framework.SafeQueue

-	// FlowQueueAccessor returns a read-only, flow-aware accessor for this queue.
-	// This accessor is primarily used by policy plugins to inspect the queue's state in a structured way.
-	//
+	// FlowQueueAccessor returns a read-only, flow-aware accessor for this queue, used by policy plugins.
 	// Conformance: This method MUST NOT return nil.
 	FlowQueueAccessor() framework.FlowQueueAccessor
 }

+// AggregateStats holds globally aggregated statistics for the entire `FlowRegistry`.
+type AggregateStats struct {
+	// TotalCapacityBytes is the globally configured maximum total byte size limit across all priority bands and shards.
+	TotalCapacityBytes uint64
+	// TotalByteSize is the total byte size of all items currently queued across the entire system.
+	TotalByteSize uint64
+	// TotalLen is the total number of items currently queued across the entire system.
+	TotalLen uint64
+	// PerPriorityBandStats maps each configured priority level to its globally aggregated statistics.
+	PerPriorityBandStats map[uint]PriorityBandStats
+}
+
 // ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
 type ShardStats struct {
 	// TotalCapacityBytes is the optional, maximum total byte size limit aggregated across all priority bands within this
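
The revised `ManagedQueue` conformance rule in this hunk asks that queue contents and the `Len`/`ByteSize` statistics change atomically relative to each other. One simple way to satisfy that, sketched below, is to guard both with the same mutex. This is an illustration under stated assumptions, not the commit's implementation (which may well use atomics instead); `item`, `managedQueue`, and `Stats` are hypothetical names, and the slice-backed queue stands in for `framework.SafeQueue`.

```go
package main

import (
	"fmt"
	"sync"
)

// item is a hypothetical queue element; only its byte size matters here.
type item struct{ size uint64 }

// managedQueue decorates a slice-backed queue with length and byte counters.
// One mutex guards the queue and the counters together, so no reader can
// observe an item without its statistics or vice versa.
type managedQueue struct {
	mu       sync.Mutex
	items    []item
	length   uint64
	byteSize uint64
}

// Add appends an item and updates both counters under the same lock.
func (q *managedQueue) Add(it item) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, it)
	q.length++
	q.byteSize += it.size
}

// Remove pops the head item; ok is false when the queue is empty.
func (q *managedQueue) Remove() (it item, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return item{}, false
	}
	it = q.items[0]
	q.items = q.items[1:]
	q.length--
	q.byteSize -= it.size
	return it, true
}

// Stats returns a snapshot in which length and byteSize agree with each other.
func (q *managedQueue) Stats() (length, byteSize uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.length, q.byteSize
}

func main() {
	q := &managedQueue{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			q.Add(item{size: 10})
		}()
	}
	wg.Wait()
	q.Remove()
	fmt.Println(q.Stats()) // prints 99 990
}
```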
@@ -110,35 +205,22 @@ type ShardStats struct {
 	// TotalLen is the total number of items currently queued across all priority bands within this shard.
 	TotalLen uint64
 	// PerPriorityBandStats maps each configured priority level to its statistics within this shard.
+	// The capacity values within represent this shard's partition of the global band capacity.
 	// The key is the numerical priority level.
 	// All configured priority levels are guaranteed to be represented.
 	PerPriorityBandStats map[uint]PriorityBandStats
 }

-// DeepCopy returns a deep copy of the `ShardStats`.
-func (s *ShardStats) DeepCopy() ShardStats {
-	if s == nil {
-		return ShardStats{}
-	}
-	newStats := *s
-	if s.PerPriorityBandStats != nil {
-		newStats.PerPriorityBandStats = make(map[uint]PriorityBandStats, len(s.PerPriorityBandStats))
-		for k, v := range s.PerPriorityBandStats {
-			newStats.PerPriorityBandStats[k] = v.DeepCopy()
-		}
-	}
-	return newStats
-}
-
 // PriorityBandStats holds aggregated statistics for a single priority band.
 type PriorityBandStats struct {
 	// Priority is the numerical priority level this struct describes.
 	Priority uint
-	// PriorityName is an optional, human-readable name for the priority level (e.g., "Critical", "Sheddable").
+	// PriorityName is a human-readable name for the priority band (e.g., "Critical", "Sheddable").
+	// The registry configuration requires this field, so it is guaranteed to be non-empty.
 	PriorityName string
-	// CapacityBytes is the configured maximum total byte size for this priority band, aggregated across all items in
-	// all flow queues within this band. If scoped to a shard, its value represents the configured band limit for the
-	// `FlowRegistry` partitioned for this shard.
+	// CapacityBytes is the configured maximum total byte size for this priority band.
+	// When viewed via `AggregateStats`, this is the global limit. When viewed via `ShardStats`, this is the partitioned
+	// value for that specific shard.
 	// The `controller.FlowController` enforces this limit.
 	// A default non-zero value is guaranteed if not configured.
 	CapacityBytes uint64
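
The comments in this hunk distinguish the global `CapacityBytes` from the per-shard partition, and the contract asks for limits to be "uniformly partitioned" across Active shards. The diff does not show how the registry handles a total that does not divide evenly, so the sketch below makes an explicit assumption: the remainder is handed out one byte at a time to the lowest-indexed shards. The `partition` function is hypothetical.

```go
package main

import "fmt"

// partition splits total across n shards as evenly as integer arithmetic
// allows. Giving the remainder to the lowest-indexed shards is an assumption
// of this sketch, not behavior confirmed by the commit.
func partition(total uint64, n int) []uint64 {
	if n <= 0 {
		return nil // mirrors the contract's rejection of non-positive shard counts
	}
	base, rem := total/uint64(n), total%uint64(n)
	out := make([]uint64, n)
	for i := range out {
		out[i] = base
		if uint64(i) < rem {
			out[i]++
		}
	}
	return out
}

func main() {
	fmt.Println(partition(1000, 3)) // prints [334 333 333]
}
```

Whatever rule is used, the useful property is that the per-shard values sum back to the global limit, so enforcing each shard's partition locally never admits more than the configured total.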
@@ -147,11 +229,3 @@ type PriorityBandStats struct {
 	// Len is the total number of items currently queued in this priority band.
 	Len uint64
 }
-
-// DeepCopy returns a deep copy of the `PriorityBandStats`.
-func (s *PriorityBandStats) DeepCopy() PriorityBandStats {
-	if s == nil {
-		return PriorityBandStats{}
-	}
-	return *s
-}
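
To relate `AggregateStats` to `ShardStats`, the sketch below sums per-shard figures into a global view. It is only an illustration: the diff does not show how `Stats()` is actually computed (it may read the global configuration rather than summing partitions). The structs are trimmed local copies; `TotalByteSize` on `ShardStats` and `ByteSize` on `PriorityBandStats` sit in lines the diff view collapses, so their presence is assumed, and `aggregate` is a hypothetical helper.

```go
package main

import "fmt"

// Trimmed local copies of the stats structs from the diff. ByteSize and the
// shard-level TotalByteSize are assumed fields (collapsed in the diff view).
type PriorityBandStats struct {
	Priority      uint
	PriorityName  string
	CapacityBytes uint64
	ByteSize      uint64
	Len           uint64
}

type ShardStats struct {
	TotalCapacityBytes   uint64
	TotalByteSize        uint64
	TotalLen             uint64
	PerPriorityBandStats map[uint]PriorityBandStats
}

type AggregateStats struct {
	TotalCapacityBytes   uint64
	TotalByteSize        uint64
	TotalLen             uint64
	PerPriorityBandStats map[uint]PriorityBandStats
}

// aggregate sums per-shard partitions back into global figures.
func aggregate(shards []ShardStats) AggregateStats {
	agg := AggregateStats{PerPriorityBandStats: map[uint]PriorityBandStats{}}
	for _, s := range shards {
		agg.TotalCapacityBytes += s.TotalCapacityBytes
		agg.TotalByteSize += s.TotalByteSize
		agg.TotalLen += s.TotalLen
		for prio, b := range s.PerPriorityBandStats {
			cur := agg.PerPriorityBandStats[prio]
			cur.Priority, cur.PriorityName = b.Priority, b.PriorityName
			cur.CapacityBytes += b.CapacityBytes
			cur.ByteSize += b.ByteSize
			cur.Len += b.Len
			agg.PerPriorityBandStats[prio] = cur
		}
	}
	return agg
}

func main() {
	shards := []ShardStats{
		{TotalCapacityBytes: 500, TotalLen: 2, PerPriorityBandStats: map[uint]PriorityBandStats{
			0: {Priority: 0, PriorityName: "Critical", CapacityBytes: 500, Len: 2},
		}},
		{TotalCapacityBytes: 500, TotalLen: 3, PerPriorityBandStats: map[uint]PriorityBandStats{
			0: {Priority: 0, PriorityName: "Critical", CapacityBytes: 500, Len: 3},
		}},
	}
	agg := aggregate(shards)
	fmt.Println(agg.TotalCapacityBytes, agg.TotalLen, agg.PerPriorityBandStats[0].Len) // prints 1000 5 5
}
```

Note that `aggregate` builds a fresh map for its result, so the returned value shares no state with the per-shard inputs.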
