Skip to content

cleanup: fix typo and delete useless parameter #1310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (r *Runner) Run(ctx context.Context) error {
ModelServerMetricsScheme: *modelServerMetricsScheme,
Client: metricsHttpClient,
},
*refreshMetricsInterval, *metricsStalenessThreshold)
*refreshMetricsInterval)

datastore := datastore.NewDatastore(ctx, pmf)

Expand Down
8 changes: 2 additions & 6 deletions pkg/epp/backend/metrics/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
logger.V(logutil.DEFAULT).Info("Shutting down metrics logger thread")
return
case <-ticker.C:
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
})
podsWithFreshMetrics := datastore.PodList(PodsWithFreshMetrics(metricsStalenessThreshold))
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
return time.Since(pm.GetMetrics().UpdateTime) > metricsStalenessThreshold
})
Expand All @@ -93,9 +91,7 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsSt
var kvCacheTotal float64
var queueTotal int

podMetrics := datastore.PodList(func(pm PodMetrics) bool {
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
})
podMetrics := datastore.PodList(PodsWithFreshMetrics(metricsStalenessThreshold))
logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics))
if len(podMetrics) == 0 {
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/backend/metrics/pod_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
func TestMetricsRefresh(t *testing.T) {
ctx := context.Background()
pmc := &FakePodMetricsClient{}
pmf := NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2)
pmf := NewPodMetricsFactory(pmc, time.Millisecond)

// The refresher is initialized with empty metrics.
pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{})
Expand Down
23 changes: 15 additions & 8 deletions pkg/epp/backend/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,28 @@ import (
)

var (
AllPodPredicate = func(PodMetrics) bool { return true }
AllPodsPredicate = func(PodMetrics) bool { return true }
)

func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsStalenessThreshold time.Duration) *PodMetricsFactory {
// PodsWithFreshMetrics returns a predicate for use with Datastore.PodList.
// The predicate reports true when the time elapsed since the pod's metrics
// UpdateTime is less than or equal to stalenessThreshold, and false for a nil
// PodMetrics value.
func PodsWithFreshMetrics(stalenessThreshold time.Duration) func(PodMetrics) bool {
return func(pm PodMetrics) bool {
if pm == nil {
return false // Skip nil pods
}
// A pod counts as fresh up to and including the threshold (<=).
return time.Since(pm.GetMetrics().UpdateTime) <= stalenessThreshold
}
}

func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval time.Duration) *PodMetricsFactory {
return &PodMetricsFactory{
pmc: pmc,
refreshMetricsInterval: refreshMetricsInterval,
metricsStalenessThreshold: metricsStalenessThreshold,
pmc: pmc,
refreshMetricsInterval: refreshMetricsInterval,
}
}

type PodMetricsFactory struct {
pmc PodMetricsClient
refreshMetricsInterval time.Duration
metricsStalenessThreshold time.Duration
pmc PodMetricsClient
refreshMetricsInterval time.Duration
}

func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/controller/inferenceobjective_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) {
WithObjects(initObjs...).
WithIndex(&v1alpha2.InferenceObjective{}, datastore.ModelNameIndexKey, indexInferenceObjectivesByModelName).
Build()
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
ds := datastore.NewDatastore(t.Context(), pmf)
for _, m := range test.objectivessInStore {
ds.ObjectiveSetIfOlder(m)
Expand Down
8 changes: 4 additions & 4 deletions pkg/epp/controller/inferencepool_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestInferencePoolReconciler(t *testing.T) {
req := ctrl.Request{NamespacedName: namespacedName}
ctx := context.Background()

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
datastore := datastore.NewDatastore(ctx, pmf)
inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn}

Expand Down Expand Up @@ -186,7 +186,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
params.wantPods = []string{}
}
gotPods := []string{}
for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) {
for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) {
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
}
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestXInferencePoolReconciler(t *testing.T) {
req := ctrl.Request{NamespacedName: namespacedName}
ctx := context.Background()

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
datastore := datastore.NewDatastore(ctx, pmf)
inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn}

Expand Down Expand Up @@ -336,7 +336,7 @@ func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStorePa
params.wantPods = []string{}
}
gotPods := []string{}
for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) {
for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) {
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
}
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/controller/pod_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
basePod3 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{PodIP: "address-3"}}
basePod11 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{PodIP: "address-11"}}
pmc = &backendmetrics.FakePodMetricsClient{}
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second, time.Second*2)
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second)
)

func TestPodReconciler(t *testing.T) {
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestPodReconciler(t *testing.T) {
}

var gotPods []*corev1.Pod
for _, pm := range store.PodList(backendmetrics.AllPodPredicate) {
for _, pm := range store.PodList(backendmetrics.AllPodsPredicate) {
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
gotPods = append(gotPods, pod)
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/epp/datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestPool(t *testing.T) {
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
Build()
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
datastore := NewDatastore(context.Background(), pmf)
_ = datastore.PoolSet(context.Background(), fakeClient, tt.inferencePool)
gotPool, gotErr := datastore.PoolGet()
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestModel(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
ds := NewDatastore(t.Context(), pmf)
for _, m := range test.existingModels {
ds.ObjectiveSetIfOlder(m)
Expand Down Expand Up @@ -281,6 +281,7 @@ func TestMetrics(t *testing.T) {
pmc backendmetrics.PodMetricsClient
storePods []*corev1.Pod
want []*backendmetrics.MetricsState
predict func(backendmetrics.PodMetrics) bool
}{
{
name: "Probing metrics success",
Expand Down Expand Up @@ -338,15 +339,18 @@ func TestMetrics(t *testing.T) {
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
Build()
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
ds := NewDatastore(ctx, pmf)
_ = ds.PoolSet(ctx, fakeClient, inferencePool)
for _, pod := range test.storePods {
ds.PodUpdateOrAddIfNotExist(pod)
}
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
if test.predict == nil {
test.predict = backendmetrics.AllPodsPredicate
}
assert.EventuallyWithT(t, func(t *assert.CollectT) {
got := ds.PodList(backendmetrics.AllPodPredicate)
got := ds.PodList(test.predict)
metrics := []*backendmetrics.MetricsState{}
for _, one := range got {
metrics = append(metrics, one.GetMetrics())
Expand Down Expand Up @@ -432,15 +436,15 @@ func TestPods(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
ds := NewDatastore(t.Context(), pmf)
for _, pod := range test.existingPods {
ds.PodUpdateOrAddIfNotExist(pod)
}

test.op(ctx, ds)
var gotPods []*corev1.Pod
for _, pm := range ds.PodList(backendmetrics.AllPodPredicate) {
for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) {
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
gotPods = append(gotPods, pod)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/metrics/collectors/inference_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
return
}

podMetrics := c.ds.PodList(backendmetrics.AllPodPredicate)
podMetrics := c.ds.PodList(backendmetrics.AllPodsPredicate)
if len(podMetrics) == 0 {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/metrics/collectors/inference_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (
)

func TestNoMetricsCollected(t *testing.T) {
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
datastore := datastore.NewDatastore(context.Background(), pmf)

collector := &inferencePoolMetricsCollector{
Expand All @@ -67,7 +67,7 @@ func TestMetricsCollected(t *testing.T) {
pod1NamespacedName: pod1Metrics,
},
}
pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond)
ds := datastore.NewDatastore(context.Background(), pmf)

scheme := runtime.NewScheme()
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet

subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any)
if !found {
return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodsPredicate))
}

// Check if endpoint key is present in the subset map and ensure there is at least one value
endpointSubsetList, found := subsetMap[subsetHintKey].([]any)
if !found {
return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodsPredicate))
} else if len(endpointSubsetList) == 0 {
loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
return []schedulingtypes.Pod{}
Expand Down Expand Up @@ -290,7 +290,7 @@ func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestC
}

func (d *Director) GetRandomPod() *backend.Pod {
pods := d.datastore.PodList(backendmetrics.AllPodPredicate)
pods := d.datastore.PodList(backendmetrics.AllPodsPredicate)
if len(pods) == 0 {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/requestcontrol/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestDirector_HandleRequest(t *testing.T) {
ObjRef()

// Datastore setup
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
ds := datastore.NewDatastore(t.Context(), pmf)
ds.ObjectiveSetIfOlder(imFoodReview)
ds.ObjectiveSetIfOlder(imFoodReviewResolve)
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
},
}

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
ds := datastore.NewDatastore(t.Context(), pmf)
for _, testPod := range testInput {
ds.PodUpdateOrAddIfNotExist(testPod)
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestGetRandomPod(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond)
ds := datastore.NewDatastore(t.Context(), pmf)
for _, pod := range test.storePods {
ds.PodUpdateOrAddIfNotExist(pod)
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/saturationdetector/saturationdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewDetector(config *Config, datastore Datastore, logger logr.Logger) *Detec
func (d *Detector) IsSaturated(ctx context.Context) bool {
logger := log.FromContext(ctx).WithName(loggerName)
// TODO: filter out stale metrics here if needed.
allPodsMetrics := d.datastore.PodList(backendmetrics.AllPodPredicate)
allPodsMetrics := d.datastore.PodList(backendmetrics.AllPodsPredicate)
if len(allPodsMetrics) == 0 {
logger.V(logutil.VERBOSE).Info("No pods found in datastore; system is considered SATURATED (no capacity).")
// If there are no pods, there is no capacity to serve requests.
Expand Down
4 changes: 2 additions & 2 deletions test/integration/epp/hermetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func setUpHermeticServer(t *testing.T, podAndMetrics map[*backend.Pod]*backendme

// check if all pods are synced to datastore
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Len(t, serverRunner.Datastore.PodList(backendmetrics.AllPodPredicate), len(podAndMetrics), "Datastore not synced")
assert.Len(t, serverRunner.Datastore.PodList(backendmetrics.AllPodsPredicate), len(podAndMetrics), "Datastore not synced")
}, 10*time.Second, time.Second)

// Create a grpc connection
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func BeforeSuite() func() {

serverRunner = server.NewDefaultExtProcServerRunner()
serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{}
pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond, time.Second*2)
pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond)
// Adjust from defaults
serverRunner.PoolGKNN = common.GKNN{
NamespacedName: types.NamespacedName{Namespace: testNamespace, Name: testPoolName},
Expand Down
2 changes: 1 addition & 1 deletion test/utils/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func PrepareForTestStreamingServer(objectives []*v1alpha2.InferenceObjective, po
ctx, cancel := context.WithCancel(context.Background())

pmc := &metrics.FakePodMetricsClient{}
pmf := metrics.NewPodMetricsFactory(pmc, time.Second, time.Second*2)
pmf := metrics.NewPodMetricsFactory(pmc, time.Second)
ds := datastore.NewDatastore(ctx, pmf)

initObjs := []client.Object{}
Expand Down