Commit 6b26721

feat(flowcontrol): Implement the FlowRegistry
This commit introduces the complete, concrete implementation of the `FlowRegistry`, the stateful control plane for the flow control system. It is responsible for managing the lifecycle of all flows, queues, and shards. Key features of this implementation include:

- An actor-based, serialized event loop that processes all state changes, ensuring correctness and eliminating race conditions in the control plane.
- A robust garbage collection system for idle flows and drained components, using a "Trust but Verify" pattern to safely handle races between the data path and the control plane.
- A well-defined component lifecycle (Active, Draining, Drained) with atomic state transitions and exactly-once edge signaling.
- A sharded architecture in which the `FlowRegistry` orchestrates the `registryShard` data plane slices.
1 parent fd39cac commit 6b26721
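The central design choice in this commit is the serialized, actor-style event loop. A minimal, self-contained sketch of that pattern, assuming only what the commit message states; every name here is illustrative and is not the actual registry implementation:

package main

import "fmt"

// event is a single control-plane state change. Because the loop below
// applies events one at a time, the state needs no locks.
type event func()

type registry struct {
	events chan event
	flows  map[string]uint // flow ID -> priority (stand-in state)
}

func newRegistry() *registry {
	r := &registry{events: make(chan event, 64), flows: make(map[string]uint)}
	go r.run() // the single goroutine that owns all state
	return r
}

// run is the serialized event loop: state changes execute in arrival order,
// which is what eliminates control-plane race conditions.
func (r *registry) run() {
	for ev := range r.events {
		ev()
	}
}

// registerFlow submits a state change and blocks until the loop applies it.
func (r *registry) registerFlow(id string, priority uint) {
	done := make(chan struct{})
	r.events <- func() {
		r.flows[id] = priority
		close(done)
	}
	<-done
}

func main() {
	r := newRegistry()
	r.registerFlow("flow-a", 10)
	fmt.Println("registered flows:", len(r.flows))
}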

File tree

17 files changed: +4220 −858 lines

pkg/epp/flowcontrol/contracts/doc.go

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
+// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
+// interfaces represent the "ports" through which the engine communicates with external systems and pluggable logic.
+//
+// The two primary contracts defined here are:
+//
+//   - `FlowRegistry`: The interface for the stateful control plane that manages the lifecycle of all flows, queues,
+//     and policies.
+//
+//   - `SaturationDetector`: The interface for a component that provides real-time load signals, allowing the dispatch
+//     engine to react to backend saturation.
+package contracts
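A sketch of how these "ports" are consumed on the engine side. The FlowController type and NewFlowController constructor are illustrative assumptions; only the two contract interfaces come from this package:

package controller

import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"

// FlowController is the engine at the center of the hexagon; it depends only
// on the ports, never on concrete registry or detector implementations.
type FlowController struct {
	registry contracts.FlowRegistry
	detector contracts.SaturationDetector
}

// NewFlowController accepts any adapters that satisfy the contracts, which is
// what keeps the engine decoupled and testable with fakes.
func NewFlowController(r contracts.FlowRegistry, d contracts.SaturationDetector) *FlowController {
	return &FlowController{registry: r, detector: d}
}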

pkg/epp/flowcontrol/contracts/errors.go

Lines changed: 7 additions & 3 deletions
@@ -20,16 +20,20 @@ import "errors"
 
 // Registry Errors
 var (
-	// ErrFlowInstanceNotFound indicates that a requested flow instance (a `ManagedQueue`) does not exist in the registry
-	// shard, either because the flow is not registered or the specific instance (e.g., a draining queue at a particular
-	// priority) is not present.
+	// ErrFlowInstanceNotFound indicates that a requested flow instance (a `ManagedQueue`) does not exist.
 	ErrFlowInstanceNotFound = errors.New("flow instance not found")
 
+	// ErrFlowIDEmpty indicates that a flow specification was provided with an empty flow ID.
+	ErrFlowIDEmpty = errors.New("flow ID cannot be empty")
+
 	// ErrPriorityBandNotFound indicates that a requested priority band does not exist in the registry because it was not
 	// part of the initial configuration.
 	ErrPriorityBandNotFound = errors.New("priority band not found")
 
 	// ErrPolicyQueueIncompatible indicates that a selected policy is not compatible with the capabilities of the queue it
 	// is intended to operate on. For example, a policy requiring priority-based peeking is used with a simple FIFO queue.
 	ErrPolicyQueueIncompatible = errors.New("policy is not compatible with queue capabilities")
+
+	// ErrInvalidShardCount indicates that an invalid shard count was provided (e.g., zero).
+	ErrInvalidShardCount = errors.New("invalid shard count")
 )
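These are package-level sentinel errors, so callers match them with errors.Is, which sees through any wrapping the implementation adds. A brief sketch; the applySpec helper is hypothetical, but the contract methods and sentinels are the ones defined in this commit:

package main

import (
	"errors"
	"log"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// applySpec registers a flow and translates the contract's sentinel errors.
func applySpec(admin contracts.FlowRegistryAdmin, spec types.FlowSpecification) {
	err := admin.RegisterOrUpdateFlow(spec)
	switch {
	case err == nil:
		// Flow is now registered with exactly one active instance.
	case errors.Is(err, contracts.ErrFlowIDEmpty):
		log.Println("rejecting spec: flow ID must be non-empty")
	case errors.Is(err, contracts.ErrPriorityBandNotFound):
		log.Println("rejecting spec: priority level is not configured")
	default:
		log.Printf("flow registration failed: %v", err)
	}
}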

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 146 additions & 6 deletions
@@ -14,18 +14,144 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
-// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
-// interfaces represent the "ports" through which the engine communicates.
-//
-// This package contains the primary service contracts for the Flow Registry, which acts as the control plane for all
-// flow state and configuration.
 package contracts
 
 import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
 )
 
+// FlowRegistry is the complete interface for the global control plane, composed of administrative functions and the
+// ability to provide shard accessors. A concrete implementation of this interface is the single source of truth for
+// all flow control state and configuration.
+//
+// # Conformance
+//
+// All methods defined in this interface (including those embedded) MUST be goroutine-safe.
+// Implementations are expected to perform complex updates (e.g., RegisterOrUpdateFlow, UpdateShardCount) atomically to
+// preserve system invariants.
+//
+// # Invariants
+//
+// Concrete implementations of FlowRegistry MUST uphold the following invariants across all operations:
+//  1. Shard Consistency: All configured priority bands and logical flows must be represented on every active internal
+//     shard. Plugin instance types (e.g., the specific SafeQueue implementation or policy plugins) must be consistent
+//     for a given flow or band across all shards.
+//  2. Flow Instance Uniqueness per Band: For any given logical flow, there can be a maximum of one ManagedQueue
+//     instance per priority band. An instance can be either 'active' or 'draining'.
+//  3. Single Active Instance per Flow: For any given logical flow, there can be a maximum of one *active* ManagedQueue
+//     instance across all priority bands. All other instances for that flow must be in a 'draining' state.
+//  4. Capacity Partitioning Consistency: Global and per-band capacity limits are uniformly partitioned across all
+//     active shards. The sum of the capacity limits allocated to each shard must not exceed the globally configured
+//     limits.
+//
+// # Flow Lifecycle States
+//
+//   - Registered: A logical flow is 'registered' when it is known to the FlowRegistry. It has exactly one 'active'
+//     instance across all priority bands and zero or more 'draining' instances.
+//   - Active: A specific instance of a flow within a priority band is 'active' if it is the designated target for all
+//     new enqueues for that logical flow.
+//   - Draining: A flow instance is 'draining' if it no longer accepts new enqueues but still contains items that are
+//     eligible for dispatch. This occurs after a priority change or when a flow is unregistered.
+//   - Unregistered: A logical flow is 'unregistered' after a call to UnregisterFlow. It has no 'active' instances,
+//     though 'draining' instances may still exist until their queues are empty.
+//
+// # Shard Garbage Collection
+//
+// When a shard is decommissioned via UpdateShardCount, the FlowRegistry must ensure a graceful shutdown. It must mark
+// the shard as inactive to prevent new enqueues, allow the FlowController to continue draining its queues, and only
+// delete the shard's state after the associated worker has fully terminated and all queues are empty.
+type FlowRegistry interface {
+	FlowRegistryAdmin
+	ShardProvider
+}
+
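A sketch of the lifecycle the contract describes, as a small state type with the documented transition out of 'active'. The representation is an assumption (the contract does not prescribe one); the state names mirror the prose above:

// flowInstanceState mirrors the lifecycle states named in the contract.
type flowInstanceState int

const (
	instanceActive   flowInstanceState = iota // the sole target for new enqueues
	instanceDraining                          // no new enqueues, still dispatchable
)

type flowInstance struct {
	priority uint
	state    flowInstanceState
}

// changePriority models the documented transition: activating an instance at
// the new level forces the previously active instance to drain (invariant 3),
// while the draining instance keeps its original priority for dispatch.
func changePriority(prev, next *flowInstance) {
	prev.state = instanceDraining
	next.state = instanceActive
}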
+// FlowRegistryAdmin defines the administrative interface for the global control plane. This interface is intended for
+// external systems (like a Kubernetes operator) to configure flows, manage system parallelism, and query aggregated
+// statistics for observability.
+//
+// # Design Rationale for Dynamic Update Strategies
+//
+// The `FlowRegistryAdmin` contract specifies precise behaviors for handling dynamic updates. These strategies were
+// chosen to prioritize system stability, correctness, and minimal disruption:
+//
+//   - Graceful Draining (for Priority/Shard Lifecycle Changes): For operations that change a flow's priority or
+//     decommission a shard, the affected queue instances are marked as inactive but are not immediately deleted. They
+//     enter a "draining" state where they no longer accept new requests but are still processed for dispatch. This
+//     ensures that requests already accepted by the system are processed to completion. Crucially, requests in a
+//     draining queue continue to be dispatched according to the priority level and policies they were enqueued with,
+//     ensuring consistency.
+//
+//   - Atomic Queue Migration (Future Design for Incompatible Intra-Flow Policy Changes): When an intra-flow policy is
+//     updated to one that is incompatible with the existing queue data structure, the designed future behavior is a
+//     full "drain and re-enqueue" migration. This more disruptive operation is necessary to guarantee correctness. A
+//     simpler "graceful drain" (creating a second instance of the same flow in the same priority band) is not used
+//     because it would violate the system's "one flow instance per band" invariant. This invariant is critical because
+//     it ensures that inter-flow policies operate on a clean set of distinct flows, stateful intra-flow policies have
+//     a single authoritative view of their flow's state, and lookups are unambiguous. Note: this atomic migration is a
+//     future design consideration and is not implemented in the current version.
+//
+//   - Self-Balancing on Shard Scale-Up: When new shards are added via `UpdateShardCount`, the framework relies on the
+//     `FlowController`'s request distribution logic (e.g., a "Join the Shortest Queue by Bytes (JSQ-Bytes)" strategy)
+//     to naturally funnel *new* requests to the less-loaded shards. This design choice strategically avoids the
+//     complexity of actively migrating or rebalancing existing items that are already queued on other shards,
+//     promoting system stability during scaling events.
+type FlowRegistryAdmin interface {
+	// RegisterOrUpdateFlow handles the registration of a new flow or the update of an existing flow's specification.
+	// This method orchestrates complex state transitions atomically across all managed shards.
+	//
+	// # Dynamic Update Behaviors
+	//
+	//   - Priority Changes: If a flow's priority level changes, its current active `ManagedQueue` instance is marked
+	//     as inactive to drain existing requests. A new instance is activated at the new priority level. If a flow is
+	//     updated to a priority level where an instance is already draining (e.g., during a rapid rollback), that
+	//     draining instance is re-activated.
+	//
+	// # Returns
+	//
+	//   - nil on success.
+	//   - An error wrapping ErrFlowIDEmpty if spec.ID is empty.
+	//   - An error wrapping ErrPriorityBandNotFound if spec.Priority refers to an unconfigured priority level.
+	//   - Other errors if internal creation or activation of policy or queue instances fails.
+	RegisterOrUpdateFlow(spec types.FlowSpecification) error
+
+	// UpdateShardCount dynamically adjusts the number of internal state shards, triggering a state rebalance.
+	//
+	// # Dynamic Update Behaviors
+	//
+	//   - On Increase: New, empty state shards are initialized with all registered flows. The FlowController's request
+	//     distribution logic will naturally balance load to these new shards over time.
+	//   - On Decrease: A specified number of existing shards are marked as inactive. They stop accepting new requests
+	//     but continue to drain existing items. They are fully removed only after their queues are empty.
+	//
+	// The implementation MUST atomically re-partition capacity allocations across all active shards when the count
+	// changes.
+	UpdateShardCount(n uint) error
+
+	// Stats returns globally aggregated statistics for the entire FlowRegistry.
+	Stats() AggregateStats
+
+	// ShardStats returns a slice of statistics, one for each internal shard. This provides visibility for debugging
+	// and monitoring per-shard behavior (e.g., identifying hot or stuck shards).
+	ShardStats() []ShardStats
+}
+
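Invariant 4 and the UpdateShardCount contract together imply a uniform re-partitioning of capacity whenever the active shard count changes. A sketch of one way to do that arithmetic; the partitionCapacity helper is hypothetical and not part of the contract, though the sentinel error is:

// partitionCapacity splits a global byte limit uniformly across n active
// shards. The remainder goes to the first shards so the per-shard limits sum
// exactly to the global limit, never above it (invariant 4).
func partitionCapacity(globalBytes uint64, n uint) ([]uint64, error) {
	if n == 0 {
		return nil, contracts.ErrInvalidShardCount
	}
	per := globalBytes / uint64(n)
	rem := globalBytes % uint64(n)
	limits := make([]uint64, n)
	for i := range limits {
		limits[i] = per
		if uint64(i) < rem {
			limits[i]++
		}
	}
	return limits, nil
}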
+// ShardProvider defines a minimal interface for consumers that need to discover and iterate over available shards.
+//
+// A "shard" is an internal, parallel execution unit that allows the FlowController's core dispatch logic to be
+// parallelized. Consumers of this interface, such as a request distributor, MUST check `RegistryShard.IsActive()`
+// before routing new work to a shard, to ensure they do not send requests to a shard that is gracefully draining.
+type ShardProvider interface {
+	// Shards returns a slice of accessors, one for each internal state shard.
+	//
+	// Sharding the registry's state supports this parallelism by reducing lock contention, preventing a CPU
+	// bottleneck at high request rates.
+	//
+	// The returned slice includes accessors for both active and draining shards. Consumers MUST use IsActive() to
+	// determine whether new work should be routed to a shard. Callers should not modify the returned slice.
+	Shards() []RegistryShard
+}
+
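A sketch of the "Join the Shortest Queue by Bytes" routing the rationale above alludes to. The selectShard function is hypothetical, and it assumes the shard accessor exposes a Stats() ShardStats method, which this excerpt of RegistryShard does not show:

// selectShard routes new work to the least-loaded *active* shard by bytes.
func selectShard(p contracts.ShardProvider) contracts.RegistryShard {
	var best contracts.RegistryShard
	var bestBytes uint64
	for _, s := range p.Shards() {
		if !s.IsActive() { // contract: never route new work to a draining shard
			continue
		}
		// Assumed accessor: per-shard stats analogous to AggregateStats.
		if b := s.Stats().TotalByteSize; best == nil || b < bestBytes {
			best, bestBytes = s, b
		}
	}
	return best // nil if every shard is draining
}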
 // RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its
 // specific slice (shard) of the `FlowRegistry`'s state. It provides the necessary methods for a worker to perform its
 // dispatch operations by accessing queues and policies in a concurrent-safe manner.
@@ -100,6 +226,18 @@ type ManagedQueue interface {
 	FlowQueueAccessor() framework.FlowQueueAccessor
 }
 
+// AggregateStats holds globally aggregated statistics for the entire `FlowRegistry`.
+type AggregateStats struct {
+	// TotalCapacityBytes is the globally configured maximum total byte size limit across all priority bands and shards.
+	TotalCapacityBytes uint64
+	// TotalByteSize is the total byte size of all items currently queued across the entire system.
+	TotalByteSize uint64
+	// TotalLen is the total number of items currently queued across the entire system.
+	TotalLen uint64
+	// PerPriorityBandStats maps each configured priority level to its globally aggregated statistics.
+	PerPriorityBandStats map[uint]PriorityBandStats
+}
+
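A sketch of the relationship between the two stats types: every global figure is a sum of per-shard figures. The aggregate helper is hypothetical, and it assumes ShardStats carries TotalByteSize/TotalLen fields and PriorityBandStats carries a ByteSize field, which this diff elides but the AggregateStats fields strongly imply:

// aggregate folds per-shard statistics into the global view.
func aggregate(shards []ShardStats) AggregateStats {
	agg := AggregateStats{PerPriorityBandStats: make(map[uint]PriorityBandStats)}
	for _, s := range shards {
		// Invariant 4 lets capacities be summed: per-shard limits partition the global limit.
		agg.TotalCapacityBytes += s.TotalCapacityBytes
		agg.TotalByteSize += s.TotalByteSize
		agg.TotalLen += s.TotalLen
		for prio, band := range s.PerPriorityBandStats {
			cur := agg.PerPriorityBandStats[prio]
			cur.ByteSize += band.ByteSize
			cur.Len += band.Len
			agg.PerPriorityBandStats[prio] = cur
		}
	}
	return agg
}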
 // ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
 type ShardStats struct {
 	// TotalCapacityBytes is the optional, maximum total byte size limit aggregated across all priority bands within this
@@ -117,6 +255,7 @@ type ShardStats struct {
 	PerPriorityBandStats map[uint]PriorityBandStats
 }
 
+// DeepCopy returns a deep copy of the `ShardStats`.
 func (s *ShardStats) DeepCopy() ShardStats {
 	if s == nil {
@@ -150,6 +289,7 @@ type PriorityBandStats struct {
 	Len uint64
 }
 
+// DeepCopy returns a deep copy of the `PriorityBandStats`.
 func (s *PriorityBandStats) DeepCopy() PriorityBandStats {
 	if s == nil {
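Because both stats types carry a map, a correct DeepCopy must re-allocate the map rather than copy its header. The diff elides the method bodies, so the following is an inferred sketch of the shape such an implementation likely takes, not the committed code:

// DeepCopy sketch for ShardStats: scalar fields are copied by assignment, but
// PerPriorityBandStats is re-allocated so the copy does not alias the
// original map.
func (s *ShardStats) DeepCopy() ShardStats {
	if s == nil {
		return ShardStats{}
	}
	out := *s // copies the scalar fields
	if s.PerPriorityBandStats != nil {
		out.PerPriorityBandStats = make(map[uint]PriorityBandStats, len(s.PerPriorityBandStats))
		for k, v := range s.PerPriorityBandStats {
			out.PerPriorityBandStats[k] = v.DeepCopy()
		}
	}
	return out
}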
