Skip to content

Commit 4ffb5f6

Browse files
authored
Introduce pluggable intra-flow dispatch framework (#1139)
This commit introduces the `IntraFlowDispatchPolicy` framework, the second major component of the new pluggable flow control system. This framework decouples the logic for selecting a request from within a single flow's queue (temporal scheduling) from the underlying queue data structure. Key components include: - `framework.IntraFlowDispatchPolicy`: The core interface that defines the contract for selecting an item from a flow's queue. - `framework.FlowQueueAccessor`: A read-only interface that provides policies with safe access to queue state. - `RequiredQueueCapabilities`: A mechanism for policies to declare their queue requirements (e.g., FIFO, priority-ordered), which are validated by the registry. - A factory and registration system for discovering and instantiating policy plugins by name. - A comprehensive conformance test suite to validate the contract for all policy plugins. - A foundational `FCFS` (First-Come, First-Served) policy as the first reference implementation. This work builds directly on the `SafeQueue` framework, enabling the development of sophisticated, policy-driven request prioritization and scheduling.
1 parent 9a5491f commit 4ffb5f6

File tree

12 files changed

+559
-28
lines changed

12 files changed

+559
-28
lines changed

pkg/epp/flowcontrol/framework/doc.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@ limitations under the License.
2020
// to. By building on these interfaces, the Flow Control system can be extended and customized without modifying the
2121
// core controller logic.
2222
//
23-
// The primary interfaces defined here are:
24-
// - `SafeQueue`: The contract for concurrent-safe queue implementations.
25-
// - `ItemComparator`: The contract for policy-driven logic that defines the relative priority of items within a
23+
// The primary contracts are:
24+
// - `SafeQueue`: An interface for concurrent-safe queue implementations.
25+
// - `IntraFlowDispatchPolicy`: An interface for policies that decide which item to select from within a single flow's
2626
// queue.
27+
// - `ItemComparator`: An interface vended by policies to make their internal item-ordering logic explicit and
28+
// available to other components.
29+
//
30+
// These components are linked by `QueueCapability`, which allows policies to declare their queue requirements (e.g.,
31+
// FIFO or priority-based ordering).
2732
package framework

pkg/epp/flowcontrol/framework/mocks/mocks.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package mocks
1818

1919
import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2122
)
2223

2324
// MockItemComparator provides a mock implementation of the `framework.ItemComparator` interface.
@@ -30,3 +31,35 @@ func (m *MockItemComparator) Func() framework.ItemComparatorFunc { return m.Func
3031
func (m *MockItemComparator) ScoreType() string { return m.ScoreTypeV }
3132

3233
var _ framework.ItemComparator = &MockItemComparator{}
34+
35+
// MockFlowQueueAccessor is a mock implementation of the `framework.FlowQueueAccessor` interface.
36+
type MockFlowQueueAccessor struct {
37+
NameV string
38+
CapabilitiesV []framework.QueueCapability
39+
LenV int
40+
ByteSizeV uint64
41+
PeekHeadV types.QueueItemAccessor
42+
PeekHeadErrV error
43+
PeekTailV types.QueueItemAccessor
44+
PeekTailErrV error
45+
FlowSpecV types.FlowSpecification
46+
ComparatorV framework.ItemComparator
47+
}
48+
49+
func (m *MockFlowQueueAccessor) Name() string { return m.NameV }
50+
func (m *MockFlowQueueAccessor) Capabilities() []framework.QueueCapability { return m.CapabilitiesV }
51+
func (m *MockFlowQueueAccessor) Len() int { return m.LenV }
52+
func (m *MockFlowQueueAccessor) ByteSize() uint64 { return m.ByteSizeV }
53+
54+
func (m *MockFlowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) {
55+
return m.PeekHeadV, m.PeekHeadErrV
56+
}
57+
58+
func (m *MockFlowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) {
59+
return m.PeekTailV, m.PeekTailErrV
60+
}
61+
62+
func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { return m.ComparatorV }
63+
func (m *MockFlowQueueAccessor) FlowSpec() types.FlowSpecification { return m.FlowSpecV }
64+
65+
var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Flow Controller Intra-Flow Dispatch Policy Plugins
2+
3+
This directory contains concrete implementations of the [`framework.IntraFlowDispatchPolicy`](../../../policies.go)
4+
interface. These policies are responsible for **temporal scheduling**: determining the order in which requests are
5+
selected for dispatch *from within a single flow's queue*.
6+
7+
## Overview
8+
9+
The `controller.FlowController` uses a two-tier policy system to manage requests. `framework.IntraFlowDispatchPolicy`
10+
plugins represent the first tier, making tactical decisions about the ordering of requests *within* a single logical
11+
flow (e.g., for a specific model or tenant).
12+
13+
This contrasts with the `framework.InterFlowDispatchPolicy` (not yet implemented), which is responsible for
14+
**spatial fairness**: deciding *which flow's queue* gets the next opportunity to dispatch a request. The
15+
`framework.IntraFlowDispatchPolicy` only operates *after* the inter-flow policy has selected a specific queue.
16+
17+
Key responsibilities and characteristics of a `framework.IntraFlowDispatchPolicy`:
18+
19+
1. **Request Selection (`SelectItem`)**: The primary method, `SelectItem(queue framework.FlowQueueAccessor)`, inspects
20+
the given flow's queue (via a read-only accessor) and decides which item, if any, should be dispatched next from
21+
*that specific queue*.
22+
23+
2. **Priority Definition (`ItemComparator`)**:
24+
- This policy type is unique because it defines the nature of priority for items *within its specific managed
25+
queue*. It makes this logic explicit by vending a [`framework.ItemComparator`](../../../policies.go).
26+
- The vended comparator defines the "less than" relationship between two items and exposes a `ScoreType()` string
27+
(e.g., `"enqueue_time_ns_asc"`, `"slo_deadline_urgency"`) that gives a semantic meaning to the comparison.
28+
29+
3. **Queue Compatibility (`RequiredQueueCapabilities`)**: The policy specifies the capabilities its associated
30+
[`framework.SafeQueue`](../../../queue.go) must support for it to function correctly. For example, a simple FCFS
31+
policy would require `framework.CapabilityFIFO`, while a more complex, priority-based policy would require
32+
`framework.CapabilityPriorityConfigurable`. The `ports.FlowRegistry` uses this information to pair policies with
33+
compatible queues.
34+
35+
The `framework.IntraFlowDispatchPolicy` allows for fine-grained control over how individual requests within a single flow are
36+
serviced, enabling strategies like basic FCFS or more advanced schemes based on SLOs or deadlines.
37+
38+
## Contributing a New `framework.IntraFlowDispatchPolicy` Implementation
39+
40+
To contribute a new dispatch policy implementation, follow these steps:
41+
42+
1. **Define Your Implementation**
43+
- Create a new Go package in a subdirectory (e.g., `mycustompolicy/`).
44+
- Implement the `framework.IntraFlowDispatchPolicy` interface.
45+
- Ensure all methods are goroutine-safe if your policy maintains any internal state.
46+
47+
2. **Register Your Policy**
48+
- In an `init()` function within your policy's Go file, call [`MustRegisterPolicy()`](./factory.go) with a
49+
unique name and a constructor function that matches the `PolicyConstructor` signature.
50+
51+
3. **Add to the Functional Test**
52+
- Add a blank import for your new package to [`functional_test.go`](./functional_test.go). Your policy will then
53+
be automatically included in the functional test suite, which validates the basic
54+
`framework.IntraFlowDispatchPolicy` contract (e.g., correct initialization, handling of nil/empty queues).
55+
56+
4. **Add Policy-Specific Tests**
57+
- The functional test suite only validates the universal contract. You MUST add a separate `_test.go` file within
58+
your package to test the specific logic of your policy.
59+
- For example, your tests should validate that your `Comparator()` works as expected and that `SelectItem()`
60+
correctly implements your desired selection logic for a non-empty queue.
61+
62+
5. **Documentation**
63+
- Add a package-level GoDoc comment to your new policy's Go file, explaining its behavior and any trade-offs.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package dispatch provides the factory and registration mechanism for all `framework.IntraFlowDispatchPolicy`
18+
// implementations.
19+
// It allows new policies to be added to the system and instantiated by name.
20+
package dispatch
21+
22+
import (
23+
"fmt"
24+
"sync"
25+
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
27+
)
28+
29+
// RegisteredPolicyName is the unique name under which a policy is registered.
30+
type RegisteredPolicyName string
31+
32+
// PolicyConstructor defines the function signature for creating a `framework.IntraFlowDispatchPolicy`.
33+
type PolicyConstructor func() (framework.IntraFlowDispatchPolicy, error)
34+
35+
var (
36+
// mu guards the registration map.
37+
mu sync.RWMutex
38+
// RegisteredPolicies stores the constructors for all registered policies.
39+
RegisteredPolicies = make(map[RegisteredPolicyName]PolicyConstructor)
40+
)
41+
42+
// MustRegisterPolicy registers a policy constructor, and panics if the name is already registered.
43+
// This is intended to be called from the `init()` function of a policy implementation.
44+
func MustRegisterPolicy(name RegisteredPolicyName, constructor PolicyConstructor) {
45+
mu.Lock()
46+
defer mu.Unlock()
47+
if _, ok := RegisteredPolicies[name]; ok {
48+
panic(fmt.Sprintf("IntraFlowDispatchPolicy already registered with name %q", name))
49+
}
50+
RegisteredPolicies[name] = constructor
51+
}
52+
53+
// NewPolicyFromName creates a new `IntraFlowDispatchPolicy` given its registered name.
54+
// This is called by the `registry.FlowRegistry` when configuring a flow.
55+
func NewPolicyFromName(name RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
56+
mu.RLock()
57+
defer mu.RUnlock()
58+
constructor, ok := RegisteredPolicies[name]
59+
if !ok {
60+
return nil, fmt.Errorf("no IntraFlowDispatchPolicy registered with name %q", name)
61+
}
62+
return constructor()
63+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package fcfs provides a First-Come, First-Served implementation of the `framework.IntraFlowDispatchPolicy`.
18+
package fcfs
19+
20+
import (
21+
"errors"
22+
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
26+
)
27+
28+
// FCFSPolicyName is the name of the FCFS policy implementation.
29+
const FCFSPolicyName = "FCFS"
30+
31+
func init() {
32+
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(FCFSPolicyName),
33+
func() (framework.IntraFlowDispatchPolicy, error) {
34+
return newFCFS(), nil
35+
})
36+
}
37+
38+
// fcfs (First-Come, First-Served) implements the `framework.IntraFlowDispatchPolicy` interface.
39+
type fcfs struct {
40+
comparator framework.ItemComparator
41+
}
42+
43+
// newFCFS creates a new `fcfs` policy instance.
44+
func newFCFS() *fcfs {
45+
return &fcfs{
46+
comparator: &enqueueTimeComparator{},
47+
}
48+
}
49+
50+
// Name returns the name of the policy.
51+
func (p *fcfs) Name() string {
52+
return FCFSPolicyName
53+
}
54+
55+
// SelectItem selects the next item from the queue by peeking its head. This implementation relies on the queue being
56+
// ordered by dispatch preference, as indicated by its `RequiredQueueCapabilities`.
57+
func (p *fcfs) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAccessor, error) {
58+
if queue == nil {
59+
return nil, nil
60+
}
61+
item, err := queue.PeekHead()
62+
if errors.Is(err, framework.ErrQueueEmpty) {
63+
return nil, nil
64+
}
65+
return item, err
66+
}
67+
68+
// Comparator returns a `framework.ItemComparator` based on enqueue time.
69+
func (p *fcfs) Comparator() framework.ItemComparator {
70+
return p.comparator
71+
}
72+
73+
// RequiredQueueCapabilities specifies that this policy needs a queue that supports FIFO operations.
74+
func (p *fcfs) RequiredQueueCapabilities() []framework.QueueCapability {
75+
return []framework.QueueCapability{framework.CapabilityFIFO}
76+
}
77+
78+
// --- enqueueTimeComparator ---
79+
80+
// enqueueTimeComparator implements `framework.ItemComparator` for FCFS logic.
81+
// It prioritizes items with earlier enqueue times.
82+
type enqueueTimeComparator struct{}
83+
84+
// Func returns the comparison logic.
85+
// It returns true if item 'a' should be dispatched before item 'b'.
86+
func (c *enqueueTimeComparator) Func() framework.ItemComparatorFunc {
87+
return func(a, b types.QueueItemAccessor) bool {
88+
if a == nil && b == nil {
89+
return false
90+
}
91+
if a == nil { // Treat nil as lowest priority
92+
return false
93+
}
94+
if b == nil { // Treat non-nil 'a' as higher priority than nil 'b'
95+
return true
96+
}
97+
return a.EnqueueTime().Before(b.EnqueueTime())
98+
}
99+
}
100+
101+
// ScoreType returns a string descriptor for the comparison logic.
102+
func (c *enqueueTimeComparator) ScoreType() string {
103+
return string(framework.EnqueueTimePriorityScoreType)
104+
}

0 commit comments

Comments
 (0)