Skip to content

Commit b1b43dc

Browse files
authored
feat: Make metrics stale time configurable (#1046)
1 parent f4f803b commit b1b43dc

File tree

19 files changed

+76
-71
lines changed

19 files changed

+76
-71
lines changed

cmd/epp/runner/runner.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,6 @@ var (
9595
"pool-namespace",
9696
runserver.DefaultPoolNamespace,
9797
"Namespace of the InferencePool this Endpoint Picker is associated with.")
98-
refreshMetricsInterval = flag.Duration(
99-
"refresh-metrics-interval",
100-
runserver.DefaultRefreshMetricsInterval,
101-
"interval to refresh metrics")
102-
refreshPrometheusMetricsInterval = flag.Duration(
103-
"refresh-prometheus-metrics-interval",
104-
runserver.DefaultRefreshPrometheusMetricsInterval,
105-
"interval to flush prometheus metrics")
10698
logVerbosity = flag.Int(
10799
"v",
108100
logging.DEFAULT,
@@ -135,6 +127,19 @@ var (
135127
"lora-info-metric",
136128
runserver.DefaultLoraInfoMetric,
137129
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
130+
131+
// metrics related flags
132+
refreshMetricsInterval = flag.Duration(
133+
"refresh-metrics-interval",
134+
runserver.DefaultRefreshMetricsInterval,
135+
"interval to refresh metrics")
136+
refreshPrometheusMetricsInterval = flag.Duration(
137+
"refresh-prometheus-metrics-interval",
138+
runserver.DefaultRefreshPrometheusMetricsInterval,
139+
"interval to flush prometheus metrics")
140+
metricsStalenessThreshold = flag.Duration("metrics-staleness-threshold",
141+
runserver.DefaultMetricsStalenessThreshold,
142+
"Duration after which metrics are considered stale. This is used to determine if a pod's metrics are fresh enough.")
138143
// configuration flags
139144
configFile = flag.String(
140145
"config-file",
@@ -268,7 +273,8 @@ func (r *Runner) Run(ctx context.Context) error {
268273
ModelServerMetricsPath: *modelServerMetricsPath,
269274
ModelServerMetricsScheme: *modelServerMetricsScheme,
270275
Client: metricsHttpClient,
271-
}, *refreshMetricsInterval)
276+
},
277+
*refreshMetricsInterval, *metricsStalenessThreshold)
272278

273279
datastore := datastore.NewDatastore(ctx, pmf)
274280

@@ -337,6 +343,7 @@ func (r *Runner) Run(ctx context.Context) error {
337343
HealthChecking: *healthChecking,
338344
CertPath: *certPath,
339345
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
346+
MetricsStalenessThreshold: *metricsStalenessThreshold,
340347
Director: director,
341348
SaturationDetector: saturationDetector,
342349
}

pkg/epp/backend/metrics/logger.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,18 @@ import (
3030
)
3131

3232
const (
33-
// Note currently the EPP treats stale metrics same as fresh.
34-
// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
35-
metricsValidityPeriod = 5 * time.Second
36-
debugPrintInterval = 5 * time.Second
33+
debugPrintInterval = 5 * time.Second
3734
)
3835

3936
type Datastore interface {
4037
PoolGet() (*v1.InferencePool, error)
4138
// PodMetrics operations
42-
// PodGetAll returns all pods and metrics, including fresh and stale.
43-
PodGetAll() []PodMetrics
4439
PodList(func(PodMetrics) bool) []PodMetrics
4540
}
4641

4742
// StartMetricsLogger starts goroutines to 1) Print metrics debug logs if the DEBUG log level is
4843
// enabled; 2) flush Prometheus metrics about the backend servers.
49-
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
44+
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
5045
logger := log.FromContext(ctx)
5146
ticker := time.NewTicker(refreshPrometheusMetricsInterval)
5247
go func() {
@@ -57,7 +52,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
5752
logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread")
5853
return
5954
case <-ticker.C: // Periodically refresh prometheus metrics for inference pool
60-
refreshPrometheusMetrics(logger, datastore)
55+
refreshPrometheusMetrics(logger, datastore, metricsStalenessThreshold)
6156
}
6257
}
6358
}()
@@ -74,10 +69,10 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
7469
return
7570
case <-ticker.C:
7671
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
77-
return time.Since(pm.GetMetrics().UpdateTime) <= metricsValidityPeriod
72+
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
7873
})
7974
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
80-
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
75+
return time.Since(pm.GetMetrics().UpdateTime) > metricsStalenessThreshold
8176
})
8277
s := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v", podsWithFreshMetrics, podsWithStaleMetrics)
8378
logger.V(logutil.VERBOSE).Info(s)
@@ -87,7 +82,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
8782
}
8883
}
8984

90-
func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
85+
func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsStalenessThreshold time.Duration) {
9186
pool, err := datastore.PoolGet()
9287
if err != nil {
9388
// No inference pool, or it is not initialized.
@@ -98,7 +93,9 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
9893
var kvCacheTotal float64
9994
var queueTotal int
10095

101-
podMetrics := datastore.PodGetAll()
96+
podMetrics := datastore.PodList(func(pm PodMetrics) bool {
97+
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
98+
})
10299
logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics))
103100
if len(podMetrics) == 0 {
104101
return

pkg/epp/backend/metrics/pod_metrics_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ var (
6262
func TestMetricsRefresh(t *testing.T) {
6363
ctx := context.Background()
6464
pmc := &FakePodMetricsClient{}
65-
pmf := NewPodMetricsFactory(pmc, time.Millisecond)
65+
pmf := NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2)
6666

6767
// The refresher is initialized with empty metrics.
6868
pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{})
@@ -90,10 +90,7 @@ type fakeDataStore struct{}
9090
func (f *fakeDataStore) PoolGet() (*v1.InferencePool, error) {
9191
return &v1.InferencePool{Spec: v1.InferencePoolSpec{TargetPortNumber: 8000}}, nil
9292
}
93-
func (f *fakeDataStore) PodGetAll() []PodMetrics {
94-
// Not implemented.
95-
return nil
96-
}
93+
9794
func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics {
9895
// Not implemented.
9996
return nil

pkg/epp/backend/metrics/types.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,22 @@ import (
2828
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
2929
)
3030

31-
func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval time.Duration) *PodMetricsFactory {
31+
var (
32+
AllPodPredicate = func(PodMetrics) bool { return true }
33+
)
34+
35+
func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsStalenessThreshold time.Duration) *PodMetricsFactory {
3236
return &PodMetricsFactory{
33-
pmc: pmc,
34-
refreshMetricsInterval: refreshMetricsInterval,
37+
pmc: pmc,
38+
refreshMetricsInterval: refreshMetricsInterval,
39+
metricsStalenessThreshold: metricsStalenessThreshold,
3540
}
3641
}
3742

3843
type PodMetricsFactory struct {
39-
pmc PodMetricsClient
40-
refreshMetricsInterval time.Duration
44+
pmc PodMetricsClient
45+
refreshMetricsInterval time.Duration
46+
metricsStalenessThreshold time.Duration
4147
}
4248

4349
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {

pkg/epp/controller/inferenceobjective_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) {
196196
WithObjects(initObjs...).
197197
WithIndex(&v1alpha2.InferenceObjective{}, datastore.ModelNameIndexKey, indexInferenceObjectivesByModelName).
198198
Build()
199-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
199+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
200200
ds := datastore.NewDatastore(t.Context(), pmf)
201201
for _, m := range test.objectivessInStore {
202202
ds.ObjectiveSetIfOlder(m)

pkg/epp/controller/inferencepool_reconciler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestInferencePoolReconciler(t *testing.T) {
9797
req := ctrl.Request{NamespacedName: namespacedName}
9898
ctx := context.Background()
9999

100-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
100+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
101101
datastore := datastore.NewDatastore(ctx, pmf)
102102
inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore}
103103

@@ -172,7 +172,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
172172
params.wantPods = []string{}
173173
}
174174
gotPods := []string{}
175-
for _, pm := range datastore.PodGetAll() {
175+
for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) {
176176
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
177177
}
178178
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {

pkg/epp/controller/pod_reconciler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ var (
4444
basePod3 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{PodIP: "address-3"}}
4545
basePod11 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{PodIP: "address-11"}}
4646
pmc = &backendmetrics.FakePodMetricsClient{}
47-
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second)
47+
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second, time.Second*2)
4848
)
4949

5050
func TestPodReconciler(t *testing.T) {
@@ -198,7 +198,7 @@ func TestPodReconciler(t *testing.T) {
198198
}
199199

200200
var gotPods []*corev1.Pod
201-
for _, pm := range store.PodGetAll() {
201+
for _, pm := range store.PodList(backendmetrics.AllPodPredicate) {
202202
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
203203
gotPods = append(gotPods, pod)
204204
}

pkg/epp/datastore/datastore.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ type Datastore interface {
6262
ObjectiveResync(ctx context.Context, reader client.Reader, modelName string) (bool, error)
6363
ObjectiveGetAll() []*v1alpha2.InferenceObjective
6464

65-
// PodMetrics operations
66-
// PodGetAll returns all pods and metrics, including fresh and stale.
67-
PodGetAll() []backendmetrics.PodMetrics
6865
// PodList lists pods matching the given predicate.
6966
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
7067
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
@@ -245,11 +242,8 @@ func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective {
245242
}
246243

247244
// /// Pods/endpoints APIs ///
248-
249-
func (ds *datastore) PodGetAll() []backendmetrics.PodMetrics {
250-
return ds.PodList(func(backendmetrics.PodMetrics) bool { return true })
251-
}
252-
245+
// TODO: add a flag for callers to specify the staleness threshold for metrics.
246+
// ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694
253247
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
254248
res := []backendmetrics.PodMetrics{}
255249

pkg/epp/datastore/datastore_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func TestPool(t *testing.T) {
8282
fakeClient := fake.NewClientBuilder().
8383
WithScheme(scheme).
8484
Build()
85-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
85+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
8686
datastore := NewDatastore(context.Background(), pmf)
8787
_ = datastore.PoolSet(context.Background(), fakeClient, tt.inferencePool)
8888
gotPool, gotErr := datastore.PoolGet()
@@ -214,7 +214,7 @@ func TestModel(t *testing.T) {
214214
}
215215
for _, test := range tests {
216216
t.Run(test.name, func(t *testing.T) {
217-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
217+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
218218
ds := NewDatastore(t.Context(), pmf)
219219
for _, m := range test.existingModels {
220220
ds.ObjectiveSetIfOlder(m)
@@ -265,6 +265,7 @@ var (
265265
},
266266
WaitingModels: map[string]int{},
267267
}
268+
268269
pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
269270
pod2NamespacedName = types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}
270271
inferencePool = &v1.InferencePool{
@@ -314,8 +315,7 @@ func TestMetrics(t *testing.T) {
314315
},
315316
},
316317
storePods: []*corev1.Pod{pod1, pod2},
317-
want: []*backendmetrics.MetricsState{
318-
pod1Metrics,
318+
want: []*backendmetrics.MetricsState{pod1Metrics,
319319
// Failed to fetch pod2 metrics so it remains the default values.
320320
{
321321
ActiveModels: map[string]int{},
@@ -338,14 +338,15 @@ func TestMetrics(t *testing.T) {
338338
fakeClient := fake.NewClientBuilder().
339339
WithScheme(scheme).
340340
Build()
341-
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
341+
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond, time.Second*2)
342342
ds := NewDatastore(ctx, pmf)
343343
_ = ds.PoolSet(ctx, fakeClient, inferencePool)
344344
for _, pod := range test.storePods {
345345
ds.PodUpdateOrAddIfNotExist(pod)
346346
}
347+
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
347348
assert.EventuallyWithT(t, func(t *assert.CollectT) {
348-
got := ds.PodGetAll()
349+
got := ds.PodList(backendmetrics.AllPodPredicate)
349350
metrics := []*backendmetrics.MetricsState{}
350351
for _, one := range got {
351352
metrics = append(metrics, one.GetMetrics())
@@ -431,15 +432,15 @@ func TestPods(t *testing.T) {
431432
for _, test := range tests {
432433
t.Run(test.name, func(t *testing.T) {
433434
ctx := context.Background()
434-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
435+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
435436
ds := NewDatastore(t.Context(), pmf)
436437
for _, pod := range test.existingPods {
437438
ds.PodUpdateOrAddIfNotExist(pod)
438439
}
439440

440441
test.op(ctx, ds)
441442
var gotPods []*corev1.Pod
442-
for _, pm := range ds.PodGetAll() {
443+
for _, pm := range ds.PodList(backendmetrics.AllPodPredicate) {
443444
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
444445
gotPods = append(gotPods, pod)
445446
}

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/client_golang/prometheus"
2121
compbasemetrics "k8s.io/component-base/metrics"
2222

23+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2324
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
2425
metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics"
2526
)
@@ -62,7 +63,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
6263
return
6364
}
6465

65-
podMetrics := c.ds.PodGetAll()
66+
podMetrics := c.ds.PodList(backendmetrics.AllPodPredicate)
6667
if len(podMetrics) == 0 {
6768
return
6869
}

0 commit comments

Comments
 (0)