Skip to content

Commit c9c0c0b

Browse files
LukeAVanDriekfswain
authored andcommitted
feat: Introduce ManagedQueue and Contracts (kubernetes-sigs#1174)
This commit introduces the foundational layer for the Flow Registry, which will act as the stateful control plane for managing queues and policies. It defines the `ManagedQueue` decorator, refines the system's core service contracts, and simplifies the queue plugin interface. The key changes are: 1. **`ports` -> `contracts` Rename**: The `ports` package is renamed to `contracts` to more accurately reflect the "Ports and Adapters" architectural pattern and avoid confusion with network ports. 2. **`ManagedQueue` Introduction**: A new `registry.managedQueue` component is introduced. It acts as a stateful decorator around a `framework.SafeQueue`, adding critical registry-level functionality: - **Atomic Statistics**: It maintains its own atomic `len` and `byteSize` counters. - **State Reconciliation**: It uses a callback to propagate statistical deltas to its parent (the future `RegistryShard`), ensuring aggregated stats remain consistent without locks. 3. **`SafeQueue` Interface Simplification**: The `framework.SafeQueue` interface is simplified. The `Add` and `Remove` methods no longer return the new queue state, as this responsibility is now correctly handled by the `ManagedQueue` decorator. This makes the `SafeQueue` contract cleaner and more focused on its core queuing logic.
1 parent 75e77ce commit c9c0c0b

File tree

18 files changed

+905
-144
lines changed

18 files changed

+905
-144
lines changed

pkg/epp/flowcontrol/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ graph LR
5959
6060
subgraph Internal Interactions
6161
direction LR
62-
D(Ports) -- "abstracts state" --> E(Flow Registry);
62+
D(Contracts) -- "abstracts state" --> E(Flow Registry);
6363
D -- "abstracts load" --> SD(Saturation Detector);
6464
E -- "configures" --> F(Framework);
6565
F -- "defines" --> P(Plugins: Queues & Policies);
@@ -107,10 +107,10 @@ their justifications, please refer to the detailed documentation within the rele
107107
concurrent-safe request storage. It uses a `QueueCapability` system that allows for diverse and extensible queue
108108
implementations (e.g., FIFO, Priority Heap) while maintaining a stable interface.
109109

110-
4. **The `FlowRegistry` (`./registry`, `./ports`)**: This is the stateful control plane of the system. It manages the
111-
configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the
110+
4. **The `FlowRegistry` (`./registry`, `./contracts`)**: This is the stateful control plane of the system. It manages
111+
the configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the
112112
`FlowController` workers to enable parallel operation with minimal lock contention.
113113

114-
5. **Core Types and Service Ports (`./types`, `./ports`)**: These packages define the foundational data structures
115-
(e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its dependencies,
116-
following a "Ports and Adapters" architectural style.
114+
5. **Core Types and Service Contracts (`./types`, `./contracts`)**: These packages define the foundational data
115+
structures (e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its
116+
dependencies, following a "Ports and Adapters" architectural style.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
18+
// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
19+
// interfaces represent the "ports" through which the engine communicates.
20+
//
21+
// This package contains the primary service contracts for the Flow Registry and Saturation Detector.
22+
package contracts
23+
24+
import (
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
26+
)
27+
28+
// ManagedQueue defines the interface for a flow's queue instance on a specific shard.
29+
// It wraps an underlying `framework.SafeQueue`, augmenting it with lifecycle validation against the `FlowRegistry` and
30+
// integrating atomic statistics updates.
31+
//
32+
// Conformance:
33+
// - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
34+
// - Mutating methods (`Add()`, `Remove()`, `CleanupExpired()`, `Drain()`) MUST ensure the flow instance still exists
35+
// and is valid within the `FlowRegistry` before proceeding. They MUST also atomically update relevant statistics
36+
// (e.g., queue length, byte size) at both the queue and priority-band levels.
37+
type ManagedQueue interface {
38+
framework.SafeQueue
39+
40+
// FlowQueueAccessor returns a read-only, flow-aware accessor for this queue.
41+
// This accessor is primarily used by policy plugins to inspect the queue's state in a structured way.
42+
//
43+
// Conformance: This method MUST NOT return nil.
44+
FlowQueueAccessor() framework.FlowQueueAccessor
45+
}

pkg/epp/flowcontrol/framework/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
// `SafeQueue` Errors
2424
//
2525
// These errors relate to operations directly on a `SafeQueue` implementation. They are returned by `SafeQueue` methods
26-
// and might be handled or wrapped by the `ports.FlowRegistry`'s `ports.ManagedQueue` or the
26+
// and might be handled or wrapped by the `contracts.FlowRegistry`'s `contracts.ManagedQueue` or the
2727
// `controller.FlowController`.
2828
var (
2929
// ErrNilQueueItem indicates that a nil `types.QueueItemAccessor` was passed to `SafeQueue.Add()`.

pkg/epp/flowcontrol/framework/mocks/mocks.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,84 @@ func (m *MockPriorityBandAccessor) IterateQueues(callback func(queue framework.F
102102
}
103103

104104
var _ framework.PriorityBandAccessor = &MockPriorityBandAccessor{}
105+
106+
// MockSafeQueue is a mock implementation of the `framework.SafeQueue` interface.
107+
type MockSafeQueue struct {
108+
NameV string
109+
CapabilitiesV []framework.QueueCapability
110+
LenV int
111+
ByteSizeV uint64
112+
PeekHeadV types.QueueItemAccessor
113+
PeekHeadErrV error
114+
PeekTailV types.QueueItemAccessor
115+
PeekTailErrV error
116+
AddFunc func(item types.QueueItemAccessor) error
117+
RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error)
118+
CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error)
119+
DrainFunc func() ([]types.QueueItemAccessor, error)
120+
}
121+
122+
func (m *MockSafeQueue) Name() string { return m.NameV }
123+
func (m *MockSafeQueue) Capabilities() []framework.QueueCapability { return m.CapabilitiesV }
124+
func (m *MockSafeQueue) Len() int { return m.LenV }
125+
func (m *MockSafeQueue) ByteSize() uint64 { return m.ByteSizeV }
126+
127+
func (m *MockSafeQueue) PeekHead() (types.QueueItemAccessor, error) {
128+
return m.PeekHeadV, m.PeekHeadErrV
129+
}
130+
131+
func (m *MockSafeQueue) PeekTail() (types.QueueItemAccessor, error) {
132+
return m.PeekTailV, m.PeekTailErrV
133+
}
134+
135+
func (m *MockSafeQueue) Add(item types.QueueItemAccessor) error {
136+
if m.AddFunc != nil {
137+
return m.AddFunc(item)
138+
}
139+
return nil
140+
}
141+
142+
func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) {
143+
if m.RemoveFunc != nil {
144+
return m.RemoveFunc(handle)
145+
}
146+
return nil, nil
147+
}
148+
149+
func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) {
150+
if m.CleanupFunc != nil {
151+
return m.CleanupFunc(predicate)
152+
}
153+
return nil, nil
154+
}
155+
156+
func (m *MockSafeQueue) Drain() ([]types.QueueItemAccessor, error) {
157+
if m.DrainFunc != nil {
158+
return m.DrainFunc()
159+
}
160+
return nil, nil
161+
}
162+
163+
var _ framework.SafeQueue = &MockSafeQueue{}
164+
165+
// MockIntraFlowDispatchPolicy is a mock implementation of the `framework.IntraFlowDispatchPolicy` interface.
166+
type MockIntraFlowDispatchPolicy struct {
167+
NameV string
168+
SelectItemV types.QueueItemAccessor
169+
SelectItemErrV error
170+
ComparatorV framework.ItemComparator
171+
RequiredQueueCapabilitiesV []framework.QueueCapability
172+
}
173+
174+
func (m *MockIntraFlowDispatchPolicy) Name() string { return m.NameV }
175+
func (m *MockIntraFlowDispatchPolicy) Comparator() framework.ItemComparator { return m.ComparatorV }
176+
177+
func (m *MockIntraFlowDispatchPolicy) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAccessor, error) {
178+
return m.SelectItemV, m.SelectItemErrV
179+
}
180+
181+
func (m *MockIntraFlowDispatchPolicy) RequiredQueueCapabilities() []framework.QueueCapability {
182+
return m.RequiredQueueCapabilitiesV
183+
}
184+
185+
var _ framework.IntraFlowDispatchPolicy = &MockIntraFlowDispatchPolicy{}

pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Key responsibilities and characteristics of a `framework.IntraFlowDispatchPolicy
2929
3. **Queue Compatibility (`RequiredQueueCapabilities`)**: The policy specifies the capabilities its associated
3030
[`framework.SafeQueue`](../../../queue.go) must support for it to function correctly. For example, a simple FCFS
3131
policy would require `framework.CapabilityFIFO`, while a more complex, priority-based policy would require
32-
`framework.CapabilityPriorityConfigurable`. The `ports.FlowRegistry` uses this information to pair policies with
32+
`framework.CapabilityPriorityConfigurable`. The `contracts.FlowRegistry` uses this information to pair policies with
3333
compatible queues.
3434

3535
The `framework.IntraFlowDispatchPolicy` allows for fine-grained control over how individual requests within a single flow are

pkg/epp/flowcontrol/framework/plugins/queue/README.md

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defines core, self-contained queue data structures used by the `controller.FlowC
66
## Overview
77

88
The `controller.FlowController` manages requests by organizing them into queues. Each logical "flow" within a given
9-
priority band has its own `ports.ManagedQueue` instance, which wraps a `framework.SafeQueue`. This design allows the
9+
priority band has its own `contracts.ManagedQueue` instance, which wraps a `framework.SafeQueue`. This design allows the
1010
`controller.FlowController` to apply policies at both the inter-flow (across different flows) and intra-flow (within a
1111
single flow's queue) levels.
1212

@@ -16,12 +16,12 @@ design allows for:
1616
- **Different Queuing Disciplines**: A basic FIFO queue ([`listqueue`](./listqueue/)) is provided, but other disciplines
1717
like priority queues ([`maxminheap`](./maxminheap/)) can be used for more complex ordering requirements.
1818
- **Specialized Capabilities**: Policies can declare `RequiredQueueCapabilities()` (e.g., `framework.CapabilityFIFO` or
19-
`framework.CapabilityPriorityConfigurable`). The `ports.FlowRegistry` pairs the policy with a queue that provides the
20-
necessary capabilities.
19+
`framework.CapabilityPriorityConfigurable`). The `contracts.FlowRegistry` pairs the policy with a queue that provides
20+
the necessary capabilities.
2121
- **Performance Optimization**: Different queue implementations offer varying performance characteristics, which can be
2222
compared using the centralized benchmark suite to select the best fit for a given workload.
2323

24-
## Contributing a New `SafeQueue` Implementation
24+
## Contributing a New `framework.SafeQueue` Implementation
2525

2626
To contribute a new queue implementation, follow these steps:
2727

@@ -73,30 +73,31 @@ The suite includes the following scenarios:
7373

7474
### Latest Results
7575

76-
*Last Updated: 2025-07-10*
76+
*Last Updated: Commit `35a9d6c`*
7777
*(CPU: AMD EPYC 7B12)*
7878

7979
| Benchmark | Implementation | Iterations | ns/op | B/op | allocs/op |
8080
| --------------------------- | -------------- | ---------- | ------- | ----- | --------- |
81-
| **AddRemove** | `ListQueue` | 1,889,844 | 609.0 | 224 | 5 |
82-
| | `MaxMinHeap` | 1,660,987 | 696.7 | 184 | 4 |
83-
| **AddPeekRemove** | `ListQueue` | 3,884,938 | 298.0 | 224 | 5 |
84-
| | `MaxMinHeap` | 1,857,448 | 615.9 | 184 | 4 |
85-
| **AddPeekTailRemove** | `ListQueue` | 3,576,487 | 308.4 | 224 | 5 |
86-
| | `MaxMinHeap` | 2,113,134 | 535.3 | 184 | 4 |
87-
| **BulkAddThenBulkRemove** | `ListQueue` | 24,032 | 49,861 | 24801 | 698 |
88-
| | `MaxMinHeap` | 10,000 | 108,868 | 20787 | 597 |
89-
| **HighContention** | `ListQueue` | 484,574 | 2,328 | 896 | 20 |
90-
| | `MaxMinHeap` | 84,806 | 18,679 | 783 | 16 |
81+
| **AddRemove** | `ListQueue` | 1,906,153 | 595.5 | 224 | 5 |
82+
| | `MaxMinHeap` | 1,763,473 | 668.9 | 184 | 4 |
83+
| **AddPeekRemove** | `ListQueue` | 3,547,653 | 298.5 | 224 | 5 |
84+
| | `MaxMinHeap` | 1,986,780 | 751.5 | 184 | 4 |
85+
| **AddPeekTailRemove** | `ListQueue` | 3,732,302 | 303.3 | 224 | 5 |
86+
| | `MaxMinHeap` | 2,006,383 | 551.6 | 184 | 4 |
87+
| **BulkAddThenBulkRemove** | `ListQueue` | 24,046 | 47,240 | 24800 | 698 |
88+
| | `MaxMinHeap` | 9,410 | 110,929 | 20786 | 597 |
89+
| **HighContention** | `ListQueue` | 21,283,537 | 47.53 | 11 | 0 |
90+
| | `MaxMinHeap` | 16,953,121 | 74.09 | 4 | 0 |
9191

9292
### Interpretation of Results
9393

9494
The benchmark results highlight the trade-offs between the different queue implementations based on their underlying
9595
data structures:
9696

9797
- **`ListQueue`**: As a linked list, it excels in scenarios involving frequent additions or removals from either end of
98-
the queue (`AddPeekRemove`, `AddPeekTailRemove`), which are O(1) operations. Its performance is less competitive in high-contention and bulk scenarios, which reflects the necessary per-item memory allocation and pointer manipulation
99-
overhead.
98+
the queue (`AddPeekRemove`, `AddPeekTailRemove`), which are O(1) operations. The `HighContention` benchmark shows that
99+
its simple, low-overhead operations are also extremely performant for consumer throughput even under heavy concurrent
100+
load.
100101
- **`MaxMinHeap`**: As a slice-based heap, it has a lower allocation overhead per operation, making it efficient for
101102
high-throughput `AddRemove` cycles. Peeking and removing items involves maintaining the heap property, which has an
102103
O(log n) cost, making individual peek operations slower than `ListQueue`.

0 commit comments

Comments
 (0)