diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 22140ac74..9e6cba69e 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -284,7 +284,7 @@ func (r *Runner) Run(ctx context.Context) error { ModelServerMetricsScheme: *modelServerMetricsScheme, Client: metricsHttpClient, }, - *refreshMetricsInterval, *metricsStalenessThreshold) + *refreshMetricsInterval) datastore := datastore.NewDatastore(ctx, pmf) diff --git a/pkg/epp/backend/metrics/logger.go b/pkg/epp/backend/metrics/logger.go index bc3454673..73debae4d 100644 --- a/pkg/epp/backend/metrics/logger.go +++ b/pkg/epp/backend/metrics/logger.go @@ -68,9 +68,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh logger.V(logutil.DEFAULT).Info("Shutting down metrics logger thread") return case <-ticker.C: - podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool { - return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold - }) + podsWithFreshMetrics := datastore.PodList(PodsWithFreshMetrics(metricsStalenessThreshold)) podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool { return time.Since(pm.GetMetrics().UpdateTime) > metricsStalenessThreshold }) @@ -93,9 +91,7 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsSt var kvCacheTotal float64 var queueTotal int - podMetrics := datastore.PodList(func(pm PodMetrics) bool { - return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold - }) + podMetrics := datastore.PodList(PodsWithFreshMetrics(metricsStalenessThreshold)) logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics)) if len(podMetrics) == 0 { return diff --git a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index 1c4a3398e..952b0ee16 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -62,7 +62,7 @@ var ( func TestMetricsRefresh(t *testing.T) { ctx := context.Background() pmc := &FakePodMetricsClient{} - pmf := NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2) + pmf := NewPodMetricsFactory(pmc, time.Millisecond) // The refresher is initialized with empty metrics. pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{}) diff --git a/pkg/epp/backend/metrics/types.go b/pkg/epp/backend/metrics/types.go index 975910ff3..ad2cedeb3 100644 --- a/pkg/epp/backend/metrics/types.go +++ b/pkg/epp/backend/metrics/types.go @@ -29,21 +29,28 @@ import ( ) var ( - AllPodPredicate = func(PodMetrics) bool { return true } + AllPodsPredicate = func(PodMetrics) bool { return true } ) -func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsStalenessThreshold time.Duration) *PodMetricsFactory { +func PodsWithFreshMetrics(stalenessThreshold time.Duration) func(PodMetrics) bool { + return func(pm PodMetrics) bool { + if pm == nil { + return false // Skip nil pods + } + return time.Since(pm.GetMetrics().UpdateTime) <= stalenessThreshold + } +} + +func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval time.Duration) *PodMetricsFactory { return &PodMetricsFactory{ - pmc: pmc, - refreshMetricsInterval: refreshMetricsInterval, - metricsStalenessThreshold: metricsStalenessThreshold, + pmc: pmc, + refreshMetricsInterval: refreshMetricsInterval, } } type PodMetricsFactory struct { - pmc PodMetricsClient - refreshMetricsInterval time.Duration - metricsStalenessThreshold time.Duration + pmc PodMetricsClient + refreshMetricsInterval time.Duration } func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics { diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 39ce5b462..593615894 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -198,7 +198,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) { WithObjects(initObjs...). WithIndex(&v1alpha2.InferenceObjective{}, datastore.ModelNameIndexKey, indexInferenceObjectivesByModelName). Build() - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) ds := datastore.NewDatastore(t.Context(), pmf) for _, m := range test.objectivessInStore { ds.ObjectiveSetIfOlder(m) diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index a29d531b9..85462fccc 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -111,7 +111,7 @@ func TestInferencePoolReconciler(t *testing.T) { req := ctrl.Request{NamespacedName: namespacedName} ctx := context.Background() - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) datastore := datastore.NewDatastore(ctx, pmf) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn} @@ -186,7 +186,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string { params.wantPods = []string{} } gotPods := []string{} - for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) { + for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) { gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) } if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { @@ -250,7 +250,7 @@ func TestXInferencePoolReconciler(t *testing.T) { req := ctrl.Request{NamespacedName: namespacedName} ctx := context.Background() - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) datastore := datastore.NewDatastore(ctx, pmf) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn} @@ -336,7 +336,7 @@ func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStorePa params.wantPods = []string{} } gotPods := []string{} - for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) { + for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) { gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) } if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 0f6202177..acca167f4 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -44,7 +44,7 @@ var ( basePod3 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{PodIP: "address-3"}} basePod11 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{PodIP: "address-11"}} pmc = &backendmetrics.FakePodMetricsClient{} - pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second, time.Second*2) + pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second) ) func TestPodReconciler(t *testing.T) { @@ -198,7 +198,7 @@ func TestPodReconciler(t *testing.T) { } var gotPods []*corev1.Pod - for _, pm := range store.PodList(backendmetrics.AllPodPredicate) { + for _, pm := range store.PodList(backendmetrics.AllPodsPredicate) { pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}} gotPods = append(gotPods, pod) } diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 72ea3f384..a73a10c5e 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -82,7 +82,7 @@ func TestPool(t *testing.T) { fakeClient := fake.NewClientBuilder(). WithScheme(scheme). Build() - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) datastore := NewDatastore(context.Background(), pmf) _ = datastore.PoolSet(context.Background(), fakeClient, tt.inferencePool) gotPool, gotErr := datastore.PoolGet() @@ -214,7 +214,7 @@ func TestModel(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) ds := NewDatastore(t.Context(), pmf) for _, m := range test.existingModels { ds.ObjectiveSetIfOlder(m) @@ -281,6 +281,7 @@ func TestMetrics(t *testing.T) { pmc backendmetrics.PodMetricsClient storePods []*corev1.Pod want []*backendmetrics.MetricsState + predict func(backendmetrics.PodMetrics) bool }{ { name: "Probing metrics success", @@ -338,15 +339,18 @@ func TestMetrics(t *testing.T) { fakeClient := fake.NewClientBuilder(). WithScheme(scheme). Build() - pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond) ds := NewDatastore(ctx, pmf) _ = ds.PoolSet(ctx, fakeClient, inferencePool) for _, pod := range test.storePods { ds.PodUpdateOrAddIfNotExist(pod) } time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched. + if test.predict == nil { + test.predict = backendmetrics.AllPodsPredicate + } assert.EventuallyWithT(t, func(t *assert.CollectT) { - got := ds.PodList(backendmetrics.AllPodPredicate) + got := ds.PodList(test.predict) metrics := []*backendmetrics.MetricsState{} for _, one := range got { metrics = append(metrics, one.GetMetrics()) @@ -432,7 +436,7 @@ func TestPods(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { ctx := context.Background() - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) ds := NewDatastore(t.Context(), pmf) for _, pod := range test.existingPods { ds.PodUpdateOrAddIfNotExist(pod) @@ -440,7 +444,7 @@ func TestPods(t *testing.T) { test.op(ctx, ds) var gotPods []*corev1.Pod - for _, pm := range ds.PodList(backendmetrics.AllPodPredicate) { + for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) { pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}} gotPods = append(gotPods, pod) } diff --git a/pkg/epp/metrics/collectors/inference_pool.go b/pkg/epp/metrics/collectors/inference_pool.go index 41a3d6d61..ec3def164 100644 --- a/pkg/epp/metrics/collectors/inference_pool.go +++ b/pkg/epp/metrics/collectors/inference_pool.go @@ -63,7 +63,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) { return } - podMetrics := c.ds.PodList(backendmetrics.AllPodPredicate) + podMetrics := c.ds.PodList(backendmetrics.AllPodsPredicate) if len(podMetrics) == 0 { return } diff --git a/pkg/epp/metrics/collectors/inference_pool_test.go b/pkg/epp/metrics/collectors/inference_pool_test.go index 533d7f384..e855c55c1 100644 --- a/pkg/epp/metrics/collectors/inference_pool_test.go +++ b/pkg/epp/metrics/collectors/inference_pool_test.go @@ -49,7 +49,7 @@ var ( ) func TestNoMetricsCollected(t *testing.T) { - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) datastore := datastore.NewDatastore(context.Background(), pmf) collector := &inferencePoolMetricsCollector{ @@ -67,7 +67,7 @@ func TestMetricsCollected(t *testing.T) { pod1NamespacedName: pod1Metrics, }, } - pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond) ds := datastore.NewDatastore(context.Background(), pmf) scheme := runtime.NewScheme() diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index d2073eb65..40dded0dd 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -198,13 +198,13 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any) if !found { - return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate)) + return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodsPredicate)) } // Check if endpoint key is present in the subset map and ensure there is at least one value endpointSubsetList, found := subsetMap[subsetHintKey].([]any) if !found { - return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate)) + return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodsPredicate)) } else if len(endpointSubsetList) == 0 { loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") return []schedulingtypes.Pod{} @@ -290,7 +290,7 @@ func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestC } func (d *Director) GetRandomPod() *backend.Pod { - pods := d.datastore.PodList(backendmetrics.AllPodPredicate) + pods := d.datastore.PodList(backendmetrics.AllPodsPredicate) if len(pods) == 0 { return nil } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index b0e5aa173..cbb8c1aca 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -93,7 +93,7 @@ func TestDirector_HandleRequest(t *testing.T) { ObjRef() // Datastore setup - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) ds := datastore.NewDatastore(t.Context(), pmf) ds.ObjectiveSetIfOlder(imFoodReview) ds.ObjectiveSetIfOlder(imFoodReviewResolve) @@ -531,7 +531,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) { }, } - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) ds := datastore.NewDatastore(t.Context(), pmf) for _, testPod := range testInput { ds.PodUpdateOrAddIfNotExist(testPod) @@ -654,7 +654,7 @@ func TestGetRandomPod(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond) ds := datastore.NewDatastore(t.Context(), pmf) for _, pod := range test.storePods { ds.PodUpdateOrAddIfNotExist(pod) diff --git a/pkg/epp/saturationdetector/saturationdetector.go b/pkg/epp/saturationdetector/saturationdetector.go index 57be1af90..bc47ecd53 100644 --- a/pkg/epp/saturationdetector/saturationdetector.go +++ b/pkg/epp/saturationdetector/saturationdetector.go @@ -109,7 +109,7 @@ func NewDetector(config *Config, datastore Datastore, logger logr.Logger) *Detec func (d *Detector) IsSaturated(ctx context.Context) bool { logger := log.FromContext(ctx).WithName(loggerName) // TODO: filter out stale metrics here if needed. - allPodsMetrics := d.datastore.PodList(backendmetrics.AllPodPredicate) + allPodsMetrics := d.datastore.PodList(backendmetrics.AllPodsPredicate) if len(allPodsMetrics) == 0 { logger.V(logutil.VERBOSE).Info("No pods found in datastore; system is considered SATURATED (no capacity).") // If there are no pods, there is no capacity to serve requests. diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 859a14716..24b2f92bd 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -977,7 +977,7 @@ func setUpHermeticServer(t *testing.T, podAndMetrics map[*backend.Pod]*backendme // check if all pods are synced to datastore assert.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Len(t, serverRunner.Datastore.PodList(backendmetrics.AllPodPredicate), len(podAndMetrics), "Datastore not synced") + assert.Len(t, serverRunner.Datastore.PodList(backendmetrics.AllPodsPredicate), len(podAndMetrics), "Datastore not synced") }, 10*time.Second, time.Second) // Create a grpc connection @@ -1086,7 +1086,7 @@ func BeforeSuite() func() { serverRunner = server.NewDefaultExtProcServerRunner() serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} - pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond, time.Second*2) + pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) // Adjust from defaults serverRunner.PoolGKNN = common.GKNN{ NamespacedName: types.NamespacedName{Namespace: testNamespace, Name: testPoolName}, diff --git a/test/utils/server.go b/test/utils/server.go index 54f6a0443..f711b54ff 100644 --- a/test/utils/server.go +++ b/test/utils/server.go @@ -49,7 +49,7 @@ func PrepareForTestStreamingServer(objectives []*v1alpha2.InferenceObjective, po ctx, cancel := context.WithCancel(context.Background()) pmc := &metrics.FakePodMetricsClient{} - pmf := metrics.NewPodMetricsFactory(pmc, time.Second, time.Second*2) + pmf := metrics.NewPodMetricsFactory(pmc, time.Second) ds := datastore.NewDatastore(ctx, pmf) initObjs := []client.Object{}