@@ -14,23 +14,107 @@ See the License for the specific language governing permissions and
1414limitations under the License.
1515*/
1616
17- // Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
18- // primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
19- // interfaces represent the "ports" through which the engine communicates.
20- //
21- // This package contains the primary service contracts for the Flow Registry, which acts as the control plane for all
22- // flow state and configuration.
2317package contracts
2418
2519import (
2620 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
2721 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2822)
2923
30- // RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its
31- // specific slice (shard) of the `FlowRegistry's` state. It provides a concurrent-safe view of all flow instances, which
32- // are uniquely identified by their composite `types.FlowKey`. It is the primary contract for performing dispatch
33- // operations.
24+ // FlowRegistry is the complete interface for the global control plane. An implementation of this interface is the single
25+ // source of truth for all flow control state and configuration.
26+ //
27+ // # Conformance
28+ //
29+ // All methods MUST be goroutine-safe. Implementations are expected to perform complex updates (e.g.,
30+ // `RegisterOrUpdateFlow`) atomically.
31+ //
32+ // # System Invariants
33+ //
34+ // Concrete implementations MUST uphold the following invariants:
35+ // 1. Shard Consistency: All configured priority bands and registered flow instances must exist on every Active shard.
36+ // Plugin instance types must be consistent for a given flow across all shards.
37+ // 2. Flow Instance Uniqueness: Each unique `types.FlowKey` (`ID` + `Priority`) corresponds to exactly one managed flow
38+ // instance.
39+ // 3. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active
40+ // shards.
41+ //
42+ // # Flow Lifecycle
43+ //
44+ // A flow instance (identified by its immutable `FlowKey`) has a simple lifecycle:
45+ //
46+ // - Registered: Known to the `FlowRegistry` via `RegisterOrUpdateFlow`.
47+ // - Idle: Queues are empty across all Active and Draining shards.
48+ // - Garbage Collected (Unregistered): The registry automatically garbage collects flows after they have remained Idle
49+ // for a configurable duration.
50+ //
51+ // # Shard Lifecycle
52+ //
53+ // When a shard is decommissioned, it is marked inactive (`IsActive() == false`) to prevent new enqueues. The shard
54+ // continues to drain and is deleted only after it is empty.
55+ type FlowRegistry interface {
56+ FlowRegistryAdmin
57+ ShardProvider
58+ }
59+
60+ // FlowRegistryAdmin defines the administrative interface for the global control plane.
61+ //
62+ // # Dynamic Update Strategies
63+ //
64+ // The contract specifies behaviors for handling dynamic updates, prioritizing stability and correctness:
65+ //
66+ // - Immutable Flow Identity (`types.FlowKey`): The system treats the `FlowKey` (`ID` + `Priority`) as the immutable
67+ // identifier. Changing the priority of traffic requires registering a new `FlowKey`. The old flow instance is
68+ // automatically garbage collected when Idle. This design eliminates complex priority migration logic.
69+ //
70+ // - Graceful Draining (Shard Scale-Down): Decommissioned shards enter a Draining state. They stop accepting new
71+ // requests but continue to be processed for dispatch until empty.
72+ //
73+ // - Self-Balancing (Shard Scale-Up): When new shards are added, the `controller.FlowController`'s distribution logic
74+ // naturally utilizes them, funneling new requests to the less-loaded shards. Existing queued items are not
75+ // migrated.
76+ type FlowRegistryAdmin interface {
77+ // RegisterOrUpdateFlow handles the registration of a new flow instance or the update of an existing instance's
78+ // specification (for the same `types.FlowKey`). The operation is atomic across all shards.
79+ //
80+ // Since the `FlowKey` (including `Priority`) is immutable, this method cannot change a flow's priority.
81+ // To change priority, the caller should simply register the new `FlowKey`; the old flow instance will be
82+ // automatically garbage collected when it becomes Idle.
83+ //
84+ // Returns errors wrapping `ErrFlowIDEmpty`, `ErrPriorityBandNotFound`, or internal errors if plugin instantiation
85+ // fails.
86+ RegisterOrUpdateFlow (spec types.FlowSpecification ) error
87+
88+ // UpdateShardCount dynamically adjusts the number of internal state shards.
89+ //
90+ // The implementation MUST atomically re-partition capacity allocations across all active shards.
91+ // Returns an error wrapping `ErrInvalidShardCount` if `n` is not positive.
92+ UpdateShardCount (n int ) error
93+
94+ // Stats returns globally aggregated statistics for the entire `FlowRegistry`.
95+ Stats () AggregateStats
96+
97+ // ShardStats returns a slice of statistics, one for each internal shard. This provides visibility for debugging and
98+ // monitoring per-shard behavior (e.g., identifying hot or stuck shards).
99+ ShardStats () []ShardStats
100+ }
101+
102+ // ShardProvider defines the interface for discovering available shards.
103+ //
104+ // A "shard" is an internal, parallel execution unit that allows the `controller.FlowController`'s core dispatch logic
105+ // to be parallelized, preventing a CPU bottleneck at high request rates. The `FlowRegistry`'s state is sharded to
106+ // support this parallelism by reducing lock contention.
107+ //
108+ // Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard to avoid sending requests to a
109+ // Draining shard.
110+ type ShardProvider interface {
111+ // Shards returns a slice of accessors, one for each internal state shard (Active and Draining).
112+ // Callers should not modify the returned slice.
113+ Shards () []RegistryShard
114+ }
115+
116+ // RegistryShard defines the interface for accessing a specific slice (shard) of the `FlowRegistry's` state.
117+ // It provides a concurrent-safe view for `controller.FlowController` workers.
34118//
35119// # Conformance
36120//
@@ -46,12 +130,12 @@ type RegistryShard interface {
46130 // ManagedQueue retrieves the managed queue for the given, unique `types.FlowKey`. This is the primary method for
47131 // accessing a specific flow's queue for either enqueueing or dispatching requests.
48132 //
49- // Returns an error wrapping `ErrPriorityBandNotFound` if the priority specified in the key is not configured, or
133+ // Returns an error wrapping `ErrPriorityBandNotFound` if the priority specified in the ` key` is not configured, or
50134 // `ErrFlowInstanceNotFound` if no instance exists for the given `key`.
51135 ManagedQueue (key types.FlowKey ) (ManagedQueue , error )
52136
53137 // IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard,
54- // identified by its unique `FlowKey`.
138+ // identified by its unique `types. FlowKey`.
55139 // The registry guarantees that a non-nil default policy (as configured at the priority-band level) is returned if
56140 // none is specified for the flow.
57141 // Returns an error wrapping `ErrFlowInstanceNotFound` if the flow instance does not exist.
@@ -63,8 +147,8 @@ type RegistryShard interface {
63147 InterFlowDispatchPolicy (priority uint ) (framework.InterFlowDispatchPolicy , error )
64148
65149 // PriorityBandAccessor retrieves a read-only accessor for a given priority level, providing a view of the band's
66- // state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that
67- // need to inspect and compare multiple flow queues within the same priority band.
150+ // state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that need to
151+ // inspect and compare multiple flow queues within the same priority band.
68152 // Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
69153 PriorityBandAccessor (priority uint ) (framework.PriorityBandAccessor , error )
70154
@@ -80,24 +164,33 @@ type RegistryShard interface {
80164}
81165
82166// ManagedQueue defines the interface for a flow's queue instance on a specific shard.
83- // It wraps an underlying `framework.SafeQueue`, augmenting it with lifecycle validation against the `FlowRegistry` and
84- // integrating atomic statistics updates.
167+ // It acts as a stateful decorator around an underlying `framework.SafeQueue`.
85168//
86169// # Conformance
87170//
88- // - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
89- // - All mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST atomically update relevant statistics
90- // (e.g., queue length, byte size) .
171+ // - All methods MUST be goroutine-safe.
172+ // - All mutating methods (`Add()`, `Remove()`, etc.) MUST ensure that the underlying queue state and the statistics
173+ // (`Len`, `ByteSize`) are updated atomically relative to each other .
91174type ManagedQueue interface {
92175 framework.SafeQueue
93176
94- // FlowQueueAccessor returns a read-only, flow-aware accessor for this queue.
95- // This accessor is primarily used by policy plugins to inspect the queue's state in a structured way.
96- //
177+ // FlowQueueAccessor returns a read-only, flow-aware accessor for this queue, used by policy plugins.
97178 // Conformance: This method MUST NOT return nil.
98179 FlowQueueAccessor () framework.FlowQueueAccessor
99180}
100181
182+ // AggregateStats holds globally aggregated statistics for the entire `FlowRegistry`.
183+ type AggregateStats struct {
184+ // TotalCapacityBytes is the globally configured maximum total byte size limit across all priority bands and shards.
185+ TotalCapacityBytes uint64
186+ // TotalByteSize is the total byte size of all items currently queued across the entire system.
187+ TotalByteSize uint64
188+ // TotalLen is the total number of items currently queued across the entire system.
189+ TotalLen uint64
190+ // PerPriorityBandStats maps each configured priority level to its globally aggregated statistics.
191+ PerPriorityBandStats map [uint ]PriorityBandStats
192+ }
193+
101194// ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
102195type ShardStats struct {
103196 // TotalCapacityBytes is the optional, maximum total byte size limit aggregated across all priority bands within this
@@ -110,35 +203,22 @@ type ShardStats struct {
110203 // TotalLen is the total number of items currently queued across all priority bands within this shard.
111204 TotalLen uint64
112205 // PerPriorityBandStats maps each configured priority level to its statistics within this shard.
206+ // The capacity values within represent this shard's partition of the global band capacity.
113207 // The key is the numerical priority level.
114208 // All configured priority levels are guaranteed to be represented.
115209 PerPriorityBandStats map [uint ]PriorityBandStats
116210}
117211
118- // DeepCopy returns a deep copy of the `ShardStats`.
119- func (s * ShardStats ) DeepCopy () ShardStats {
120- if s == nil {
121- return ShardStats {}
122- }
123- newStats := * s
124- if s .PerPriorityBandStats != nil {
125- newStats .PerPriorityBandStats = make (map [uint ]PriorityBandStats , len (s .PerPriorityBandStats ))
126- for k , v := range s .PerPriorityBandStats {
127- newStats .PerPriorityBandStats [k ] = v .DeepCopy ()
128- }
129- }
130- return newStats
131- }
132-
133212// PriorityBandStats holds aggregated statistics for a single priority band.
134213type PriorityBandStats struct {
135214 // Priority is the numerical priority level this struct describes.
136215 Priority uint
137- // PriorityName is an optional, human-readable name for the priority level (e.g., "Critical", "Sheddable").
216+ // PriorityName is a human-readable name for the priority band (e.g., "Critical", "Sheddable").
217+ // The registry configuration requires this field, so it is guaranteed to be non-empty.
138218 PriorityName string
139- // CapacityBytes is the configured maximum total byte size for this priority band, aggregated across all items in
140- // all flow queues within this band. If scoped to a shard, its value represents the configured band limit for the
141- // `FlowRegistry` partitioned for this shard.
219+ // CapacityBytes is the configured maximum total byte size for this priority band.
220+ // When viewed via `AggregateStats`, this is the global limit. When viewed via `ShardStats`, this is the partitioned
221+ // value for that specific shard.
142222 // The `controller.FlowController` enforces this limit.
143223 // A default non-zero value is guaranteed if not configured.
144224 CapacityBytes uint64
@@ -147,11 +227,3 @@ type PriorityBandStats struct {
147227 // Len is the total number of items currently queued in this priority band.
148228 Len uint64
149229}
150-
151- // DeepCopy returns a deep copy of the `PriorityBandStats`.
152- func (s * PriorityBandStats ) DeepCopy () PriorityBandStats {
153- if s == nil {
154- return PriorityBandStats {}
155- }
156- return * s
157- }
0 commit comments