diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index d8e21c416..765f1d5c9 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -331,7 +331,7 @@ func (r *Runner) Run(ctx context.Context) error { scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig) - saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, setupLog) + saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog) director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 3ba28183a..b640e53c0 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -30,10 +30,10 @@ import ( "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/log" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" @@ -43,6 +43,13 @@ import ( requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" ) +// Datastore defines the interface required by the Director. +type Datastore interface { + PoolGet() (*v1.InferencePool, error) + ObjectiveGet(modelName string) *v1alpha2.InferenceObjective + PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics +} + // Scheduler defines the interface required by the Director for scheduling. type Scheduler interface { Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error) @@ -50,11 +57,11 @@ type Scheduler interface { // SaturationDetector provides a signal indicating whether the backends are considered saturated. type SaturationDetector interface { - IsSaturated(ctx context.Context) bool + IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool } // NewDirectorWithConfig creates a new Director instance with all dependencies. -func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director { +func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director { return &Director{ datastore: datastore, scheduler: scheduler, @@ -66,24 +73,19 @@ func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, s // Director orchestrates the request handling flow, including scheduling. type Director struct { - datastore datastore.Datastore + datastore Datastore scheduler Scheduler saturationDetector SaturationDetector preRequestPlugins []PreRequest postResponsePlugins []PostResponse } -// HandleRequest orchestrates the request lifecycle: -// 1. Parses request details. -// 2. Calls admitRequest for admission control. -// 3. Calls Scheduler.Schedule if request is approved. -// 4. Calls prepareRequest to populate RequestContext with result and call PreRequest plugins. -// +// HandleRequest orchestrates the request lifecycle. 
// It always returns the requestContext even in the error case, as the request context is used in error handling. func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { logger := log.FromContext(ctx) - // --- 1. Parse Request, Resolve Target Models, and Determine Parameters --- + // Parse Request, Resolve Target Models, and Determine Parameters requestBodyMap := reqCtx.Request.Body var ok bool reqCtx.IncomingModelName, ok = requestBodyMap["model"].(string) @@ -134,22 +136,23 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo ctx = log.IntoContext(ctx, logger) logger.V(logutil.DEBUG).Info("LLM request assembled") - // --- 2. Admission Control check -- - if err := d.admitRequest(ctx, *infObjective.Spec.Criticality, reqCtx.FairnessID); err != nil { - return reqCtx, err - } - - // --- 3. Call Scheduler (with the relevant candidate pods) --- + // Get candidate pods for scheduling candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata) if len(candidatePods) == 0 { return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"} } - result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods) + + // Admission Control check + if err := d.admitRequest(ctx, candidatePods, *infObjective.Spec.Criticality, reqCtx.FairnessID); err != nil { + return reqCtx, err + } + + result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, d.toSchedulerPodMetrics(candidatePods)) if err != nil { return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()} } - // --- 4. Prepare Request (Populates RequestContext and call PreRequest plugins) --- + // Prepare Request (Populates RequestContext and call PreRequest plugins) // Insert target endpoint to instruct Envoy to route requests to the specified target pod and attach the port number. // Invoke PreRequest registered plugins. reqCtx, err = d.prepareRequest(ctx, reqCtx, result) @@ -161,8 +164,9 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo } // admitRequest handles admission control to decide whether or not to accept the request -// based on the request criticality and system saturation state. -func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2.Criticality, fairnessID string) error { +// based on the request criticality and the saturation state of the candidate pods. +func (d *Director) admitRequest(ctx context.Context, candidatePods []backendmetrics.PodMetrics, + requestCriticality v1alpha2.Criticality, fairnessID string) error { logger := log.FromContext(ctx) logger.V(logutil.TRACE).Info("Entering Flow Control", "criticality", requestCriticality, "fairnessID", fairnessID) @@ -173,7 +177,7 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2 } logger.V(logutil.DEBUG).Info("Performing saturation check for non-critical request.") - if d.saturationDetector.IsSaturated(ctx) { // Assuming non-nil Saturation Detector + if d.saturationDetector.IsSaturated(ctx, candidatePods) { return errutil.Error{ Code: errutil.InferencePoolResourceExhausted, Msg: "system saturated, non-critical request dropped", @@ -189,21 +193,21 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2 // Snapshot pod metrics from the datastore to: // 1. Reduce concurrent access to the datastore. 
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles. -func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []schedulingtypes.Pod { +func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any) if !found { - return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate)) + return d.datastore.PodList(backendmetrics.AllPodPredicate) } // Check if endpoint key is present in the subset map and ensure there is at least one value endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any) if !found { - return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate)) + return d.datastore.PodList(backendmetrics.AllPodPredicate) } else if len(endpointSubsetList) == 0 { loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") - return []schedulingtypes.Pod{} + return []backendmetrics.PodMetrics{} } // Create a map of endpoint addresses for easy lookup @@ -226,7 +230,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList)) - return d.toSchedulerPodMetrics(podFitleredList) + return podFitleredList } // prepareRequest populates the RequestContext and calls the registered PreRequest plugins diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 3a98b28a3..2b5cd1f88 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client/fake" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" @@ -54,7 +55,7 @@ type mockSaturationDetector struct { isSaturated bool } -func (m *mockSaturationDetector) IsSaturated(_ context.Context) bool { +func (m *mockSaturationDetector) IsSaturated(_ context.Context, _ []backendmetrics.PodMetrics) bool { return m.isSaturated } @@ -67,6 +68,23 @@ func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMReques return m.scheduleResults, m.scheduleErr } +type mockDatastore struct { + pods []backendmetrics.PodMetrics +} + +func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error) { return nil, nil } +func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { return nil } +func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics { + res := []backendmetrics.PodMetrics{} + for _, pod := range ds.pods { + if predicate(pod) { + res = append(res, pod) + } + } + + return res +} + func TestDirector_HandleRequest(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) @@ -442,119 +460,72 @@ func TestGetCandidatePodsForScheduling(t *testing.T) { } } - testInput := []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - }, - Status: corev1.PodStatus{ - PodIP: "10.0.0.1", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pod2", - }, - Status: corev1.PodStatus{ - PodIP: "10.0.0.2", - }, - }, - } - - outputPod1 := &backend.Pod{ + 
pod1 := &backend.Pod{ NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "10.0.0.1", Labels: map[string]string{}, } - outputPod2 := &backend.Pod{ + pod2 := &backend.Pod{ NamespacedName: types.NamespacedName{Name: "pod2"}, Address: "10.0.0.2", Labels: map[string]string{}, } + testInput := []backendmetrics.PodMetrics{ + &backendmetrics.FakePodMetrics{Pod: pod1}, + &backendmetrics.FakePodMetrics{Pod: pod2}, + } + tests := []struct { name string metadata map[string]any - output []schedulingtypes.Pod + output []backendmetrics.PodMetrics }{ { name: "SubsetFilter, filter not present — return all pods", metadata: map[string]any{}, - output: []schedulingtypes.Pod{ - &schedulingtypes.PodMetrics{ - Pod: outputPod1, - MetricsState: backendmetrics.NewMetricsState(), - }, - &schedulingtypes.PodMetrics{ - Pod: outputPod2, - MetricsState: backendmetrics.NewMetricsState(), - }, - }, + output: testInput, }, { name: "SubsetFilter, namespace present filter not present — return all pods", metadata: map[string]any{metadata.SubsetFilterNamespace: map[string]any{}}, - output: []schedulingtypes.Pod{ - &schedulingtypes.PodMetrics{ - Pod: outputPod1, - MetricsState: backendmetrics.NewMetricsState(), - }, - &schedulingtypes.PodMetrics{ - Pod: outputPod2, - MetricsState: backendmetrics.NewMetricsState(), - }, - }, + output: testInput, }, { name: "SubsetFilter, filter present with empty list — return error", metadata: makeFilterMetadata([]any{}), - output: []schedulingtypes.Pod{}, + output: []backendmetrics.PodMetrics{}, }, { name: "SubsetFilter, subset with one matching pod", metadata: makeFilterMetadata([]any{"10.0.0.1"}), - output: []schedulingtypes.Pod{ - &schedulingtypes.PodMetrics{ - Pod: outputPod1, - MetricsState: backendmetrics.NewMetricsState(), + output: []backendmetrics.PodMetrics{ + &backendmetrics.FakePodMetrics{ + Pod: pod1, }, }, }, { name: "SubsetFilter, subset with multiple matching pods", metadata: makeFilterMetadata([]any{"10.0.0.1", "10.0.0.2", "10.0.0.3"}), - output: []schedulingtypes.Pod{ - &schedulingtypes.PodMetrics{ - Pod: outputPod1, - MetricsState: backendmetrics.NewMetricsState(), - }, - &schedulingtypes.PodMetrics{ - Pod: outputPod2, - MetricsState: backendmetrics.NewMetricsState(), - }, - }, + output: testInput, }, { name: "SubsetFilter, subset with no matching pods", metadata: makeFilterMetadata([]any{"10.0.0.3"}), - output: []schedulingtypes.Pod{}, + output: []backendmetrics.PodMetrics{}, }, } - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) - ds := datastore.NewDatastore(t.Context(), pmf) - for _, testPod := range testInput { - ds.PodUpdateOrAddIfNotExist(testPod) - } - + ds := &mockDatastore{pods: testInput} for _, test := range tests { t.Run(test.name, func(t *testing.T) { director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, NewConfig()) got := director.getCandidatePodsForScheduling(context.Background(), test.metadata) - diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b schedulingtypes.Pod) bool { + diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool { return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String() })) if diff != "" { @@ -578,8 +549,8 @@ func TestRandomWeightedDraw(t *testing.T) { model: &v1alpha2.InferenceObjective{ Spec: v1alpha2.InferenceObjectiveSpec{ TargetModels: []v1alpha2.TargetModel{ - {Name: "canary", Weight: pointer(50)}, - {Name: "v1", Weight: pointer(50)}, + {Name: "canary", 
Weight: ptr.To(int32(50))}, + {Name: "v1", Weight: ptr.To(int32(50))}, }, }, }, @@ -590,9 +561,9 @@ func TestRandomWeightedDraw(t *testing.T) { model: &v1alpha2.InferenceObjective{ Spec: v1alpha2.InferenceObjectiveSpec{ TargetModels: []v1alpha2.TargetModel{ - {Name: "canary", Weight: pointer(25)}, - {Name: "v1.1", Weight: pointer(55)}, - {Name: "v1", Weight: pointer(50)}, + {Name: "canary", Weight: ptr.To(int32(25))}, + {Name: "v1.1", Weight: ptr.To(int32(55))}, + {Name: "v1", Weight: ptr.To(int32(50))}, }, }, }, @@ -603,9 +574,9 @@ func TestRandomWeightedDraw(t *testing.T) { model: &v1alpha2.InferenceObjective{ Spec: v1alpha2.InferenceObjectiveSpec{ TargetModels: []v1alpha2.TargetModel{ - {Name: "canary", Weight: pointer(20)}, - {Name: "v1.1", Weight: pointer(20)}, - {Name: "v1", Weight: pointer(10)}, + {Name: "canary", Weight: ptr.To(int32(20))}, + {Name: "v1.1", Weight: ptr.To(int32(20))}, + {Name: "v1", Weight: ptr.To(int32(10))}, }, }, }, @@ -683,10 +654,6 @@ func TestGetRandomPod(t *testing.T) { } } -func pointer(v int32) *int32 { - return &v -} - func TestDirector_HandleResponse(t *testing.T) { pr1 := newTestPostResponse("pr1") diff --git a/pkg/epp/saturationdetector/saturationdetector.go b/pkg/epp/saturationdetector/saturationdetector.go index 57be1af90..22b835868 100644 --- a/pkg/epp/saturationdetector/saturationdetector.go +++ b/pkg/epp/saturationdetector/saturationdetector.go @@ -17,11 +17,10 @@ limitations under the License. // Package saturationdetector implements a mechanism to determine if the // backend model servers are considered saturated based on observed metrics. // -// The current implementation provides a global saturation signal (IsSaturated) +// The current implementation provides a saturation signal (IsSaturated) // primarily based on backend queue depths and KV cache utilization, reflecting // the saturation signals previously used by the Scheduler before the -// introduction of the FlowController. It fetches live metrics from the -// provided Datastore. +// introduction of the FlowController. // // TODO: Explore more advanced saturation signals in the future, such as: // - Latency-objective-based saturation. @@ -61,39 +60,21 @@ type Config struct { MetricsStalenessThreshold time.Duration } -// Datastore provides an interface to access backend pod metrics. -type Datastore interface { - PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics -} - -// Detector determines system saturation based on metrics from the Datastore. -// -// The Detector currently holds a direct dependency on a Datastore interface. -// This design choice was made to encapsulate the logic of fetching and -// interpreting metrics for saturation, thereby simplifying the dependencies -// for primary consumers like the FlowController--to be added soon--(which -// would otherwise need to manage Datastore interactions itself). -// This architectural decision may be revisited in the future if a more -// decoupled approach (e.g., passing metrics directly to IsSaturated) proves -// more beneficial. +// Detector determines system saturation based on metrics of the given candidate pods. type Detector struct { - datastore Datastore - config *Config + config *Config } // NewDetector creates a new SaturationDetector. -// The datastore is expected to provide access to live/recently-updated pod -// metrics. // The config provides the thresholds for determining saturation. 
-func NewDetector(config *Config, datastore Datastore, logger logr.Logger) *Detector { +func NewDetector(config *Config, logger logr.Logger) *Detector { logger.WithName(loggerName).V(logutil.DEFAULT).Info("Creating new SaturationDetector", "queueDepthThreshold", config.QueueDepthThreshold, "kvCacheUtilThreshold", config.KVCacheUtilThreshold, "metricsStalenessThreshold", config.MetricsStalenessThreshold.String()) return &Detector{ - datastore: datastore, - config: config, + config: config, } } @@ -104,20 +85,11 @@ func NewDetector(config *Config, datastore Datastore, logger logr.Logger) *Detec // 2. WaitingQueueSize <= QueueDepthThreshold. // 3. KVCacheUsagePercent <= KVCacheUtilThreshold. // -// If no pods are found in the datastore, the system is considered saturated -// (no capacity). -func (d *Detector) IsSaturated(ctx context.Context) bool { - logger := log.FromContext(ctx).WithName(loggerName) - // TODO: filter out stale metrics here if needed. - allPodsMetrics := d.datastore.PodList(backendmetrics.AllPodPredicate) - if len(allPodsMetrics) == 0 { - logger.V(logutil.VERBOSE).Info("No pods found in datastore; system is considered SATURATED (no capacity).") - // If there are no pods, there is no capacity to serve requests. - // Treat this as a saturated state to enable FlowController queuing. - return true - } - - for _, podMetric := range allPodsMetrics { +// This function is called only with the relevant pods for the current request, and is assumed +// to be called with at least one pod. zero pods condition is checked before calling IsSaturated. +func (d *Detector) IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool { + logger := log.FromContext(ctx) + for _, podMetric := range candidatePods { metrics := podMetric.GetMetrics() podNn := "unknown-pod" if podMetric.GetPod() != nil { diff --git a/pkg/epp/saturationdetector/saturationdetector_test.go b/pkg/epp/saturationdetector/saturationdetector_test.go index c0ed3d27d..57756d8bc 100644 --- a/pkg/epp/saturationdetector/saturationdetector_test.go +++ b/pkg/epp/saturationdetector/saturationdetector_test.go @@ -31,21 +31,6 @@ import ( backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" ) -// --- Mock Implementations --- - -type mockDatastore struct { - pods []*backendmetrics.FakePodMetrics -} - -// PodList lists pods matching the given predicate. 
-func (fds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics { - pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods)) - for _, pod := range fds.pods { - pm = append(pm, pod) - } - return pm -} - func newMockPodMetrics(name string, metrics *backendmetrics.MetricsState) *backendmetrics.FakePodMetrics { return &backendmetrics.FakePodMetrics{ Pod: &backend.Pod{ @@ -61,7 +46,6 @@ func TestNewDetector(t *testing.T) { tests := []struct { name string config *Config - datastore Datastore expectedQueueDepthThreshold int expectedKVCacheUtilThreshold float64 expectedStalenessThreshold time.Duration @@ -73,7 +57,6 @@ func TestNewDetector(t *testing.T) { KVCacheUtilThreshold: 0.8, MetricsStalenessThreshold: 100 * time.Millisecond, }, - datastore: &mockDatastore{}, expectedQueueDepthThreshold: 10, expectedKVCacheUtilThreshold: 0.8, expectedStalenessThreshold: 100 * time.Millisecond, @@ -85,7 +68,6 @@ func TestNewDetector(t *testing.T) { KVCacheUtilThreshold: -5, MetricsStalenessThreshold: 0, }, - datastore: &mockDatastore{}, expectedQueueDepthThreshold: DefaultQueueDepthThreshold, expectedKVCacheUtilThreshold: DefaultKVCacheUtilThreshold, expectedStalenessThreshold: DefaultMetricsStalenessThreshold, @@ -97,7 +79,6 @@ func TestNewDetector(t *testing.T) { KVCacheUtilThreshold: 1.5, MetricsStalenessThreshold: 100 * time.Millisecond, }, - datastore: &mockDatastore{}, expectedQueueDepthThreshold: 10, expectedKVCacheUtilThreshold: DefaultKVCacheUtilThreshold, expectedStalenessThreshold: 100 * time.Millisecond, @@ -111,7 +92,7 @@ func TestNewDetector(t *testing.T) { os.Setenv(EnvSdKVCacheUtilThreshold, fmt.Sprintf("%v", test.config.KVCacheUtilThreshold)) os.Setenv(EnvSdMetricsStalenessThreshold, test.config.MetricsStalenessThreshold.String()) - detector := NewDetector(LoadConfigFromEnv(), test.datastore, logr.Discard()) + detector := NewDetector(LoadConfigFromEnv(), logr.Discard()) if detector == nil { t.Fatalf("NewDetector() returned nil detector for valid config") } @@ -137,77 +118,77 @@ func TestDetector_IsSaturated(t *testing.T) { } tests := []struct { - name string - config *Config - pods []*backendmetrics.FakePodMetrics - expectedSaturat bool + name string + config *Config + pods []backendmetrics.PodMetrics + expectedSaturation bool }{ { - name: "No pods in datastore", - config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{}, - expectedSaturat: true, // No capacity = saturated + name: "No candidate pods", + config: defaultConfig, + pods: []backendmetrics.PodMetrics{}, + expectedSaturation: true, // No capacity = saturated }, { name: "Single pod with good capacity", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime, WaitingQueueSize: 2, KVCacheUsagePercent: 0.5, }), }, - expectedSaturat: false, + expectedSaturation: false, }, { name: "Single pod with stale metrics", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime.Add(-200 * time.Millisecond), // Stale WaitingQueueSize: 1, KVCacheUsagePercent: 0.1, }), }, - expectedSaturat: true, + expectedSaturation: true, }, { name: "Single pod with high queue depth", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime, 
WaitingQueueSize: 10, // Exceeds threshold 5 KVCacheUsagePercent: 0.1, }), }, - expectedSaturat: true, + expectedSaturation: true, }, { name: "Single pod with high KV cache utilization", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime, WaitingQueueSize: 1, KVCacheUsagePercent: 0.95, // Exceeds threshold 0.90 }), }, - expectedSaturat: true, + expectedSaturation: true, }, { name: "Single pod with nil metrics", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", nil), }, - expectedSaturat: true, + expectedSaturation: true, }, { name: "Multiple pods, all good capacity", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime, WaitingQueueSize: 1, @@ -219,12 +200,12 @@ func TestDetector_IsSaturated(t *testing.T) { KVCacheUsagePercent: 0.2, }), }, - expectedSaturat: false, + expectedSaturation: false, }, { name: "Multiple pods, one good, one bad (stale)", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime, // Good WaitingQueueSize: 1, @@ -236,12 +217,12 @@ func TestDetector_IsSaturated(t *testing.T) { KVCacheUsagePercent: 0.2, }), }, - expectedSaturat: false, // One good pod is enough + expectedSaturation: false, // One good pod is enough }, { name: "Multiple pods, one good, one bad (high queue)", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime, WaitingQueueSize: 1, @@ -253,12 +234,12 @@ func TestDetector_IsSaturated(t *testing.T) { KVCacheUsagePercent: 0.2, }), }, - expectedSaturat: false, + expectedSaturation: false, }, { name: "Multiple pods, all bad capacity", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime.Add(-200 * time.Millisecond), // Stale WaitingQueueSize: 1, @@ -275,52 +256,52 @@ func TestDetector_IsSaturated(t *testing.T) { KVCacheUsagePercent: 0.99, // High KV }), }, - expectedSaturat: true, + expectedSaturation: true, }, { name: "Queue depth exactly at threshold", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime, WaitingQueueSize: defaultConfig.QueueDepthThreshold, // Exactly at threshold (good) KVCacheUsagePercent: 0.1, }), }, - expectedSaturat: false, + expectedSaturation: false, }, { name: "KV cache exactly at threshold", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime, WaitingQueueSize: 1, KVCacheUsagePercent: defaultConfig.KVCacheUtilThreshold, // Exactly at threshold (good) }), }, - expectedSaturat: false, + expectedSaturation: false, }, { name: "Metrics age just over staleness threshold", config: defaultConfig, - pods: []*backendmetrics.FakePodMetrics{ + pods: []backendmetrics.PodMetrics{ newMockPodMetrics("pod1", &backendmetrics.MetricsState{ UpdateTime: baseTime.Add(-defaultConfig.MetricsStalenessThreshold - 
time.Nanosecond), // Just over (stale) WaitingQueueSize: 1, KVCacheUsagePercent: 0.1, }), }, - expectedSaturat: true, + expectedSaturation: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - detector := NewDetector(test.config, &mockDatastore{pods: test.pods}, logr.Discard()) + detector := NewDetector(test.config, logr.Discard()) - if got := detector.IsSaturated(context.Background()); got != test.expectedSaturat { - t.Errorf("IsSaturated() = %v, want %v", got, test.expectedSaturat) + if got := detector.IsSaturated(context.Background(), test.pods); got != test.expectedSaturation { + t.Errorf("IsSaturated() = %v, want %v", got, test.expectedSaturation) } }) } diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index c82a702b5..c3c968ee7 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -1126,7 +1126,7 @@ func BeforeSuite() func() { KVCacheUtilThreshold: saturationdetector.DefaultKVCacheUtilThreshold, MetricsStalenessThreshold: saturationdetector.DefaultMetricsStalenessThreshold, } - detector := saturationdetector.NewDetector(sdConfig, serverRunner.Datastore, logger.WithName("saturation-detector")) + detector := saturationdetector.NewDetector(sdConfig, logger.WithName("saturation-detector")) serverRunner.SaturationDetector = detector serverRunner.Director = requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, detector, requestcontrol.NewConfig()) serverRunner.SecureServing = false
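
For orientation, a minimal, self-contained sketch of how a caller wires the refactored API after this change: the Detector is now constructed from config alone (no Datastore), and the caller passes its own candidate-pod snapshot into IsSaturated, as the Director does in admitRequest. Only the package paths, types, and signatures visible in the diff are taken as given; the main() scaffolding, the threshold values, and the pod literal are illustrative assumptions, not part of the patch.

package main

import (
	"context"
	"time"

	"github.com/go-logr/logr"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
)

func main() {
	// After this change the Detector is built from config alone; the Datastore
	// dependency and the internal PodList call are gone.
	detector := saturationdetector.NewDetector(&saturationdetector.Config{
		QueueDepthThreshold:       5,                      // illustrative value
		KVCacheUtilThreshold:      0.9,                    // illustrative value
		MetricsStalenessThreshold: 100 * time.Millisecond, // illustrative value
	}, logr.Discard())

	// The caller supplies the candidate pods explicitly. In the Director this is
	// the snapshot returned by getCandidatePodsForScheduling (subset-filtered);
	// a FakePodMetrics literal stands in for it here, mirroring the tests above.
	candidatePods := []backendmetrics.PodMetrics{
		&backendmetrics.FakePodMetrics{
			Pod: &backend.Pod{Address: "10.0.0.1"},
		},
	}

	// Saturation is now judged only over the pods relevant to this request.
	// With no MetricsState attached, this pod has no usable capacity signal, so
	// the call reports saturation — mirroring the "Single pod with nil metrics"
	// case in the detector tests above.
	saturated := detector.IsSaturated(context.Background(), candidatePods)
	_ = saturated
}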