Skip to content

Commit 3211b34

Browse files
authored
feat: Adapt flow control to per-request saturation (#1622)
This commit refactors the flow control `ShardProcessor` to align with the new `SaturationDetector` contract (introduced in 7d84fb9), which evaluates saturation for a specific set of candidate pods rather than for the entire pool. This change fundamentally alters the dispatching logic to prioritize strict fairness and priority over work conservation. The `BandFilter` abstraction has been removed, and the `ShardProcessor` now performs a post-selection viability check. After policies select the fairest request, the `SaturationDetector` is called with the candidate pods for only that specific request. If the check fails, the processor stops the entire dispatch cycle for the current tick, enforcing Head-of-Line blocking to prevent priority inversion. This new model correctly upholds a strict fairness and priority contract. However, it introduces a known trade-off where the system may leave resources idle if the fairest request is blocked, rather than finding other viable work (the "noisy neighbor" problem).
1 parent f1e4bd3 commit 3211b34

File tree

10 files changed

+95
-463
lines changed

10 files changed

+95
-463
lines changed

pkg/epp/flowcontrol/contracts/mocks/mocks.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"fmt"
3535
"sync"
3636

37+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3738
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
3839
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
3940
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
@@ -112,12 +113,12 @@ func (m *MockRegistryShard) Stats() contracts.ShardStats {
112113

113114
// MockSaturationDetector is a simple "stub-style" mock for testing.
114115
type MockSaturationDetector struct {
115-
IsSaturatedFunc func(ctx context.Context) bool
116+
IsSaturatedFunc func(ctx context.Context, candidatePods []metrics.PodMetrics) bool
116117
}
117118

118-
func (m *MockSaturationDetector) IsSaturated(ctx context.Context) bool {
119+
func (m *MockSaturationDetector) IsSaturated(ctx context.Context, candidatePods []metrics.PodMetrics) bool {
119120
if m.IsSaturatedFunc != nil {
120-
return m.IsSaturatedFunc(ctx)
121+
return m.IsSaturatedFunc(ctx, candidatePods)
121122
}
122123
return false
123124
}

pkg/epp/flowcontrol/contracts/saturationdetector.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ limitations under the License.
1616

1717
package contracts
1818

19-
import "context"
19+
import (
20+
"context"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
23+
)
2024

2125
// SaturationDetector defines the contract for a component that provides real-time load signals to the
2226
// `controller.FlowController`.
@@ -32,8 +36,8 @@ import "context"
3236
//
3337
// Implementations MUST be goroutine-safe.
3438
type SaturationDetector interface {
35-
// IsSaturated returns true if the system's backend resources are considered saturated.
39+
// IsSaturated returns true if the system's backend resources are considered saturated for a set of candidate pods.
3640
// `controller.FlowController`'s dispatch workers call this method to decide whether to pause or throttle dispatch
3741
// operations to prevent overwhelming the backends.
38-
IsSaturated(ctx context.Context) bool
42+
IsSaturated(ctx context.Context, candidatePods []metrics.PodMetrics) bool
3943
}

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type shardProcessor interface {
5858
// This enables dependency injection for testing.
5959
type shardProcessorFactory func(
6060
shard contracts.RegistryShard,
61-
dispatchFilter internal.BandFilter,
61+
saturationDetector contracts.SaturationDetector,
6262
clock clock.Clock,
6363
expiryCleanupInterval time.Duration,
6464
enqueueChannelBufferSize int,
@@ -130,15 +130,15 @@ func NewFlowController(
130130
// Use the real shard processor implementation by default.
131131
fc.shardProcessorFactory = func(
132132
shard contracts.RegistryShard,
133-
dispatchFilter internal.BandFilter,
133+
saturationDetector contracts.SaturationDetector,
134134
clock clock.Clock,
135135
expiryCleanupInterval time.Duration,
136136
enqueueChannelBufferSize int,
137137
logger logr.Logger,
138138
) shardProcessor {
139139
return internal.NewShardProcessor(
140140
shard,
141-
dispatchFilter,
141+
saturationDetector,
142142
clock,
143143
expiryCleanupInterval,
144144
enqueueChannelBufferSize,
@@ -310,10 +310,9 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
310310

311311
// Construct a new worker, but do not start its processor goroutine yet.
312312
processorCtx, cancel := context.WithCancel(fc.parentCtx)
313-
dispatchFilter := internal.NewSaturationFilter(fc.saturationDetector)
314313
processor := fc.shardProcessorFactory(
315314
shard,
316-
dispatchFilter,
315+
fc.saturationDetector,
317316
fc.clock,
318317
fc.config.ExpiryCleanupInterval,
319318
fc.config.EnqueueChannelBufferSize,

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ type mockShardProcessorFactory struct {
209209

210210
func (f *mockShardProcessorFactory) new(
211211
shard contracts.RegistryShard,
212-
_ internal.BandFilter,
212+
_ contracts.SaturationDetector,
213213
_ clock.Clock,
214214
_ time.Duration,
215215
_ int,
@@ -640,7 +640,7 @@ func TestFlowController_Lifecycle(t *testing.T) {
640640
h := newUnitHarness(t, t.Context(), Config{}, &mockRegistryClient{})
641641
h.fc.shardProcessorFactory = func(
642642
shard contracts.RegistryShard,
643-
_ internal.BandFilter,
643+
_ contracts.SaturationDetector,
644644
_ clock.Clock,
645645
_ time.Duration,
646646
_ int,

pkg/epp/flowcontrol/controller/internal/filter.go

Lines changed: 0 additions & 144 deletions
This file was deleted.

0 commit comments

Comments
 (0)