Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/epp/flowcontrol/contracts/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"fmt"
"sync"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
Expand Down Expand Up @@ -112,12 +113,12 @@ func (m *MockRegistryShard) Stats() contracts.ShardStats {

// MockSaturationDetector is a simple "stub-style" mock for testing.
type MockSaturationDetector struct {
IsSaturatedFunc func(ctx context.Context) bool
IsSaturatedFunc func(ctx context.Context, candidatePods []metrics.PodMetrics) bool
}

func (m *MockSaturationDetector) IsSaturated(ctx context.Context) bool {
func (m *MockSaturationDetector) IsSaturated(ctx context.Context, candidatePods []metrics.PodMetrics) bool {
if m.IsSaturatedFunc != nil {
return m.IsSaturatedFunc(ctx)
return m.IsSaturatedFunc(ctx, candidatePods)
}
return false
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/epp/flowcontrol/contracts/saturationdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ limitations under the License.

package contracts

import "context"
import (
"context"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

// SaturationDetector defines the contract for a component that provides real-time load signals to the
// `controller.FlowController`.
Expand All @@ -32,8 +36,8 @@ import "context"
//
// Implementations MUST be goroutine-safe.
type SaturationDetector interface {
// IsSaturated returns true if the system's backend resources are considered saturated.
// IsSaturated returns true if the system's backend resources are considered saturated for a set of candidate pods.
// `controller.FlowController`'s dispatch workers call this method to decide whether to pause or throttle dispatch
// operations to prevent overwhelming the backends.
IsSaturated(ctx context.Context) bool
IsSaturated(ctx context.Context, candidatePods []metrics.PodMetrics) bool
}
9 changes: 4 additions & 5 deletions pkg/epp/flowcontrol/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type shardProcessor interface {
// This enables dependency injection for testing.
type shardProcessorFactory func(
shard contracts.RegistryShard,
dispatchFilter internal.BandFilter,
saturationDetector contracts.SaturationDetector,
clock clock.Clock,
expiryCleanupInterval time.Duration,
enqueueChannelBufferSize int,
Expand Down Expand Up @@ -130,15 +130,15 @@ func NewFlowController(
// Use the real shard processor implementation by default.
fc.shardProcessorFactory = func(
shard contracts.RegistryShard,
dispatchFilter internal.BandFilter,
saturationDetector contracts.SaturationDetector,
clock clock.Clock,
expiryCleanupInterval time.Duration,
enqueueChannelBufferSize int,
logger logr.Logger,
) shardProcessor {
return internal.NewShardProcessor(
shard,
dispatchFilter,
saturationDetector,
clock,
expiryCleanupInterval,
enqueueChannelBufferSize,
Expand Down Expand Up @@ -310,10 +310,9 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag

// Construct a new worker, but do not start its processor goroutine yet.
processorCtx, cancel := context.WithCancel(fc.parentCtx)
dispatchFilter := internal.NewSaturationFilter(fc.saturationDetector)
processor := fc.shardProcessorFactory(
shard,
dispatchFilter,
fc.saturationDetector,
fc.clock,
fc.config.ExpiryCleanupInterval,
fc.config.EnqueueChannelBufferSize,
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ type mockShardProcessorFactory struct {

func (f *mockShardProcessorFactory) new(
shard contracts.RegistryShard,
_ internal.BandFilter,
_ contracts.SaturationDetector,
_ clock.Clock,
_ time.Duration,
_ int,
Expand Down Expand Up @@ -640,7 +640,7 @@ func TestFlowController_Lifecycle(t *testing.T) {
h := newUnitHarness(t, t.Context(), Config{}, &mockRegistryClient{})
h.fc.shardProcessorFactory = func(
shard contracts.RegistryShard,
_ internal.BandFilter,
_ contracts.SaturationDetector,
_ clock.Clock,
_ time.Duration,
_ int,
Expand Down
144 changes: 0 additions & 144 deletions pkg/epp/flowcontrol/controller/internal/filter.go

This file was deleted.

Loading