diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index eb2ea3b25..f60cefeae 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -46,6 +46,8 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors" @@ -59,10 +61,17 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer" testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" "sigs.k8s.io/gateway-api-inference-extension/version" ) +const ( + // enableExperimentalDatalayerV2 defines the environment variable + // used as feature flag for the pluggable data layer. 
+ enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2" +) + var ( grpcPort = flag.Int( "grpc-port", @@ -245,40 +254,12 @@ func (r *Runner) Run(ctx context.Context) error { } // --- Setup Datastore --- - mapping, err := backendmetrics.NewMetricMapping( - *totalQueuedRequestsMetric, - *kvCacheUsagePercentageMetric, - *loraInfoMetric, - ) + useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog) + epf, err := r.setupMetricsCollection(setupLog, useDatalayerV2) if err != nil { - setupLog.Error(err, "Failed to create metric mapping from flags.") return err } - verifyMetricMapping(*mapping, setupLog) - - var metricsHttpClient *http.Client - if *modelServerMetricsScheme == "https" { - metricsHttpClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: *modelServerMetricsHttpsInsecureSkipVerify, - }, - }, - } - } else { - metricsHttpClient = http.DefaultClient - } - - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{ - MetricMapping: mapping, - ModelServerMetricsPort: int32(*modelServerMetricsPort), - ModelServerMetricsPath: *modelServerMetricsPath, - ModelServerMetricsScheme: *modelServerMetricsScheme, - Client: metricsHttpClient, - }, - *refreshMetricsInterval) - - datastore := datastore.NewDatastore(ctx, pmf) + datastore := datastore.NewDatastore(ctx, epf) // --- Setup Metrics Server --- customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)} @@ -371,6 +352,7 @@ func (r *Runner) Run(ctx context.Context) error { MetricsStalenessThreshold: *metricsStalenessThreshold, Director: director, SaturationDetector: saturationDetector, + UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag } if err := serverRunner.SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "Failed to setup EPP controllers") @@ -446,6 +428,81 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) 
error { return nil } +func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDatalayer bool) (datalayer.EndpointFactory, error) { + if useExperimentalDatalayer { + return setupDatalayer() + } + + if len(datalayer.GetSources()) != 0 { + setupLog.Info("data sources registered but pluggable datalayer is disabled") + } + return setupMetricsV1(setupLog) +} + +func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) { + mapping, err := backendmetrics.NewMetricMapping( + *totalQueuedRequestsMetric, + *kvCacheUsagePercentageMetric, + *loraInfoMetric, + ) + if err != nil { + setupLog.Error(err, "Failed to create metric mapping from flags.") + return nil, err + } + verifyMetricMapping(*mapping, setupLog) + + var metricsHttpClient *http.Client + if *modelServerMetricsScheme == "https" { + metricsHttpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: *modelServerMetricsHttpsInsecureSkipVerify, + }, + }, + } + } else { + metricsHttpClient = http.DefaultClient + } + + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{ + MetricMapping: mapping, + ModelServerMetricsPort: int32(*modelServerMetricsPort), + ModelServerMetricsPath: *modelServerMetricsPath, + ModelServerMetricsScheme: *modelServerMetricsScheme, + Client: metricsHttpClient, + }, + *refreshMetricsInterval) + return pmf, nil +} + +func setupDatalayer() (datalayer.EndpointFactory, error) { + // create and register a metrics data source and extractor. In the future, + // data sources and extractors might be configured via a file. Once done, + // this (and registering the sources with the endpoint factory) should + // be moved accordingly. 
+ source := dlmetrics.NewDataSource(*modelServerMetricsScheme, + int32(*modelServerMetricsPort), // start with (optional) command line port value + *modelServerMetricsPath, + *modelServerMetricsHttpsInsecureSkipVerify, + nil) + extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric, + *kvCacheUsagePercentageMetric, + *loraInfoMetric) + + if err != nil { + return nil, err + } + if err := source.AddExtractor(extractor); err != nil { + return nil, err + } + if err := datalayer.RegisterSource(source); err != nil { + return nil, err + } + + factory := datalayer.NewEndpointFactory(datalayer.GetSources(), *refreshMetricsInterval) + return factory, nil +} + func initLogging(opts *zap.Options) { // Unless -zap-log-level is explicitly set, use -v useV := true diff --git a/pkg/epp/datalayer/collector.go b/pkg/epp/datalayer/collector.go index 87d2a62e6..86a8f7b4e 100644 --- a/pkg/epp/datalayer/collector.go +++ b/pkg/epp/datalayer/collector.go @@ -82,17 +82,18 @@ func NewCollector() *Collector { } // Start initiates data source collection for the endpoint. +// TODO: pass PoolInfo for backward compatibility func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error { var ready chan struct{} started := false c.startOnce.Do(func() { + logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress()) c.ctx, c.cancel = context.WithCancel(ctx) started = true ready = make(chan struct{}) go func(endpoint Endpoint, sources []DataSource) { - logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress()) logger.V(logging.DEFAULT).Info("starting collection") defer func() { @@ -107,6 +108,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc case <-c.ctx.Done(): // per endpoint context cancelled return case <-ticker.Channel(): + // TODO: do not collect if there's no pool specified? 
for _, src := range sources { ctx, cancel := context.WithTimeout(c.ctx, defaultCollectionTimeout) _ = src.Collect(ctx, endpoint) // TODO: track errors per collector? diff --git a/pkg/epp/datalayer/datasource.go b/pkg/epp/datalayer/datasource.go index 6ad7b290d..737e2c50e 100644 --- a/pkg/epp/datalayer/datasource.go +++ b/pkg/epp/datalayer/datasource.go @@ -92,14 +92,27 @@ func (dsr *DataSourceRegistry) GetSources() []DataSource { // --- default registry accessors --- +// RegisterSource adds a new data source to the default registry. func RegisterSource(src DataSource) error { return defaultDataSources.Register(src) } -func GetNamedSource(name string) (DataSource, bool) { - return defaultDataSources.GetNamedSource(name) +// GetNamedSource returns a typed data source from the default registry. +func GetNamedSource[T DataSource](name string) (T, bool) { + v, ok := defaultDataSources.GetNamedSource(name) + if !ok { + var zero T + return zero, false + } + src, ok := v.(T) + if !ok { + var zero T + return zero, false + } + return src, true } +// GetSources returns the list of data sources registered in the default registry. func GetSources() []DataSource { return defaultDataSources.GetSources() } diff --git a/pkg/epp/datalayer/factory.go b/pkg/epp/datalayer/factory.go index a282069ee..eca7697e5 100644 --- a/pkg/epp/datalayer/factory.go +++ b/pkg/epp/datalayer/factory.go @@ -18,8 +18,12 @@ package datalayer import ( "context" + "sync" + "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/log" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" ) @@ -44,3 +48,74 @@ type EndpointFactory interface { NewEndpoint(parent context.Context, inpod *corev1.Pod, poolinfo PoolInfo) Endpoint ReleaseEndpoint(ep Endpoint) } + +// EndpointLifecycle manages the life cycle (creation and termination) of +// endpoints. 
+type EndpointLifecycle struct { + sources []DataSource // data sources for collectors + collectors sync.Map // collectors map. key: Pod namespaced name, value: *Collector + refreshInterval time.Duration // metrics refresh interval +} + +// NewEndpointFactory returns a new endpoint factory that manages collectors for +// its endpoints. This function assumes that sources are not modified afterwards. +func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Duration) *EndpointLifecycle { + return &EndpointLifecycle{ + sources: sources, + collectors: sync.Map{}, + refreshInterval: refreshMetricsInterval, + } +} + +// NewEndpoint implements EndpointFactory.NewEndpoint. +// Creates a new endpoint and starts its associated collector with its own ticker. +// Guards against multiple concurrent calls for the same endpoint. +func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *corev1.Pod, _ PoolInfo) Endpoint { + key := types.NamespacedName{Namespace: inpod.Namespace, Name: inpod.Name} + logger := log.FromContext(parent).WithValues("pod", key) + + if _, ok := lc.collectors.Load(key); ok { + logger.Info("collector already running for endpoint", "endpoint", key) + return nil + } + + endpoint := NewEndpoint() + endpoint.UpdatePod(inpod) + collector := NewCollector() // for full backward compatibility, set the logger and poolinfo + + if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded { + // another goroutine already created and stored a collector for this endpoint. + // No need to start the new collector. 
+ logger.Info("collector already running for endpoint", "endpoint", key) + return nil + } + + ticker := NewTimeTicker(lc.refreshInterval) + if err := collector.Start(parent, ticker, endpoint, lc.sources); err != nil { + logger.Error(err, "failed to start collector for endpoint", "endpoint", key) + lc.collectors.Delete(key) + } + + return endpoint +} + +// ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint +// Stops the collector and cleans up resources for the endpoint +func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) { + key := ep.GetPod().GetNamespacedName() + + if value, ok := lc.collectors.LoadAndDelete(key); ok { + collector := value.(*Collector) + _ = collector.Stop() + } +} + +// Shutdown gracefully stops all collectors and cleans up all resources. +func (lc *EndpointLifecycle) Shutdown() { + lc.collectors.Range(func(key, value any) bool { + collector := value.(*Collector) + _ = collector.Stop() + lc.collectors.Delete(key) + return true + }) +} diff --git a/pkg/epp/datalayer/metrics/client.go b/pkg/epp/datalayer/metrics/client.go index 7961b9247..962a2a584 100644 --- a/pkg/epp/datalayer/metrics/client.go +++ b/pkg/epp/datalayer/metrics/client.go @@ -33,22 +33,31 @@ type Client interface { Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error) } -// -- package implementations -- const ( + // the maximum idle connection count is shared by all endpoints. The value is + // set high to ensure the number of idle connections is above the expected + // endpoint count. Setting it too low would cause thrashing of the idle connection + // pool and incur higher overheads for every GET (e.g., socket initiation, certificate + // exchange, connections in timed wait state, etc.). maxIdleConnections = 5000 - maxIdleTime = 10 * time.Second - timeout = 10 * time.Second + maxIdleTime = 10 * time.Second // once an endpoint goes down, allow closing. 
+ timeout = 10 * time.Second // mostly guard against unresponsive endpoints. + // allow some grace when connections are not made idle immediately (e.g., parsing + // and updating might take some time). This allows maintaining up to two idle connections + // per endpoint (defined as scheme://host:port). + maxIdleConnsPerHost = 2 ) var ( + baseTransport = &http.Transport{ + MaxIdleConns: maxIdleConnections, + MaxIdleConnsPerHost: maxIdleConnsPerHost, + // TODO: set additional timeouts, transport options, etc. + } defaultClient = &client{ Client: http.Client{ - Timeout: timeout, - Transport: &http.Transport{ - MaxIdleConns: maxIdleConnections, - MaxIdleConnsPerHost: 4, // host is defined as scheme://host:port - }, - // TODO: set additional timeouts, transport options, etc. + Timeout: timeout, + Transport: baseTransport, }, } ) diff --git a/pkg/epp/datalayer/metrics/datasource.go b/pkg/epp/datalayer/metrics/datasource.go index 5ff7ef9bf..7dcdc97ba 100644 --- a/pkg/epp/datalayer/metrics/datasource.go +++ b/pkg/epp/datalayer/metrics/datasource.go @@ -18,6 +18,7 @@ package metrics import ( "context" + "crypto/tls" "errors" "fmt" "net" @@ -30,7 +31,7 @@ import ( ) const ( - dataSourceName = "metrics-data-source" + DataSourceName = "metrics-data-source" ) // DataSource is a Model Server Protocol (MSP) compliant metrics data source, @@ -44,9 +45,19 @@ type DataSource struct { extractors sync.Map // key: name, value: extractor } -// NewDataSource returns a new MSP compliant metrics data source, configured with the provided -// client factory. If ClientFactory is nil, a default factory is used. -func NewDataSource(metricsScheme string, metricsPort int32, metricsPath string, cl Client) *DataSource { +// NewDataSource returns a new MSP compliant metrics data source, configured with +// the provided client. If cl is nil, the package default client is used. +// The scheme, port and path are command line options. 
It should be noted that +// a port value of zero is set if the command line is unspecified. +func NewDataSource(metricsScheme string, metricsPort int32, metricsPath string, skipCertVerification bool, cl Client) *DataSource { + if metricsScheme == "https" { + httpsTransport := baseTransport.Clone() + httpsTransport.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: skipCertVerification, + } + defaultClient.Transport = httpsTransport + } + if cl == nil { cl = defaultClient } @@ -61,14 +72,23 @@ func NewDataSource(metricsScheme string, metricsPort int32, metricsPath string, } // SetPort updates the port used for metrics scraping. +// The port value can only be set once (i.e., if set by command line, +// do not overwrite from Pool.Spec). A port value of 0 (i.e., unspecified +// command line value) is ignored. +// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1398 func (dataSrc *DataSource) SetPort(metricsPort int32) { - port := strconv.Itoa(int(metricsPort)) - dataSrc.metricsPort.Store(&port) + if dataSrc.metricsPort.Load() != nil { // do not overwrite + return + } + if metricsPort != 0 { // ignore zero value for port + port := strconv.Itoa(int(metricsPort)) + dataSrc.metricsPort.Store(&port) + } } // Name returns the metrics data source name. func (dataSrc *DataSource) Name() string { - return dataSourceName + return DataSourceName } // AddExtractor adds an extractor to the data source, validating it can process diff --git a/pkg/epp/datalayer/metrics/logger.go b/pkg/epp/datalayer/metrics/logger.go new file mode 100644 index 000000000..fac757dbe --- /dev/null +++ b/pkg/epp/datalayer/metrics/logger.go @@ -0,0 +1,138 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "time" + + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/log" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +const debugPrintInterval = 5 * time.Second + +// StartMetricsLogger starts background goroutines for: +// 1. Refreshing Prometheus metrics periodically +// 2. Debug logging (if DEBUG level enabled) +func StartMetricsLogger(ctx context.Context, datastore datalayer.PoolInfo, refreshInterval, stalenessThreshold time.Duration) { + logger := log.FromContext(ctx) + + go runPrometheusRefresher(ctx, logger, datastore, refreshInterval, stalenessThreshold) + + if logger.V(logutil.DEBUG).Enabled() { + go runDebugLogger(ctx, logger, datastore, stalenessThreshold) + } +} + +func runPrometheusRefresher(ctx context.Context, logger logr.Logger, datastore datalayer.PoolInfo, interval, stalenessThreshold time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread") + return + case <-ticker.C: + refreshPrometheusMetrics(logger, datastore, stalenessThreshold) + } + } +} + +func runDebugLogger(ctx context.Context, logger logr.Logger, datastore datalayer.PoolInfo, stalenessThreshold time.Duration) { + ticker := time.NewTicker(debugPrintInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + 
logger.V(logutil.DEFAULT).Info("Shutting down metrics logger thread") + return + case <-ticker.C: + printDebugMetrics(logger, datastore, stalenessThreshold) + } + } +} + +func podsWithFreshMetrics(stalenessThreshold time.Duration) func(datalayer.Endpoint) bool { + return func(ep datalayer.Endpoint) bool { + if ep == nil { + return false // Skip nil pods + } + return time.Since(ep.GetMetrics().UpdateTime) <= stalenessThreshold + } +} + +func podsWithStaleMetrics(stalenessThreshold time.Duration) func(datalayer.Endpoint) bool { + return func(ep datalayer.Endpoint) bool { + if ep == nil { + return false // Skip nil pods + } + return time.Since(ep.GetMetrics().UpdateTime) > stalenessThreshold + } +} + +func printDebugMetrics(logger logr.Logger, datastore datalayer.PoolInfo, stalenessThreshold time.Duration) { + freshPods := datastore.PodList(podsWithFreshMetrics(stalenessThreshold)) + stalePods := datastore.PodList(podsWithStaleMetrics(stalenessThreshold)) + + logger.V(logutil.VERBOSE).Info("Current Pods and metrics gathered", + "Fresh metrics", freshPods, "Stale metrics", stalePods) +} + +func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, stalenessThreshold time.Duration) { + pool, err := datastore.PoolGet() + if err != nil { + logger.V(logutil.DEFAULT).Info("Pool is not initialized, skipping refreshing metrics") + return + } + + podMetrics := datastore.PodList(podsWithFreshMetrics(stalenessThreshold)) + logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics)) + + if len(podMetrics) == 0 { + return + } + + totals := calculateTotals(podMetrics) + podCount := len(podMetrics) + + metrics.RecordInferencePoolAvgKVCache(pool.Name, totals.kvCache/float64(podCount)) + metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(totals.queueSize/podCount)) + metrics.RecordInferencePoolReadyPods(pool.Name, float64(podCount)) +} + +// totals holds aggregated metric values +type totals struct { + kvCache float64 + 
queueSize int +} + +func calculateTotals(endpoints []datalayer.Endpoint) totals { + var result totals + for _, pod := range endpoints { + metrics := pod.GetMetrics() + result.kvCache += metrics.KVCacheUsagePercent + result.queueSize += metrics.WaitingQueueSize + } + return result +} diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 00275b946..38127f886 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" podutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pod" ) @@ -116,6 +117,11 @@ func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *v1 oldPool := ds.pool ds.pool = pool + if oldPool == nil || pool.Spec.TargetPorts[0] != oldPool.Spec.TargetPorts[0] { + if source, found := datalayer.GetNamedSource[*dlmetrics.DataSource](dlmetrics.DataSourceName); found { + source.SetPort(int32(pool.Spec.TargetPorts[0].Number)) + } + } if oldPool == nil || !reflect.DeepEqual(pool.Spec.Selector, oldPool.Spec.Selector) { logger.V(logutil.DEFAULT).Info("Updating inference pool endpoints", "selector", pool.Spec.Selector) // A full resync is required to address two cases: diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index 1b84bd70c..69a928eda 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/controller" + dlmetrics 
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" @@ -56,6 +57,7 @@ type ExtProcServerRunner struct { MetricsStalenessThreshold time.Duration Director *requestcontrol.Director SaturationDetector requestcontrol.SaturationDetector + UseExperimentalDatalayerV2 bool // Pluggable data layer feature flag // This should only be used in tests. We won't need this once we do not inject metrics in the tests. // TODO:(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/432) Cleanup @@ -138,7 +140,12 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man // The runnable implements LeaderElectionRunnable with leader election disabled. func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable { return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error { - backendmetrics.StartMetricsLogger(ctx, r.Datastore, r.RefreshPrometheusMetricsInterval, r.MetricsStalenessThreshold) + if r.UseExperimentalDatalayerV2 { + dlmetrics.StartMetricsLogger(ctx, r.Datastore, r.RefreshPrometheusMetricsInterval, r.MetricsStalenessThreshold) + } else { + backendmetrics.StartMetricsLogger(ctx, r.Datastore, r.RefreshPrometheusMetricsInterval, r.MetricsStalenessThreshold) + } + var srv *grpc.Server if r.SecureServing { var cert tls.Certificate