Skip to content

Commit 7fae938

Browse files
elevran and irar2 authored
Validate datalayer with additional testing (#1857)
* additional tests and fixes for datalayer metrics
  Signed-off-by: Etai Lev Ran <[email protected]>
* log data layer configuration on startup
  Signed-off-by: Etai Lev Ran <[email protected]>
* fix typo
  Signed-off-by: Etai Lev Ran <[email protected]>
* Log pods metrics using fmt.Sprintf
  Signed-off-by: irar2 <[email protected]>
* conforming boilerplate
  Signed-off-by: Etai Lev Ran <[email protected]>
* address review comments
  Signed-off-by: Etai Lev Ran <[email protected]>
---------
Signed-off-by: Etai Lev Ran <[email protected]>
Signed-off-by: irar2 <[email protected]>
Co-authored-by: irar2 <[email protected]>
1 parent 5ce1385 commit 7fae938

File tree

12 files changed

+220
-17
lines changed

12 files changed

+220
-17
lines changed

cmd/epp/runner/runner.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
447447

448448
func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDatalayer bool) (datalayer.EndpointFactory, error) {
449449
if useExperimentalDatalayer {
450-
return setupDatalayer()
450+
return setupDatalayer(setupLog)
451451
}
452452

453453
if len(datalayer.GetSources()) != 0 {
@@ -492,11 +492,16 @@ func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
492492
return pmf, nil
493493
}
494494

495-
func setupDatalayer() (datalayer.EndpointFactory, error) {
496-
// create and register a metrics data source and extractor. In the future,
497-
// data sources and extractors might be configured via a file. Once done,
498-
// this (and registering the sources with the endpoint factory) should
499-
// be moved accordingly.
495+
// setupDatalayer serves two (independent) purposes:
496+
// - creating data sources and configuring their extractors.
497+
// - configuring endpoint factory with the provided source.
498+
// In the future, data sources and extractors might be configured via
499+
// a file. Once done, this (and registering the sources with the
500+
// endpoint factory) should be moved accordingly.
501+
// Regardless, registration of all sources (e.g., if additional sources
502+
// are to be configured), must be done before the EndpointFactory is initialized.
503+
func setupDatalayer(logger logr.Logger) (datalayer.EndpointFactory, error) {
504+
// create and register a metrics data source and extractor.
500505
source := dlmetrics.NewDataSource(*modelServerMetricsScheme,
501506
*modelServerMetricsPath,
502507
*modelServerMetricsHttpsInsecureSkipVerify,
@@ -515,7 +520,12 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
515520
return nil, err
516521
}
517522

518-
factory := datalayer.NewEndpointFactory(datalayer.GetSources(), *refreshMetricsInterval)
523+
// TODO: this could be moved to the configuration loading functions once ported over.
524+
sources := datalayer.GetSources()
525+
for _, src := range sources {
526+
logger.Info("data layer configuration", "source", src.Name(), "extractors", src.Extractors())
527+
}
528+
factory := datalayer.NewEndpointFactory(sources, *refreshMetricsInterval)
519529
return factory, nil
520530
}
521531

pkg/epp/datalayer/collector_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,22 @@ type DummySource struct {
3636
}
3737

3838
func (d *DummySource) Name() string { return "test-dummy-data-source" }
39+
func (d *DummySource) Extractors() []string { return []string{} }
3940
func (d *DummySource) AddExtractor(_ Extractor) error { return nil }
4041
func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error {
4142
atomic.AddInt64(&d.callCount, 1)
4243
return nil
4344
}
4445

4546
func defaultEndpoint() Endpoint {
46-
ms := NewEndpoint()
4747
pod := &PodInfo{
4848
NamespacedName: types.NamespacedName{
4949
Name: "pod-name",
5050
Namespace: "default",
5151
},
5252
Address: "1.2.3.4:5678",
5353
}
54-
ms.UpdatePod(pod)
54+
ms := NewEndpoint(pod, nil)
5555
return ms
5656
}
5757

pkg/epp/datalayer/datasource.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ import (
2626

2727
// DataSource provides raw data to registered Extractors.
2828
type DataSource interface {
29+
// Name of this data source.
2930
Name() string
31+
// Extractors returns a list of registered Extractor names.
32+
Extractors() []string
3033
// AddExtractor adds an extractor to the data source. Multiple
3134
// Extractors can be registered.
3235
// The extractor will be called whenever the DataSource might

pkg/epp/datalayer/datasource_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type mockDataSource struct {
2929
}
3030

3131
func (m *mockDataSource) Name() string { return m.name }
32+
func (m *mockDataSource) Extractors() []string { return []string{} }
3233
func (m *mockDataSource) AddExtractor(_ Extractor) error { return nil }
3334
func (m *mockDataSource) Collect(_ context.Context, _ Endpoint) error { return nil }
3435

pkg/epp/datalayer/endpoint.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,20 @@ type ModelServer struct {
4848
attributes *Attributes
4949
}
5050

51-
// NewEndpoint return a new (uninitialized) ModelServer.
52-
func NewEndpoint() *ModelServer {
53-
return &ModelServer{
51+
// NewEndpoint returns a new ModelServer with the given PodInfo and Metrics.
52+
func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer {
53+
if pod == nil {
54+
pod = &PodInfo{}
55+
}
56+
if metrics == nil {
57+
metrics = NewMetrics()
58+
}
59+
ep := &ModelServer{
5460
attributes: NewAttributes(),
5561
}
62+
ep.UpdatePod(pod)
63+
ep.UpdateMetrics(metrics)
64+
return ep
5665
}
5766

5867
// String returns a representation of the ModelServer. For brevity, only names of

pkg/epp/datalayer/factory.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,8 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo,
7878
return nil
7979
}
8080

81-
endpoint := NewEndpoint()
82-
endpoint.UpdatePod(inpod)
83-
collector := NewCollector() // for full backward compatibility, set the logger and poolinfo
81+
endpoint := NewEndpoint(inpod, nil)
82+
collector := NewCollector() // TODO: for full backward compatibility, set the logger and poolinfo
8483

8584
if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded {
8685
// another goroutine already created and stored a collector for this endpoint.

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,18 @@ func (dataSrc *DataSource) Name() string {
7171
return DataSourceName
7272
}
7373

74+
// Extractors returns a list of registered Extractor names.
75+
func (dataSrc *DataSource) Extractors() []string {
76+
names := []string{}
77+
dataSrc.extractors.Range(func(_, val any) bool {
78+
if ex, ok := val.(datalayer.Extractor); ok {
79+
names = append(names, ex.Name())
80+
}
81+
return true // continue iteration
82+
})
83+
return names
84+
}
85+
7486
// AddExtractor adds an extractor to the data source, validating it can process
7587
// the metrics' data source output type.
7688
func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {

pkg/epp/datalayer/metrics/extractor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ import (
2626
"time"
2727

2828
dto "github.com/prometheus/client_model/go"
29+
"sigs.k8s.io/controller-runtime/pkg/log"
2930

3031
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
3132
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
33+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3234
)
3335

3436
const (
@@ -136,8 +138,10 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi
136138
}
137139
}
138140

141+
logger := log.FromContext(ctx).WithValues("pod", ep.GetPod().NamespacedName)
139142
if updated {
140143
clone.UpdateTime = time.Now()
144+
logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", clone)
141145
ep.UpdateMetrics(clone)
142146
}
143147

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/google/go-cmp/cmp"
24+
dto "github.com/prometheus/client_model/go"
25+
"k8s.io/utils/ptr"
26+
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
28+
)
29+
30+
const (
31+
// use hardcoded values - importing causes cycle
32+
defaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting"
33+
)
34+
35+
func TestExtractorExtract(t *testing.T) {
36+
ctx := context.Background()
37+
38+
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "")
39+
if err != nil {
40+
t.Fatalf("failed to create extractor: %v", err)
41+
}
42+
43+
ep := datalayer.NewEndpoint(nil, nil)
44+
if ep == nil {
45+
t.Fatal("expected non-nil endpoint")
46+
}
47+
48+
tests := []struct {
49+
name string
50+
data any
51+
wantErr bool
52+
updated bool // whether metrics are expected to change
53+
}{
54+
{
55+
name: "nil data",
56+
data: nil,
57+
wantErr: true,
58+
updated: false,
59+
},
60+
{
61+
name: "empty PrometheusMetricMap",
62+
data: PrometheusMetricMap{},
63+
wantErr: true, // errors when metrics are missing
64+
updated: false, // and also not updated...
65+
},
66+
{
67+
name: "single valid metric",
68+
data: PrometheusMetricMap{
69+
defaultTotalQueuedRequestsMetric: &dto.MetricFamily{
70+
Type: dto.MetricType_GAUGE.Enum(),
71+
Metric: []*dto.Metric{
72+
{
73+
Gauge: &dto.Gauge{Value: ptr.To(5.0)},
74+
},
75+
},
76+
},
77+
},
78+
wantErr: true, // missing metrics can return an error
79+
updated: true, // but should still update
80+
},
81+
}
82+
83+
for _, tt := range tests {
84+
t.Run(tt.name, func(t *testing.T) {
85+
defer func() {
86+
if r := recover(); r != nil {
87+
t.Errorf("Extract panicked: %v", r)
88+
}
89+
}()
90+
91+
before := ep.GetMetrics().Clone()
92+
err := extractor.Extract(ctx, tt.data, ep)
93+
after := ep.GetMetrics()
94+
95+
if tt.wantErr && err == nil {
96+
t.Errorf("expected error but got nil")
97+
}
98+
if !tt.wantErr && err != nil {
99+
t.Errorf("unexpected error: %v", err)
100+
}
101+
102+
if tt.updated {
103+
if diff := cmp.Diff(before, after); diff == "" {
104+
t.Errorf("expected metrics to be updated, but no change detected")
105+
}
106+
} else {
107+
if diff := cmp.Diff(before, after); diff != "" {
108+
t.Errorf("expected no metrics update, but got changes:\n%s", diff)
109+
}
110+
}
111+
})
112+
}
113+
}

pkg/epp/datalayer/metrics/logger.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package metrics
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"time"
2223

2324
"github.com/go-logr/logr"
@@ -95,8 +96,8 @@ func printDebugMetrics(logger logr.Logger, datastore datalayer.PoolInfo, stalene
9596
freshPods := datastore.PodList(podsWithFreshMetrics(stalenessThreshold))
9697
stalePods := datastore.PodList(podsWithStaleMetrics(stalenessThreshold))
9798

98-
logger.V(logutil.VERBOSE).Info("Current Pods and metrics gathered",
99-
"Fresh metrics", freshPods, "Stale metrics", stalePods)
99+
logger.V(logutil.TRACE).Info("Current Pods and metrics gathered",
100+
"Fresh metrics", fmt.Sprintf("%+v", freshPods), "Stale metrics", fmt.Sprintf("%+v", stalePods))
100101
}
101102

102103
func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, stalenessThreshold time.Duration) {

0 commit comments

Comments
 (0)