Skip to content

Commit 5870546

Browse files
LukeAVanDriekfswain
authored andcommitted
feat: Introduce pluggable inter-flow framework (kubernetes-sigs#1167)
This commit introduces the `InterFlowDispatchPolicy` framework, the third and final major component of the new pluggable flow control system. This framework decouples the logic for selecting which flow's queue to service next (fairness between flows) from the core controller. This completes the two-tier policy model, where the `InterFlowDispatchPolicy` makes the strategic decision about which flow to service, and the `IntraFlowDispatchPolicy` makes the tactical decision of which request to select from within that flow's queue. Key components include: - `framework.InterFlowDispatchPolicy`: The core interface that defines the contract for selecting a queue from a priority band. - `framework.PriorityBandAccessor`: A read-only interface that provides policies with safe access to the state of all queues within a priority level. - A factory and registration system for discovering and instantiating policy plugins by name. - A comprehensive functional conformance test suite to validate the contract for all policy plugins. - A `roundrobin` policy for fair, sequential queue selection. - A `besthead` policy for greedy, utilization-focused queue selection.
1 parent 9c9bc6f commit 5870546

File tree

12 files changed

+1166
-7
lines changed

12 files changed

+1166
-7
lines changed

pkg/epp/flowcontrol/framework/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,11 @@ var (
4242
// provided, valid `types.QueueItemHandle`. This can occur if the item was removed by a concurrent operation.
4343
ErrQueueItemNotFound = errors.New("queue item not found for the given handle")
4444
)
45+
46+
// Policy Errors
47+
var (
48+
// ErrIncompatiblePriorityType indicates that an `InterFlowDispatchPolicy` (like "BestHead") attempted to compare
49+
// items from two different flow queues whose `ItemComparator`s have different `ScoreType` values, making a
50+
// meaningful comparison impossible.
51+
ErrIncompatiblePriorityType = errors.New("incompatible priority score type for comparison")
52+
)

pkg/epp/flowcontrol/framework/mocks/mocks.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,42 @@ func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { return m
6363
func (m *MockFlowQueueAccessor) FlowSpec() types.FlowSpecification { return m.FlowSpecV }
6464

6565
var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{}
66+
67+
// MockPriorityBandAccessor is a mock implementation of the `framework.PriorityBandAccessor` interface.
68+
type MockPriorityBandAccessor struct {
69+
PriorityV uint
70+
PriorityNameV string
71+
FlowIDsV []string
72+
QueueV framework.FlowQueueAccessor // Value to return for any Queue(flowID) call
73+
QueueFuncV func(flowID string) framework.FlowQueueAccessor
74+
IterateQueuesV func(callback func(queue framework.FlowQueueAccessor) bool)
75+
}
76+
77+
func (m *MockPriorityBandAccessor) Priority() uint { return m.PriorityV }
78+
func (m *MockPriorityBandAccessor) PriorityName() string { return m.PriorityNameV }
79+
func (m *MockPriorityBandAccessor) FlowIDs() []string { return m.FlowIDsV }
80+
81+
func (m *MockPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAccessor {
82+
if m.QueueFuncV != nil {
83+
return m.QueueFuncV(flowID)
84+
}
85+
return m.QueueV
86+
}
87+
88+
func (m *MockPriorityBandAccessor) IterateQueues(callback func(queue framework.FlowQueueAccessor) bool) {
89+
if m.IterateQueuesV != nil {
90+
m.IterateQueuesV(callback)
91+
} else {
92+
// Default behavior: iterate based on FlowIDsV and QueueV/QueueFuncV
93+
for _, id := range m.FlowIDsV {
94+
q := m.Queue(id)
95+
if q != nil { // Only call callback if queue exists
96+
if !callback(q) {
97+
return
98+
}
99+
}
100+
}
101+
}
102+
}
103+
104+
var _ framework.PriorityBandAccessor = &MockPriorityBandAccessor{}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Flow Controller Inter-Flow Dispatch Policy Plugins
2+
3+
This directory contains concrete implementations of the [`framework.InterFlowDispatchPolicy`](../../../policies.go)
4+
interface. These policies are responsible for determining *which flow's queue* gets the next opportunity to dispatch a
5+
request to the scheduler.
6+
7+
## Overview
8+
9+
The `controller.FlowController` uses a two-tier policy system to manage requests. `framework.InterFlowDispatchPolicy`
10+
plugins represent the second tier, making strategic decisions about fairness *between* different logical flows (e.g.,
11+
between different tenants or models).
12+
13+
This contrasts with the `framework.IntraFlowDispatchPolicy`, which is responsible for **temporal scheduling**: deciding
14+
the order of requests *within* a single flow's queue after it has been selected by the inter-flow policy.
15+
16+
Key responsibilities and characteristics of a `framework.InterFlowDispatchPolicy`:
17+
18+
1. **Queue Selection (`SelectQueue`)**: The primary method, `SelectQueue(band framework.PriorityBandAccessor)`,
19+
inspects a set of queues within a single priority level (a "band") and decides which queue, if any, should be
20+
selected to dispatch a request from next.
21+
22+
2. **Fairness Across Flows**: The core purpose of this policy is to enforce a fairness doctrine across multiple
23+
competing flows. This could be simple round-robin, or more complex weighted fairness schemes.
24+
25+
3. **Stateless vs. Stateful**: Policies can be stateless (like `besthead`, which makes a decision based only on the
26+
current state of the queues) or stateful (like `roundrobin`, which needs to remember which queue it selected last).
27+
Any state must be managed in a goroutine-safe manner.
28+
29+
The `framework.InterFlowDispatchPolicy` is critical for multi-tenancy and preventing any single high-traffic flow from
30+
starving all others.
31+
32+
## Contributing a New `framework.InterFlowDispatchPolicy` Implementation
33+
34+
To contribute a new dispatch policy implementation, follow these steps:
35+
36+
1. **Define Your Implementation**
37+
- Create a new Go package in a subdirectory (e.g., `mycustompolicy/`).
38+
- Implement the `framework.InterFlowDispatchPolicy` interface.
39+
- Ensure all methods are goroutine-safe if your policy maintains any internal state.
40+
41+
2. **Register Your Policy**
42+
- In an `init()` function within your policy's Go file, call [`MustRegisterPolicy()`](./factory.go) with a unique
43+
name and a constructor function that matches the `PolicyConstructor` signature.
44+
45+
3. **Add to the Functional Test**
46+
- Add a blank import for your new package to [`functional_test.go`](./functional_test.go). Your policy will then
47+
be automatically included in the functional test suite, which validates the basic
48+
`framework.InterFlowDispatchPolicy` contract (e.g., correct initialization, handling of nil/empty bands).
49+
50+
4. **Add Policy-Specific Tests**
51+
- The functional test suite only validates the universal contract. You MUST add a separate `_test.go` file within
52+
your package to test the specific logic of your policy.
53+
- For example, your tests should validate that `SelectQueue()` correctly implements your desired selection logic
54+
(e.g., that round-robin correctly cycles through queues).
55+
56+
5. **Documentation**
57+
- Add a package-level GoDoc comment to your new policy's Go file, explaining its behavior and any trade-offs.
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package besthead provides a `framework.InterFlowDispatchPolicy` that selects the queue containing the single "best"
18+
// item from across all queues in a priority band.
19+
package besthead
20+
21+
import (
22+
"fmt"
23+
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
27+
)
28+
29+
// BestHeadPolicyName is the name of the Best Head policy implementation.
30+
const BestHeadPolicyName = "BestHead"
31+
32+
func init() {
33+
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(BestHeadPolicyName),
34+
func() (framework.InterFlowDispatchPolicy, error) {
35+
return newBestHead(), nil
36+
})
37+
}
38+
39+
type bestHead struct{}
40+
41+
func newBestHead() *bestHead {
42+
return &bestHead{}
43+
}
44+
45+
// Name returns the name of the policy.
46+
func (p *bestHead) Name() string {
47+
return BestHeadPolicyName
48+
}
49+
50+
// SelectQueue implements a greedy strategy that bypasses fairness concerns to select the queue containing the single
51+
// "best" item from across all queues in the priority band. It iterates through all non-empty queues, peeks at their
52+
// head items, and uses the `framework.ItemComparator` from each queue to find the highest-priority item overall.
53+
//
54+
// This policy is useful for maximizing utilization when fairness between flows is not a concern. It requires that all
55+
// queues being compared have a compatible `framework.ScoreType` to ensure the comparison is meaningful. If an
56+
// incompatible comparator is found, the selection fails with an error.
57+
func (p *bestHead) SelectQueue(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error) {
58+
if band == nil {
59+
return nil, nil
60+
}
61+
62+
var bestQueue framework.FlowQueueAccessor
63+
var bestItem types.QueueItemAccessor
64+
65+
var iterationErr error
66+
band.IterateQueues(func(queue framework.FlowQueueAccessor) (keepIterating bool) {
67+
if queue == nil || queue.Len() == 0 {
68+
return true
69+
}
70+
71+
item, err := queue.PeekHead()
72+
if err != nil || item == nil {
73+
return true
74+
}
75+
76+
if bestQueue == nil {
77+
bestQueue = queue
78+
bestItem = item
79+
return true
80+
}
81+
82+
if queue.Comparator().ScoreType() != bestQueue.Comparator().ScoreType() {
83+
iterationErr = fmt.Errorf("%w: expected %q, got %q", framework.ErrIncompatiblePriorityType,
84+
bestQueue.Comparator().ScoreType(), queue.Comparator().ScoreType())
85+
return false
86+
}
87+
88+
if bestQueue.Comparator().Func()(item, bestItem) {
89+
bestQueue = queue
90+
bestItem = item
91+
}
92+
return true
93+
})
94+
95+
if iterationErr != nil {
96+
return nil, iterationErr
97+
}
98+
return bestQueue, nil
99+
}

0 commit comments

Comments
 (0)