Skip to content

Commit 3bcb68a

Browse files
committed
enable global metrics logging
Signed-off-by: Etai Lev Ran <[email protected]>
1 parent 6bb9baf commit 3bcb68a

File tree

3 files changed

+179
-1
lines changed

3 files changed

+179
-1
lines changed

pkg/epp/datalayer/enabled.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
import (
20+
"github.com/go-logr/logr"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
23+
)
24+
25+
const (
26+
EnableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
27+
)
28+
29+
func Enabled(logger logr.Logger) bool {
30+
return env.GetEnvBool(EnableExperimentalDatalayerV2, false, logger)
31+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/go-logr/logr"
25+
"sigs.k8s.io/controller-runtime/pkg/log"
26+
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
28+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
29+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
30+
)
31+
32+
const debugPrintInterval = 5 * time.Second
33+
34+
// StartMetricsLogger starts background goroutines for:
35+
// 1. Refreshing Prometheus metrics periodically
36+
// 2. Debug logging (if DEBUG level enabled)
37+
func StartMetricsLogger(ctx context.Context, datastore datalayer.PoolInfo, refreshInterval, stalenessThreshold time.Duration) {
38+
logger := log.FromContext(ctx)
39+
40+
go runPrometheusRefresher(ctx, logger, datastore, refreshInterval, stalenessThreshold)
41+
42+
if logger.V(logutil.DEBUG).Enabled() {
43+
go runDebugLogger(ctx, logger, datastore, stalenessThreshold)
44+
}
45+
}
46+
47+
func runPrometheusRefresher(ctx context.Context, logger logr.Logger, datastore datalayer.PoolInfo, interval, stalenessThreshold time.Duration) {
48+
ticker := time.NewTicker(interval)
49+
defer ticker.Stop()
50+
51+
for {
52+
select {
53+
case <-ctx.Done():
54+
logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread")
55+
return
56+
case <-ticker.C:
57+
refreshPrometheusMetrics(logger, datastore, stalenessThreshold)
58+
}
59+
}
60+
}
61+
62+
func runDebugLogger(ctx context.Context, logger logr.Logger, datastore datalayer.PoolInfo, stalenessThreshold time.Duration) {
63+
ticker := time.NewTicker(debugPrintInterval)
64+
defer ticker.Stop()
65+
66+
for {
67+
select {
68+
case <-ctx.Done():
69+
logger.V(logutil.DEFAULT).Info("Shutting down metrics logger thread")
70+
return
71+
case <-ticker.C:
72+
printDebugMetrics(logger, datastore, stalenessThreshold)
73+
}
74+
}
75+
}
76+
77+
func podsWithFreshMetrics(stalenessThreshold time.Duration) func(datalayer.Endpoint) bool {
78+
return func(ep datalayer.Endpoint) bool {
79+
if ep == nil {
80+
return false // Skip nil pods
81+
}
82+
return time.Since(ep.GetMetrics().UpdateTime) <= stalenessThreshold
83+
}
84+
}
85+
86+
func podsWithStaleMetrics(stalenessThreshold time.Duration) func(datalayer.Endpoint) bool {
87+
return func(ep datalayer.Endpoint) bool {
88+
if ep == nil {
89+
return false // Skip nil pods
90+
}
91+
return time.Since(ep.GetMetrics().UpdateTime) > stalenessThreshold
92+
}
93+
}
94+
95+
func printDebugMetrics(logger logr.Logger, datastore datalayer.PoolInfo, stalenessThreshold time.Duration) {
96+
freshPods := datastore.PodList(podsWithFreshMetrics(stalenessThreshold))
97+
stalePods := datastore.PodList(podsWithStaleMetrics(stalenessThreshold))
98+
99+
message := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v",
100+
freshPods, stalePods)
101+
logger.V(logutil.VERBOSE).Info(message)
102+
}
103+
104+
func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, stalenessThreshold time.Duration) {
105+
pool, err := datastore.PoolGet()
106+
if err != nil {
107+
logger.V(logutil.DEFAULT).Info("Pool is not initialized, skipping refreshing metrics")
108+
return
109+
}
110+
111+
podMetrics := datastore.PodList(podsWithFreshMetrics(stalenessThreshold))
112+
logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics))
113+
114+
if len(podMetrics) == 0 {
115+
return
116+
}
117+
118+
totals := calculateTotals(podMetrics)
119+
podCount := len(podMetrics)
120+
121+
metrics.RecordInferencePoolAvgKVCache(pool.Name, totals.kvCache/float64(podCount))
122+
metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(totals.queueSize/podCount))
123+
metrics.RecordInferencePoolReadyPods(pool.Name, float64(podCount))
124+
}
125+
126+
// totals holds aggregated metric values
127+
type totals struct {
128+
kvCache float64
129+
queueSize int
130+
}
131+
132+
func calculateTotals(endpoints []datalayer.Endpoint) totals {
133+
var result totals
134+
for _, pod := range endpoints {
135+
metrics := pod.GetMetrics()
136+
result.kvCache += metrics.KVCacheUsagePercent
137+
result.queueSize += metrics.WaitingQueueSize
138+
}
139+
return result
140+
}

pkg/epp/server/runserver.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import (
3838
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
3939
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4040
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/controller"
41+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
42+
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
4143
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4244
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
4345
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
@@ -138,7 +140,12 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man
138140
// The runnable implements LeaderElectionRunnable with leader election disabled.
139141
func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
140142
return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error {
141-
backendmetrics.StartMetricsLogger(ctx, r.Datastore, r.RefreshPrometheusMetricsInterval, r.MetricsStalenessThreshold)
143+
if datalayer.Enabled(logger) {
144+
dlmetrics.StartMetricsLogger(ctx, r.Datastore, r.RefreshPrometheusMetricsInterval, r.MetricsStalenessThreshold)
145+
} else {
146+
backendmetrics.StartMetricsLogger(ctx, r.Datastore, r.RefreshPrometheusMetricsInterval, r.MetricsStalenessThreshold)
147+
}
148+
142149
var srv *grpc.Server
143150
if r.SecureServing {
144151
var cert tls.Certificate

0 commit comments

Comments
 (0)