Commit 6cf8f31

feat(flowcontrol): Implement registry shard (#1187)
This commit introduces the core operational layer of the Flow Registry's sharded architecture. It deliberately focuses on implementing the `registryShard` (the concurrent, high-performance data plane for a single worker) as opposed to the top-level `FlowRegistry` administrative control plane, which will be built upon this foundation.

The `registryShard` provides the concrete implementation of the `contracts.RegistryShard` port, giving each `FlowController` worker a safe, partitioned view of the system's state. This design is fundamental to achieving scalability by minimizing cross-worker contention on the hot path.

The key components are:

- **`registry.Config`**: The master configuration blueprint for the entire `FlowRegistry`. It is validated once and then partitioned, with each shard receiving its own slice of the configuration, notably for capacity limits.
- **`registry.registryShard`**: The operational heart of this commit. It manages the lifecycle of queues and policies within a single shard, providing the read-oriented access needed by a `FlowController` worker. It ensures concurrency safety through a combination of mutexes for structural changes and lock-free atomics for statistics.
- **`registry.managedQueue`**: A stateful decorator that wraps a raw `framework.SafeQueue`. Its two primary responsibilities are to enable the sharded model by providing atomic, upwardly-reconciled statistics, and to enforce lifecycle state (active vs. draining), which is essential for the graceful draining of flows during future administrative updates.
- **Contracts and Errors**: New sentinel errors are added to the `contracts` package to create a clear, stable API boundary between the registry and its consumers.

This work establishes the robust, scalable, and concurrent foundation upon which the top-level `FlowRegistry` administrative interface will be built.
1 parent 3a5d807 commit 6cf8f31
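
The `managedQueue` implementation itself is not shown in this excerpt. As a rough illustration of the two responsibilities the commit message gives it (rejecting new items while draining, and reconciling statistics upward with atomics), here is a minimal sketch. Every name in it (`sketchQueue`, `shardCounters`, `errDraining`) is hypothetical, and a plain slice stands in for `framework.SafeQueue`; it is not the commit's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errDraining is a hypothetical error for this sketch, not one of the commit's sentinels.
var errDraining = errors.New("queue is draining")

// shardCounters stands in for the shard-level statistics that queues reconcile into.
type shardCounters struct {
	totalLen      atomic.Int64
	totalByteSize atomic.Int64
}

// sketchQueue decorates a plain slice of item sizes (standing in for framework.SafeQueue).
type sketchQueue struct {
	mu       sync.Mutex  // guards structural changes to sizes
	sizes    []int64     // byte size of each queued item, oldest first
	draining atomic.Bool // lifecycle flag, readable without taking the mutex
	shard    *shardCounters
}

// Add rejects new items once the queue is draining; otherwise it enqueues the
// item and propagates the delta to the shard-level counters.
func (q *sketchQueue) Add(size int64) error {
	if q.draining.Load() {
		return errDraining
	}
	q.mu.Lock()
	q.sizes = append(q.sizes, size)
	q.mu.Unlock()
	q.shard.totalLen.Add(1)
	q.shard.totalByteSize.Add(size)
	return nil
}

// Remove pops the oldest item. It stays available while draining so that
// already-queued work can still be dispatched.
func (q *sketchQueue) Remove() (int64, bool) {
	q.mu.Lock()
	if len(q.sizes) == 0 {
		q.mu.Unlock()
		return 0, false
	}
	size := q.sizes[0]
	q.sizes = q.sizes[1:]
	q.mu.Unlock()
	q.shard.totalLen.Add(-1)
	q.shard.totalByteSize.Add(-size)
	return size, true
}

func main() {
	shard := &shardCounters{}
	q := &sketchQueue{shard: shard}
	_ = q.Add(10)
	_ = q.Add(5)
	q.draining.Store(true)
	fmt.Println(q.Add(1) != nil) // prints: true
	q.Remove()
	fmt.Println(shard.totalLen.Load(), shard.totalByteSize.Load()) // prints: 1 5
}
```

The sketch checks the draining flag before taking the mutex, so an `Add` racing with the flag flip could still slip through; a real implementation would have to decide how strict that boundary needs to be.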

9 files changed: +1498 -34 lines changed

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package contracts
+
+import "errors"
+
+// Registry Errors
+var (
+	// ErrFlowInstanceNotFound indicates that a requested flow instance (a `ManagedQueue`) does not exist in the registry
+	// shard, either because the flow is not registered or the specific instance (e.g., a draining queue at a particular
+	// priority) is not present.
+	ErrFlowInstanceNotFound = errors.New("flow instance not found")
+
+	// ErrPriorityBandNotFound indicates that a requested priority band does not exist in the registry because it was not
+	// part of the initial configuration.
+	ErrPriorityBandNotFound = errors.New("priority band not found")
+
+	// ErrPolicyQueueIncompatible indicates that a selected policy is not compatible with the capabilities of the queue it
+	// is intended to operate on. For example, a policy requiring priority-based peeking is used with a simple FIFO queue.
+	ErrPolicyQueueIncompatible = errors.New("policy is not compatible with queue capabilities")
+)
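
The interface documentation in this commit says methods return errors "wrapping" these sentinels, which suggests callers are expected to match them with `errors.Is` rather than by direct comparison. A minimal sketch of that pattern follows, assuming the sentinel is wrapped with `%w`; the sentinel is redeclared locally so the example is self-contained, and `lookupQueue` and `isFlowNotFound` are hypothetical helpers, not part of the commit.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for contracts.ErrFlowInstanceNotFound so the sketch compiles on its own.
var ErrFlowInstanceNotFound = errors.New("flow instance not found")

// lookupQueue is a hypothetical lookup that wraps the sentinel with call-site context.
func lookupQueue(flowID string, known map[string]bool) error {
	if !known[flowID] {
		return fmt.Errorf("failed to get active queue for flow %q: %w", flowID, ErrFlowInstanceNotFound)
	}
	return nil
}

// isFlowNotFound reports whether err is, or wraps, the sentinel.
func isFlowNotFound(err error) bool {
	return errors.Is(err, ErrFlowInstanceNotFound)
}

func main() {
	known := map[string]bool{"flow-a": true}
	err := lookupQueue("flow-b", known)
	if isFlowNotFound(err) {
		// The wrapped message keeps the context while the sentinel stays matchable.
		fmt.Println("not found:", err)
	}
}
```

Matching on the sentinel rather than on message text is what lets the registry change its error wording without breaking consumers.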

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 119 additions & 5 deletions
@@ -18,22 +18,78 @@ limitations under the License.
 // primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
 // interfaces represent the "ports" through which the engine communicates.
 //
-// This package contains the primary service contracts for the Flow Registry and Saturation Detector.
+// This package contains the primary service contracts for the Flow Registry, which acts as the control plane for all
+// flow state and configuration.
 package contracts
 
 import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
 )
 
+// RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its
+// specific slice (shard) of the `FlowRegistry`'s state. It provides the necessary methods for a worker to perform its
+// dispatch operations by accessing queues and policies in a concurrent-safe manner.
+//
+// # Conformance
+//
+// All methods MUST be goroutine-safe.
+type RegistryShard interface {
+	// ID returns a unique identifier for this shard, which must remain stable for the shard's lifetime.
+	ID() string
+
+	// IsActive returns true if the shard should accept new requests for enqueueing. A false value indicates the shard is
+	// being gracefully drained and should not be given new work.
+	IsActive() bool
+
+	// ActiveManagedQueue returns the currently active `ManagedQueue` for a given flow on this shard. This is the queue to
+	// which new requests for the flow should be enqueued.
+	// Returns an error wrapping `ErrFlowInstanceNotFound` if no active instance exists for the given `flowID`.
+	ActiveManagedQueue(flowID string) (ManagedQueue, error)
+
+	// ManagedQueue retrieves a specific (potentially draining) `ManagedQueue` instance from this shard. This allows a
+	// worker to continue dispatching items from queues that are draining as part of a flow update.
+	// Returns an error wrapping `ErrFlowInstanceNotFound` if no instance for the given flowID and priority exists.
+	ManagedQueue(flowID string, priority uint) (ManagedQueue, error)
+
+	// IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard.
+	// The registry guarantees that a non-nil default policy (as configured at the priority-band level) is returned if
+	// none is specified on the flow itself.
+	// Returns an error wrapping `ErrFlowInstanceNotFound` if the flow instance does not exist.
+	IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error)
+
+	// InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy` for this shard.
+	// The registry guarantees that a non-nil default policy is returned if none is configured for the band.
+	// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
+	InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error)
+
+	// PriorityBandAccessor retrieves a read-only accessor for a given priority level, providing a view of the band's
+	// state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that
+	// need to inspect and compare multiple flow queues within the same priority band.
+	// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
+	PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error)
+
+	// AllOrderedPriorityLevels returns all configured priority levels that this shard is aware of, sorted in ascending
+	// numerical order. This order corresponds to highest priority (lowest numeric value) to lowest priority (highest
+	// numeric value).
+	// The returned slice provides a definitive, ordered list of priority levels for iteration, for example, by a
+	// `controller.FlowController` worker's dispatch loop.
+	AllOrderedPriorityLevels() []uint
+
+	// Stats returns a snapshot of the statistics for this specific shard.
+	Stats() ShardStats
+}
+
 // ManagedQueue defines the interface for a flow's queue instance on a specific shard.
 // It wraps an underlying `framework.SafeQueue`, augmenting it with lifecycle validation against the `FlowRegistry` and
 // integrating atomic statistics updates.
 //
-// Conformance:
+// # Conformance
+//
 // - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
-// - Mutating methods (`Add()`, `Remove()`, `CleanupExpired()`, `Drain()`) MUST ensure the flow instance still exists
-//   and is valid within the `FlowRegistry` before proceeding. They MUST also atomically update relevant statistics
-//   (e.g., queue length, byte size) at both the queue and priority-band levels.
+// - The `Add()` method MUST reject new items if the queue has been marked as "draining" by the `FlowRegistry`,
+//   ensuring that lifecycle changes are respected even by consumers holding a stale pointer to the queue.
+// - All mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST atomically update relevant statistics
+//   (e.g., queue length, byte size).
 type ManagedQueue interface {
 	framework.SafeQueue
 
@@ -43,3 +99,61 @@ type ManagedQueue interface {
 	// Conformance: This method MUST NOT return nil.
 	FlowQueueAccessor() framework.FlowQueueAccessor
 }
+
+// ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
+type ShardStats struct {
+	// TotalCapacityBytes is the optional, maximum total byte size limit aggregated across all priority bands within this
+	// shard. Its value represents the globally configured limit for the `FlowRegistry` partitioned for this shard.
+	// The `controller.FlowController` enforces this limit in addition to any per-band capacity limits.
+	// A value of 0 signifies that this global limit is ignored, and only per-band limits apply.
+	TotalCapacityBytes uint64
+	// TotalByteSize is the total byte size of all items currently queued across all priority bands within this shard.
+	TotalByteSize uint64
+	// TotalLen is the total number of items currently queued across all priority bands within this shard.
+	TotalLen uint64
+	// PerPriorityBandStats maps each configured priority level to its statistics within this shard.
+	// The key is the numerical priority level.
+	// All configured priority levels are guaranteed to be represented.
+	PerPriorityBandStats map[uint]PriorityBandStats
+}
+
+// DeepCopy returns a deep copy of the `ShardStats`.
+func (s *ShardStats) DeepCopy() ShardStats {
+	if s == nil {
+		return ShardStats{}
+	}
+	newStats := *s
+	if s.PerPriorityBandStats != nil {
+		newStats.PerPriorityBandStats = make(map[uint]PriorityBandStats, len(s.PerPriorityBandStats))
+		for k, v := range s.PerPriorityBandStats {
+			newStats.PerPriorityBandStats[k] = v.DeepCopy()
+		}
+	}
+	return newStats
+}
+
+// PriorityBandStats holds aggregated statistics for a single priority band.
+type PriorityBandStats struct {
+	// Priority is the numerical priority level this struct describes.
+	Priority uint
+	// PriorityName is an optional, human-readable name for the priority level (e.g., "Critical", "Sheddable").
+	PriorityName string
+	// CapacityBytes is the configured maximum total byte size for this priority band, aggregated across all items in
+	// all flow queues within this band. If scoped to a shard, its value represents the configured band limit for the
+	// `FlowRegistry` partitioned for this shard.
+	// The `controller.FlowController` enforces this limit.
+	// A default non-zero value is guaranteed if not configured.
+	CapacityBytes uint64
+	// ByteSize is the total byte size of items currently queued in this priority band.
+	ByteSize uint64
+	// Len is the total number of items currently queued in this priority band.
+	Len uint64
+}
+
+// DeepCopy returns a deep copy of the `PriorityBandStats`.
+func (s *PriorityBandStats) DeepCopy() PriorityBandStats {
+	if s == nil {
+		return PriorityBandStats{}
+	}
+	return *s
+}
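
To see why `ShardStats.DeepCopy` rebuilds the `PerPriorityBandStats` map rather than relying on a plain struct assignment, the sketch below contrasts the two. The types are trimmed local copies of the ones in the diff (doc comments removed) so the example runs on its own; the `main` scenario is illustrative only.

```go
package main

import "fmt"

// Trimmed local copies of the stats types from the diff.
type PriorityBandStats struct {
	Priority      uint
	PriorityName  string
	CapacityBytes uint64
	ByteSize      uint64
	Len           uint64
}

func (s *PriorityBandStats) DeepCopy() PriorityBandStats {
	if s == nil {
		return PriorityBandStats{}
	}
	return *s
}

type ShardStats struct {
	TotalCapacityBytes   uint64
	TotalByteSize        uint64
	TotalLen             uint64
	PerPriorityBandStats map[uint]PriorityBandStats
}

func (s *ShardStats) DeepCopy() ShardStats {
	if s == nil {
		return ShardStats{}
	}
	newStats := *s
	if s.PerPriorityBandStats != nil {
		newStats.PerPriorityBandStats = make(map[uint]PriorityBandStats, len(s.PerPriorityBandStats))
		for k, v := range s.PerPriorityBandStats {
			newStats.PerPriorityBandStats[k] = v.DeepCopy()
		}
	}
	return newStats
}

func main() {
	live := ShardStats{
		TotalLen:             1,
		PerPriorityBandStats: map[uint]PriorityBandStats{0: {PriorityName: "Critical", Len: 1}},
	}

	shallow := live             // struct assignment: shares the underlying map
	snapshot := live.DeepCopy() // independent map

	// Simulate the live stats changing after both copies were taken.
	live.PerPriorityBandStats[0] = PriorityBandStats{PriorityName: "Critical", Len: 7}

	fmt.Println(shallow.PerPriorityBandStats[0].Len, snapshot.PerPriorityBandStats[0].Len) // prints: 7 1
}
```

Because `PriorityBandStats` holds only value fields, copying the struct is already a deep copy, which is why its `DeepCopy` can simply return `*s`.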
Lines changed: 204 additions & 0 deletions
@@ -0,0 +1,204 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package registry
+
+import (
+	"errors"
+	"fmt"
+
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
+	inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead"
+	intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
+)
+
+// Config holds the master configuration for the entire `FlowRegistry`. It serves as the top-level blueprint, defining
+// global capacity limits and the structure of its priority bands.
+//
+// This master configuration is validated and defaulted once at startup. It is then partitioned and distributed to each
+// internal `registryShard`, ensuring a consistent and predictable state across the system.
+type Config struct {
+	// MaxBytes defines an optional, global maximum total byte size limit aggregated across all priority bands and shards.
+	// The `controller.FlowController` enforces this limit in addition to per-band capacity limits.
+	//
+	// Optional: Defaults to 0, which signifies that the global limit is ignored.
+	MaxBytes uint64
+
+	// PriorityBands defines the set of priority bands managed by the `FlowRegistry`. The configuration for each band,
+	// including its default policies and queue types, is specified here.
+	//
+	// Required: At least one `PriorityBandConfig` must be provided for a functional registry.
+	PriorityBands []PriorityBandConfig
+}
+
+// partition calculates and returns a new `Config` with capacity values partitioned for a specific shard.
+// This method ensures that the total capacity is distributed as evenly as possible across all shards.
+func (c *Config) partition(shardIndex, totalShards int) (*Config, error) {
+	if totalShards <= 0 || shardIndex < 0 || shardIndex >= totalShards {
+		return nil, fmt.Errorf("invalid shard partitioning arguments: shardIndex=%d, totalShards=%d",
+			shardIndex, totalShards)
+	}
+
+	partitionValue := func(total uint64) uint64 {
+		if total == 0 {
+			return 0
+		}
+		base := total / uint64(totalShards)
+		remainder := total % uint64(totalShards)
+		if uint64(shardIndex) < remainder {
+			return base + 1
+		}
+		return base
+	}
+
+	newCfg := &Config{
+		MaxBytes:      partitionValue(c.MaxBytes),
+		PriorityBands: make([]PriorityBandConfig, len(c.PriorityBands)),
+	}
+
+	for i, band := range c.PriorityBands {
+		newBand := band // Copy the original config
+		newBand.MaxBytes = partitionValue(band.MaxBytes) // Overwrite with the partitioned value
+		newCfg.PriorityBands[i] = newBand
+	}
+
+	return newCfg, nil
+}
+
+// validateAndApplyDefaults checks the configuration for validity and populates any empty fields with system defaults.
+// This method should be called once by the registry before it initializes any shards.
+func (c *Config) validateAndApplyDefaults() error {
+	if len(c.PriorityBands) == 0 {
+		return errors.New("config validation failed: at least one priority band must be defined")
+	}
+
+	priorities := make(map[uint]struct{}) // Keep track of seen priorities
+
+	for i := range c.PriorityBands {
+		band := &c.PriorityBands[i]
+		if _, exists := priorities[band.Priority]; exists {
+			return fmt.Errorf("config validation failed: duplicate priority level %d found", band.Priority)
+		}
+		priorities[band.Priority] = struct{}{}
+
+		if band.PriorityName == "" {
+			return errors.New("config validation failed: PriorityName is required for all priority bands")
+		}
+		if band.IntraFlowDispatchPolicy == "" {
+			band.IntraFlowDispatchPolicy = fcfs.FCFSPolicyName
+		}
+		if band.InterFlowDispatchPolicy == "" {
+			band.InterFlowDispatchPolicy = besthead.BestHeadPolicyName
+		}
+		if band.Queue == "" {
+			band.Queue = listqueue.ListQueueName
+		}
+
+		// After defaulting, validate that the chosen plugins are compatible.
+		if err := validateBandCompatibility(*band); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// validateBandCompatibility verifies that a band's default policy is compatible with its default queue type.
+func validateBandCompatibility(band PriorityBandConfig) error {
+	policy, err := intra.NewPolicyFromName(band.IntraFlowDispatchPolicy)
+	if err != nil {
+		return fmt.Errorf("failed to validate policy %q for priority band %d: %w",
+			band.IntraFlowDispatchPolicy, band.Priority, err)
+	}
+
+	requiredCapabilities := policy.RequiredQueueCapabilities()
+	if len(requiredCapabilities) == 0 {
+		return nil // Policy has no specific requirements.
+	}
+
+	// Create a temporary queue instance to inspect its capabilities.
+	tempQueue, err := queue.NewQueueFromName(band.Queue, nil)
+	if err != nil {
+		return fmt.Errorf("failed to inspect queue type %q for priority band %d: %w", band.Queue, band.Priority, err)
+	}
+	queueCapabilities := tempQueue.Capabilities()
+
+	// Build a set of the queue's capabilities for efficient lookup.
+	capabilitySet := make(map[framework.QueueCapability]struct{}, len(queueCapabilities))
+	for _, cap := range queueCapabilities {
+		capabilitySet[cap] = struct{}{}
+	}
+
+	// Check if all required capabilities are present.
+	for _, req := range requiredCapabilities {
+		if _, ok := capabilitySet[req]; !ok {
+			return fmt.Errorf(
+				"policy %q is not compatible with queue %q for priority band %d (%s): missing capability %q: %w",
+				policy.Name(),
+				tempQueue.Name(),
+				band.Priority,
+				band.PriorityName,
+				req,
+				contracts.ErrPolicyQueueIncompatible,
+			)
+		}
+	}
+
+	return nil
+}
+
+// PriorityBandConfig defines the configuration for a single priority band within the `FlowRegistry`. It establishes the
+// default behaviors (such as queueing and dispatch policies) and capacity limits for all flows that operate at this
+// priority level.
+type PriorityBandConfig struct {
+	// Priority is the numerical priority level for this band.
+	// Convention: Lower numerical values indicate higher priority (e.g., 0 is highest).
+	//
+	// Required.
+	Priority uint
+
+	// PriorityName is a human-readable name for this priority band (e.g., "Critical", "Standard", "Sheddable").
+	//
+	// Required.
+	PriorityName string
+
+	// IntraFlowDispatchPolicy specifies the default name of the registered policy used to select a specific request to
+	// dispatch next from within a single flow's queue in this band. This default can be overridden on a per-flow basis.
+	//
+	// Optional: If empty, a system default (e.g., "FCFS") is used.
+	IntraFlowDispatchPolicy intra.RegisteredPolicyName
+
+	// InterFlowDispatchPolicy specifies the name of the registered policy used to select which flow's queue to service
+	// next from this band.
+	//
+	// Optional: If empty, a system default (e.g., "BestHead") is used.
+	InterFlowDispatchPolicy inter.RegisteredPolicyName
+
+	// Queue specifies the default name of the registered SafeQueue implementation to be used for flow queues within this
+	// band.
+	//
+	// Optional: If empty, a system default (e.g., "ListQueue") is used.
+	Queue queue.RegisteredQueueName
+
+	// MaxBytes defines the maximum total byte size for this specific priority band, aggregated across all shards.
+	//
+	// Optional: If not set, a system default (e.g., 1 GB) is applied.
+	MaxBytes uint64
+}
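
The arithmetic inside `Config.partition` can be tried in isolation. The sketch below lifts the `partitionValue` closure into a standalone function (the explicit `shardIndex` and `totalShards` parameters are an adaptation for this example, and the argument validation that `partition` performs is omitted) to show how the remainder is handed to the lowest-indexed shards so that the per-shard values always sum to the original total.

```go
package main

import "fmt"

// partitionValue mirrors the closure in Config.partition: every shard gets
// total/totalShards, and the first (total % totalShards) shards get one extra.
// The caller is assumed to have validated that 0 <= shardIndex < totalShards.
func partitionValue(total uint64, shardIndex, totalShards int) uint64 {
	if total == 0 {
		return 0
	}
	base := total / uint64(totalShards)
	remainder := total % uint64(totalShards)
	if uint64(shardIndex) < remainder {
		return base + 1
	}
	return base
}

func main() {
	const total, shards = 10, 3
	var sum uint64
	for i := 0; i < shards; i++ {
		share := partitionValue(total, i, shards)
		sum += share
		fmt.Printf("shard %d: %d bytes\n", i, share) // 4, then 3, then 3
	}
	fmt.Println("sum:", sum) // prints: sum: 10
}
```

One consequence that may be worth checking against the rest of the implementation: when a non-zero total is smaller than the shard count, some shards receive 0, and the `Config.MaxBytes` documentation describes 0 as "the global limit is ignored".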
