Skip to content

Commit 6348fcb

Browse files
committed
feat: Filter out stale metrics
1 parent 654199f commit 6348fcb

File tree

16 files changed

+74
-58
lines changed

16 files changed

+74
-58
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ func (r *Runner) Run(ctx context.Context) error {
284284
ModelServerMetricsScheme: *modelServerMetricsScheme,
285285
Client: metricsHttpClient,
286286
},
287-
*refreshMetricsInterval, *metricsStalenessThreshold)
287+
*refreshMetricsInterval)
288288

289289
datastore := datastore.NewDatastore(ctx, pmf)
290290

pkg/epp/backend/metrics/logger.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
6868
logger.V(logutil.DEFAULT).Info("Shutting down metrics logger thread")
6969
return
7070
case <-ticker.C:
71-
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
72-
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
73-
})
71+
podsWithFreshMetrics := datastore.PodList(PodsWithFreshMetrics(metricsStalenessThreshold))
7472
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
7573
return time.Since(pm.GetMetrics().UpdateTime) > metricsStalenessThreshold
7674
})
@@ -93,9 +91,7 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsSt
9391
var kvCacheTotal float64
9492
var queueTotal int
9593

96-
podMetrics := datastore.PodList(func(pm PodMetrics) bool {
97-
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
98-
})
94+
podMetrics := datastore.PodList(PodsWithFreshMetrics(metricsStalenessThreshold))
9995
logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics))
10096
if len(podMetrics) == 0 {
10197
return

pkg/epp/backend/metrics/pod_metrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ var (
6262
func TestMetricsRefresh(t *testing.T) {
6363
ctx := context.Background()
6464
pmc := &FakePodMetricsClient{}
65-
pmf := NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2)
65+
pmf := NewPodMetricsFactory(pmc, time.Millisecond)
6666

6767
// The refresher is initialized with empty metrics.
6868
pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{})

pkg/epp/backend/metrics/types.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,28 @@ import (
2929
)
3030

3131
var (
32-
AllPodPredicate = func(PodMetrics) bool { return true }
32+
AllPodsPredicate = func(PodMetrics) bool { return true }
3333
)
3434

35-
func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsStalenessThreshold time.Duration) *PodMetricsFactory {
35+
func PodsWithFreshMetrics(stalenessThreshold time.Duration) func(PodMetrics) bool {
36+
return func(pm PodMetrics) bool {
37+
if pm == nil {
38+
return false // Skip nil pods
39+
}
40+
return time.Since(pm.GetMetrics().UpdateTime) <= stalenessThreshold
41+
}
42+
}
43+
44+
func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval time.Duration) *PodMetricsFactory {
3645
return &PodMetricsFactory{
37-
pmc: pmc,
38-
refreshMetricsInterval: refreshMetricsInterval,
39-
metricsStalenessThreshold: metricsStalenessThreshold,
46+
pmc: pmc,
47+
refreshMetricsInterval: refreshMetricsInterval,
4048
}
4149
}
4250

4351
type PodMetricsFactory struct {
44-
pmc PodMetricsClient
45-
refreshMetricsInterval time.Duration
46-
metricsStalenessThreshold time.Duration
52+
pmc PodMetricsClient
53+
refreshMetricsInterval time.Duration
4754
}
4855

4956
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {

pkg/epp/controller/inferenceobjective_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) {
198198
WithObjects(initObjs...).
199199
WithIndex(&v1alpha2.InferenceObjective{}, datastore.ModelNameIndexKey, indexInferenceObjectivesByModelName).
200200
Build()
201-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
201+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
202202
ds := datastore.NewDatastore(t.Context(), pmf)
203203
for _, m := range test.objectivessInStore {
204204
ds.ObjectiveSetIfOlder(m)

pkg/epp/controller/inferencepool_reconciler_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func TestInferencePoolReconciler(t *testing.T) {
111111
req := ctrl.Request{NamespacedName: namespacedName}
112112
ctx := context.Background()
113113

114-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
114+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
115115
datastore := datastore.NewDatastore(ctx, pmf)
116116
inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn}
117117

@@ -186,7 +186,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
186186
params.wantPods = []string{}
187187
}
188188
gotPods := []string{}
189-
for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) {
189+
for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) {
190190
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
191191
}
192192
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
@@ -250,7 +250,7 @@ func TestXInferencePoolReconciler(t *testing.T) {
250250
req := ctrl.Request{NamespacedName: namespacedName}
251251
ctx := context.Background()
252252

253-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
253+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
254254
datastore := datastore.NewDatastore(ctx, pmf)
255255
inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn}
256256

@@ -336,7 +336,7 @@ func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStorePa
336336
params.wantPods = []string{}
337337
}
338338
gotPods := []string{}
339-
for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) {
339+
for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) {
340340
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
341341
}
342342
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {

pkg/epp/controller/pod_reconciler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ var (
4444
basePod3 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{PodIP: "address-3"}}
4545
basePod11 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{PodIP: "address-11"}}
4646
pmc = &backendmetrics.FakePodMetricsClient{}
47-
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second, time.Second*2)
47+
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second)
4848
)
4949

5050
func TestPodReconciler(t *testing.T) {
@@ -198,7 +198,7 @@ func TestPodReconciler(t *testing.T) {
198198
}
199199

200200
var gotPods []*corev1.Pod
201-
for _, pm := range store.PodList(backendmetrics.AllPodPredicate) {
201+
for _, pm := range store.PodList(backendmetrics.AllPodsPredicate) {
202202
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
203203
gotPods = append(gotPods, pod)
204204
}

pkg/epp/datastore/datastore_test.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func TestPool(t *testing.T) {
8282
fakeClient := fake.NewClientBuilder().
8383
WithScheme(scheme).
8484
Build()
85-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
85+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
8686
datastore := NewDatastore(context.Background(), pmf)
8787
_ = datastore.PoolSet(context.Background(), fakeClient, tt.inferencePool)
8888
gotPool, gotErr := datastore.PoolGet()
@@ -214,7 +214,7 @@ func TestModel(t *testing.T) {
214214
}
215215
for _, test := range tests {
216216
t.Run(test.name, func(t *testing.T) {
217-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
217+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
218218
ds := NewDatastore(t.Context(), pmf)
219219
for _, m := range test.existingModels {
220220
ds.ObjectiveSetIfOlder(m)
@@ -265,6 +265,11 @@ var (
265265
},
266266
WaitingModels: map[string]int{},
267267
}
268+
pod3 = &corev1.Pod{
269+
ObjectMeta: metav1.ObjectMeta{
270+
Name: "pod3",
271+
},
272+
}
268273

269274
pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
270275
pod2NamespacedName = types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}
@@ -281,6 +286,7 @@ func TestMetrics(t *testing.T) {
281286
pmc backendmetrics.PodMetricsClient
282287
storePods []*corev1.Pod
283288
want []*backendmetrics.MetricsState
289+
predict func(backendmetrics.PodMetrics) bool
284290
}{
285291
{
286292
name: "Probing metrics success",
@@ -326,6 +332,18 @@ func TestMetrics(t *testing.T) {
326332
},
327333
},
328334
},
335+
{
336+
name: "Filter stale metrics",
337+
pmc: &backendmetrics.FakePodMetricsClient{
338+
Res: map[types.NamespacedName]*backendmetrics.MetricsState{
339+
pod1NamespacedName: pod1Metrics,
340+
pod2NamespacedName: pod2Metrics,
341+
},
342+
},
343+
storePods: []*corev1.Pod{pod1, pod2, pod3}, // We store pod3 but it has no metrics meaning it is stale.
344+
want: []*backendmetrics.MetricsState{pod1Metrics, pod2Metrics}, // pod3 metrics were stale and should not be included.
345+
predict: backendmetrics.PodsWithFreshMetrics(time.Second * 2), // Only include pods with metrics updated in the last 2 seconds.
346+
},
329347
}
330348

331349
for _, test := range tests {
@@ -338,15 +356,18 @@ func TestMetrics(t *testing.T) {
338356
fakeClient := fake.NewClientBuilder().
339357
WithScheme(scheme).
340358
Build()
341-
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond, time.Second*2)
359+
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
342360
ds := NewDatastore(ctx, pmf)
343361
_ = ds.PoolSet(ctx, fakeClient, inferencePool)
344362
for _, pod := range test.storePods {
345363
ds.PodUpdateOrAddIfNotExist(pod)
346364
}
347365
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
366+
if test.predict == nil {
367+
test.predict = backendmetrics.AllPodsPredicate
368+
}
348369
assert.EventuallyWithT(t, func(t *assert.CollectT) {
349-
got := ds.PodList(backendmetrics.AllPodPredicate)
370+
got := ds.PodList(test.predict)
350371
metrics := []*backendmetrics.MetricsState{}
351372
for _, one := range got {
352373
metrics = append(metrics, one.GetMetrics())
@@ -432,15 +453,15 @@ func TestPods(t *testing.T) {
432453
for _, test := range tests {
433454
t.Run(test.name, func(t *testing.T) {
434455
ctx := context.Background()
435-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
456+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
436457
ds := NewDatastore(t.Context(), pmf)
437458
for _, pod := range test.existingPods {
438459
ds.PodUpdateOrAddIfNotExist(pod)
439460
}
440461

441462
test.op(ctx, ds)
442463
var gotPods []*corev1.Pod
443-
for _, pm := range ds.PodList(backendmetrics.AllPodPredicate) {
464+
for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) {
444465
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
445466
gotPods = append(gotPods, pod)
446467
}

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
6363
return
6464
}
6565

66-
podMetrics := c.ds.PodList(backendmetrics.AllPodPredicate)
66+
podMetrics := c.ds.PodList(backendmetrics.AllPodsPredicate)
6767
if len(podMetrics) == 0 {
6868
return
6969
}

pkg/epp/metrics/collectors/inference_pool_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ var (
4949
)
5050

5151
func TestNoMetricsCollected(t *testing.T) {
52-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
52+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
5353
datastore := datastore.NewDatastore(context.Background(), pmf)
5454

5555
collector := &inferencePoolMetricsCollector{
@@ -67,7 +67,7 @@ func TestMetricsCollected(t *testing.T) {
6767
pod1NamespacedName: pod1Metrics,
6868
},
6969
}
70-
pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2)
70+
pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond)
7171
ds := datastore.NewDatastore(context.Background(), pmf)
7272

7373
scheme := runtime.NewScheme()

0 commit comments

Comments
 (0)