Commit e6617bf

LukeAVanDrie authored and kfswain committed
feat(flowcontrol): Implement ShardProcessor engine (kubernetes-sigs#1203)
Introduces the `ShardProcessor`, the core data plane worker for the `FlowController`. Each processor is paired with a `RegistryShard` and is responsible for all request lifecycle operations on that shard, including enqueueing, dispatching, and background cleanup.

The processor's design is centered on correctness and performance in a highly concurrent environment. It uses a single-goroutine ownership model for its main `Run` loop, which serializes all queue write operations (enqueues) through a channel. This makes the critical "check-then-act" capacity logic safe without complex locking. The `flowItem` uses a `sync.Once` to guarantee idempotent finalization, deterministically resolving the race between dispatch and expiry.

Resilience is achieved through a two-tiered error handling strategy that distinguishes between inter-flow and intra-flow policy failures. This isolates faults, prevents tight-loop error conditions, and ensures a failure in one priority band does not halt progress in others.

To simplify policy logic, this change also introduces the `BandFilter` abstraction. This acts as a pre-policy gate, decoupling the logic of *viability* (e.g., is a flow backpressured?) from the logic of *selection* (e.g., which flow is next?).

A comprehensive, high-fidelity test suite with stateful mocks is included to reliably verify the processor's complex concurrent behavior, race conditions, and failure modes.
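The idempotent finalization mentioned in the commit message can be illustrated with a minimal sketch. This is not the commit's `flowItem`; the `item` type, the `outcome` labels, and `raceOnce` are hypothetical names used only to show how a `sync.Once` lets exactly one of two racing callers (dispatch or expiry) decide the final state.

```go
package main

import (
	"fmt"
	"sync"
)

// outcome is a hypothetical label for how an item was finalized.
type outcome string

// item is a hypothetical stand-in for the commit's flowItem.
type item struct {
	once   sync.Once
	result outcome
	done   chan struct{}
}

func newItem() *item { return &item{done: make(chan struct{})} }

// finalize records the first outcome it receives and ignores later calls.
// Closing done inside once.Do also guarantees the channel is closed only once.
func (it *item) finalize(o outcome) {
	it.once.Do(func() {
		it.result = o
		close(it.done)
	})
}

// raceOnce lets a "dispatch" and an "expiry" goroutine race to finalize one item
// and returns whichever outcome won.
func raceOnce() outcome {
	it := newItem()
	var wg sync.WaitGroup
	for _, o := range []outcome{"dispatched", "expired"} {
		wg.Add(1)
		go func(o outcome) {
			defer wg.Done()
			it.finalize(o)
		}(o)
	}
	wg.Wait()
	<-it.done // already closed; shown to illustrate how a waiter would block
	return it.result
}

func main() {
	fmt.Println("final outcome:", raceOnce())
}
```

Either outcome may win on a given run; the point is that the loser becomes a no-op rather than overwriting the result or closing the channel a second time.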
1 parent 076920d commit e6617bf

10 files changed: +3088 -0 lines changed
Lines changed: 291 additions & 0 deletions
@@ -0,0 +1,291 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package mocks provides mocks for the interfaces defined in the `contracts` package.
//
// # Testing Philosophy: High-Fidelity Mocks
//
// The components that consume these contracts, particularly the `controller.ShardProcessor`, are complex, concurrent
// orchestrators. Testing them reliably requires more than simple stubs. It requires high-fidelity mocks that allow for
// the deterministic simulation of race conditions and specific failure modes.
//
// For this reason, mocks like `MockManagedQueue` are deliberately stateful and thread-safe. They provide a reliable,
// in-memory simulation of the real component's behavior, while also providing function-based overrides
// (e.g., `AddFunc`) that allow tests to inject specific errors or pause execution at critical moments. This strategy is
// essential for creating the robust, non-flaky tests needed to verify the correctness of the system's concurrent logic.
// For a more detailed defense of this strategy, see the comment at the top of `controller/internal/processor_test.go`.
package mocks

import (
	"context"
	"fmt"
	"sync"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
	typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
)

// MockRegistryShard is a simple "stub-style" mock for testing.
// Its methods are implemented as function fields (e.g., `IDFunc`). A test can inject behavior by setting the desired
// function field in the test setup. If a func is nil, the method will return a zero value.
type MockRegistryShard struct {
	IDFunc                       func() string
	IsActiveFunc                 func() bool
	ActiveManagedQueueFunc       func(flowID string) (contracts.ManagedQueue, error)
	ManagedQueueFunc             func(flowID string, priority uint) (contracts.ManagedQueue, error)
	IntraFlowDispatchPolicyFunc  func(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error)
	InterFlowDispatchPolicyFunc  func(priority uint) (framework.InterFlowDispatchPolicy, error)
	PriorityBandAccessorFunc     func(priority uint) (framework.PriorityBandAccessor, error)
	AllOrderedPriorityLevelsFunc func() []uint
	StatsFunc                    func() contracts.ShardStats
}

func (m *MockRegistryShard) ID() string {
	if m.IDFunc != nil {
		return m.IDFunc()
	}
	return ""
}

func (m *MockRegistryShard) IsActive() bool {
	if m.IsActiveFunc != nil {
		return m.IsActiveFunc()
	}
	return false
}

func (m *MockRegistryShard) ActiveManagedQueue(flowID string) (contracts.ManagedQueue, error) {
	if m.ActiveManagedQueueFunc != nil {
		return m.ActiveManagedQueueFunc(flowID)
	}
	return nil, nil
}

func (m *MockRegistryShard) ManagedQueue(flowID string, priority uint) (contracts.ManagedQueue, error) {
	if m.ManagedQueueFunc != nil {
		return m.ManagedQueueFunc(flowID, priority)
	}
	return nil, nil
}

func (m *MockRegistryShard) IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error) {
	if m.IntraFlowDispatchPolicyFunc != nil {
		return m.IntraFlowDispatchPolicyFunc(flowID, priority)
	}
	return nil, nil
}

func (m *MockRegistryShard) InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error) {
	if m.InterFlowDispatchPolicyFunc != nil {
		return m.InterFlowDispatchPolicyFunc(priority)
	}
	return nil, nil
}

func (m *MockRegistryShard) PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error) {
	if m.PriorityBandAccessorFunc != nil {
		return m.PriorityBandAccessorFunc(priority)
	}
	return nil, nil
}

func (m *MockRegistryShard) AllOrderedPriorityLevels() []uint {
	if m.AllOrderedPriorityLevelsFunc != nil {
		return m.AllOrderedPriorityLevelsFunc()
	}
	return nil
}

func (m *MockRegistryShard) Stats() contracts.ShardStats {
	if m.StatsFunc != nil {
		return m.StatsFunc()
	}
	return contracts.ShardStats{}
}

// MockSaturationDetector is a simple "stub-style" mock for testing.
type MockSaturationDetector struct {
	IsSaturatedFunc func(ctx context.Context) bool
}

func (m *MockSaturationDetector) IsSaturated(ctx context.Context) bool {
	if m.IsSaturatedFunc != nil {
		return m.IsSaturatedFunc(ctx)
	}
	return false
}

// MockManagedQueue is a high-fidelity, thread-safe mock of the `contracts.ManagedQueue` interface, designed
// specifically for testing the concurrent `controller/internal.ShardProcessor`.
//
// This mock is essential for creating deterministic and focused unit tests. It allows for precise control over queue
// behavior and enables the testing of critical edge cases (e.g., empty queues, dispatch failures) in complete
// isolation, which would be difficult and unreliable to achieve with the concrete `registry.managedQueue`
// implementation.
//
// ### Design Philosophy
//
// 1. **Stateful**: The mock maintains an internal map of items to accurately reflect a real queue's state. Its `Len()`
//    and `ByteSize()` methods are derived directly from this state.
// 2. **Deadlock-Safe Overrides**: Test-specific logic (e.g., `AddFunc`) is executed instead of the default
//    implementation. The override function is fully responsible for its own logic and synchronization, as the mock's
//    internal mutex will *not* be held during its execution.
// 3. **Self-Wiring**: The `FlowQueueAccessor()` method returns the mock itself, ensuring the accessor is always
//    correctly connected to the queue's state without manual wiring in tests.
type MockManagedQueue struct {
	// FlowSpecV defines the flow specification for this mock queue. It should be set by the test.
	FlowSpecV types.FlowSpecification

	// AddFunc allows a test to completely override the default Add behavior.
	AddFunc func(item types.QueueItemAccessor) error
	// RemoveFunc allows a test to completely override the default Remove behavior.
	RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error)
	// CleanupFunc allows a test to completely override the default Cleanup behavior.
	CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error)
	// DrainFunc allows a test to completely override the default Drain behavior.
	DrainFunc func() ([]types.QueueItemAccessor, error)

	// mu protects access to the internal `items` map.
	mu       sync.Mutex
	initOnce sync.Once
	items    map[types.QueueItemHandle]types.QueueItemAccessor
}

func (m *MockManagedQueue) init() {
	m.initOnce.Do(func() {
		m.items = make(map[types.QueueItemHandle]types.QueueItemAccessor)
	})
}

// FlowQueueAccessor returns the mock itself, as it fully implements the `framework.FlowQueueAccessor` interface.
func (m *MockManagedQueue) FlowQueueAccessor() framework.FlowQueueAccessor {
	return m
}

// Add adds an item to the queue.
// It checks for a test override before locking. If no override is present, it executes the default stateful logic,
// which includes fulfilling the `SafeQueue.Add` contract.
func (m *MockManagedQueue) Add(item types.QueueItemAccessor) error {
	// If an override is provided, it is responsible for the full contract, including setting the handle.
	if m.AddFunc != nil {
		return m.AddFunc(item)
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	m.init()

	// Fulfill the `SafeQueue.Add` contract: the queue is responsible for setting the handle.
	if item.Handle() == nil {
		item.SetHandle(&typesmocks.MockQueueItemHandle{})
	}

	m.items[item.Handle()] = item
	return nil
}

// Remove removes an item from the queue. It checks for a test override before locking.
func (m *MockManagedQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) {
	if m.RemoveFunc != nil {
		return m.RemoveFunc(handle)
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.init()
	item, ok := m.items[handle]
	if !ok {
		return nil, fmt.Errorf("item with handle %v not found", handle)
	}
	delete(m.items, handle)
	return item, nil
}

// Cleanup removes items matching a predicate. It checks for a test override before locking.
func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) {
	if m.CleanupFunc != nil {
		return m.CleanupFunc(predicate)
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.init()
	var removed []types.QueueItemAccessor
	for handle, item := range m.items {
		if predicate(item) {
			removed = append(removed, item)
			delete(m.items, handle)
		}
	}
	return removed, nil
}

// Drain removes all items from the queue. It checks for a test override before locking.
func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) {
	if m.DrainFunc != nil {
		return m.DrainFunc()
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.init()
	drained := make([]types.QueueItemAccessor, 0, len(m.items))
	for _, item := range m.items {
		drained = append(drained, item)
	}
	m.items = make(map[types.QueueItemHandle]types.QueueItemAccessor)
	return drained, nil
}

func (m *MockManagedQueue) FlowSpec() types.FlowSpecification         { return m.FlowSpecV }
func (m *MockManagedQueue) Name() string                              { return "" }
func (m *MockManagedQueue) Capabilities() []framework.QueueCapability { return nil }
func (m *MockManagedQueue) Comparator() framework.ItemComparator      { return nil }

// Len returns the actual number of items currently in the mock queue.
func (m *MockManagedQueue) Len() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.init()
	return len(m.items)
}

// ByteSize returns the actual total byte size of all items in the mock queue.
func (m *MockManagedQueue) ByteSize() uint64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.init()
	var size uint64
	for _, item := range m.items {
		size += item.OriginalRequest().ByteSize()
	}
	return size
}

// PeekHead returns the first item found in the mock queue. Note: map iteration order is not guaranteed.
func (m *MockManagedQueue) PeekHead() (types.QueueItemAccessor, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.init()
	for _, item := range m.items {
		return item, nil // Return first item found
	}
	return nil, nil // Queue is empty
}

// PeekTail is not implemented for this mock.
func (m *MockManagedQueue) PeekTail() (types.QueueItemAccessor, error) {
	return nil, nil
}
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package contracts

import "context"

// SaturationDetector defines the contract for a component that provides real-time load signals to the
// `controller.FlowController`.
//
// This interface abstracts away the complexity of determining system load. An implementation would consume various
// backend metrics (e.g., queue depths, KV cache utilization, observed latencies) and translate them into a simple
// boolean signal.
//
// This decoupling is important because it allows the saturation detection logic to evolve independently of the core
// `controller.FlowController` engine, which is only concerned with the final true/false signal.
//
// # Conformance
//
// Implementations MUST be goroutine-safe.
type SaturationDetector interface {
	// IsSaturated returns true if the system's backend resources are considered saturated.
	// `controller.FlowController`'s dispatch workers call this method to decide whether to pause or throttle dispatch
	// operations to prevent overwhelming the backends.
	IsSaturated(ctx context.Context) bool
}
