Skip to content

Commit 25cfb90

Browse files
authored
Define and register plugin factories for datalayer (kubernetes-sigs#1911)
* Define and register plugin factories for datalayer. - Factories for metrics DataSource and Extracor. - Configuration parameters (with defaults set from command line flags). - Consistently use typedName instead of `tn`. Signed-off-by: Etai Lev Ran <[email protected]> * boilerplate newlines Signed-off-by: Etai Lev Ran <[email protected]> * use more descriptive names, per PR review Signed-off-by: Etai Lev Ran <[email protected]> --------- Signed-off-by: Etai Lev Ran <[email protected]>
1 parent db8e7be commit 25cfb90

File tree

7 files changed

+220
-34
lines changed

7 files changed

+220
-34
lines changed

cmd/epp/runner/runner.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,9 @@ func (r *Runner) registerInTreePlugins() {
438438
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
439439
// register response received plugin for test purpose only (used in conformance tests)
440440
plugins.Register(testresponsereceived.DestinationEndpointServedVerifierType, testresponsereceived.DestinationEndpointServedVerifierFactory)
441+
// register datalayer metrics collection plugins
442+
plugins.Register(dlmetrics.MetricsDataSourceType, dlmetrics.MetricsDataSourceFactory)
443+
plugins.Register(dlmetrics.MetricsExtractorType, dlmetrics.ModelServerExtractorFactory)
441444
}
442445

443446
func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.EndpointPickerConfig, error) {
@@ -476,7 +479,7 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.End
476479
// Return a function that can be used in the EPP Handle to list pod names.
477480
func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {
478481
return func() []types.NamespacedName {
479-
pods := ds.PodList(func(_ backendmetrics.PodMetrics) bool { return true })
482+
pods := ds.PodList(backendmetrics.AllPodsPredicate)
480483
names := make([]types.NamespacedName, 0, len(pods))
481484

482485
for _, p := range pods {
@@ -615,10 +618,10 @@ func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
615618
// are to be configured), must be done before the EndpointFactory is initialized.
616619
func setupDatalayer(logger logr.Logger) (datalayer.EndpointFactory, error) {
617620
// create and register a metrics data source and extractor.
618-
source := dlmetrics.NewDataSource(*modelServerMetricsScheme,
621+
source := dlmetrics.NewMetricsDataSource(*modelServerMetricsScheme,
619622
*modelServerMetricsPath,
620623
*modelServerMetricsHttpsInsecureSkipVerify)
621-
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
624+
extractor, err := dlmetrics.NewModelServerExtractor(*totalQueuedRequestsMetric,
622625
*totalRunningRequestsMetric,
623626
*kvCacheUsagePercentageMetric,
624627
*loraInfoMetric, *cacheInfoMetric)

pkg/epp/datalayer/datasource_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,17 @@ const (
3131
)
3232

3333
type mockDataSource struct {
34-
tn plugins.TypedName
34+
typedName plugins.TypedName
3535
}
3636

37-
func (m *mockDataSource) TypedName() plugins.TypedName { return m.tn }
37+
func (m *mockDataSource) TypedName() plugins.TypedName { return m.typedName }
3838
func (m *mockDataSource) Extractors() []string { return []string{} }
3939
func (m *mockDataSource) AddExtractor(_ Extractor) error { return nil }
4040
func (m *mockDataSource) Collect(_ context.Context, _ Endpoint) error { return nil }
4141

4242
func TestRegisterAndGetSource(t *testing.T) {
4343
reg := DataSourceRegistry{}
44-
ds := &mockDataSource{tn: plugins.TypedName{Type: testType, Name: testType}}
44+
ds := &mockDataSource{typedName: plugins.TypedName{Type: testType, Name: testType}}
4545

4646
err := reg.Register(ds)
4747
assert.NoError(t, err, "expected no error on first registration")

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,20 @@ import (
2828
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2929
)
3030

31-
const (
32-
DataSourceType = "metrics-data-source"
33-
)
34-
3531
// DataSource is a Model Server Protocol (MSP) compliant metrics data source,
3632
// returning Prometheus formatted metrics for an endpoint.
3733
type DataSource struct {
38-
tn plugins.TypedName
34+
typedName plugins.TypedName
3935
metricsScheme string // scheme to use in metrics URL
4036
metricsPath string // path to use in metrics URL
4137

4238
client Client // client (e.g. a wrapped http.Client) used to get metrics
4339
extractors sync.Map // key: name, value: extractor
4440
}
4541

46-
// NewDataSource returns a new MSP compliant metrics data source, configured with
47-
// the provided client configuration.
48-
// The Scheme, path and certificate validation setting are command line options.
49-
func NewDataSource(metricsScheme string, metricsPath string, skipCertVerification bool) *DataSource {
42+
// NewMetricsDataSource returns a new MSP compliant metrics data source, configured with
43+
// the provided scheme, path and certificate verification parameters.
44+
func NewMetricsDataSource(metricsScheme string, metricsPath string, skipCertVerification bool) *DataSource {
5045
if metricsScheme == "https" {
5146
httpsTransport := baseTransport.Clone()
5247
httpsTransport.TLSClientConfig = &tls.Config{
@@ -56,9 +51,9 @@ func NewDataSource(metricsScheme string, metricsPath string, skipCertVerificatio
5651
}
5752

5853
dataSrc := &DataSource{
59-
tn: plugins.TypedName{
60-
Type: DataSourceType,
61-
Name: DataSourceType,
54+
typedName: plugins.TypedName{
55+
Type: MetricsDataSourceType,
56+
Name: MetricsDataSourceType,
6257
},
6358
metricsScheme: metricsScheme,
6459
metricsPath: metricsPath,
@@ -69,7 +64,7 @@ func NewDataSource(metricsScheme string, metricsPath string, skipCertVerificatio
6964

7065
// TypedName returns the metrics data source type and name.
7166
func (dataSrc *DataSource) TypedName() plugins.TypedName {
72-
return dataSrc.tn
67+
return dataSrc.typedName
7368
}
7469

7570
// Extractors returns a list of registered Extractor names.

pkg/epp/datalayer/metrics/datasource_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ import (
2828
)
2929

3030
func TestDatasource(t *testing.T) {
31-
source := NewDataSource("https", "/metrics", true)
32-
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
31+
source := NewMetricsDataSource("https", "/metrics", true)
32+
extractor, err := NewModelServerExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
3333
assert.Nil(t, err, "failed to create extractor")
3434

3535
dsType := source.TypedName().Type
36-
assert.Equal(t, DataSourceType, dsType)
36+
assert.Equal(t, MetricsDataSourceType, dsType)
3737

3838
err = source.AddExtractor(extractor)
3939
assert.Nil(t, err, "failed to add extractor")

pkg/epp/datalayer/metrics/extractor.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ import (
3535
)
3636

3737
const (
38-
extractorType = "model-server-protocol-metrics"
39-
4038
// LoRA metrics based on MSP
4139
LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
4240
LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
@@ -49,10 +47,12 @@ const (
4947
// Extractor implements the metrics extraction based on the model
5048
// server protocol standard.
5149
type Extractor struct {
52-
tn plugins.TypedName
53-
mapping *Mapping
50+
typedName plugins.TypedName
51+
mapping *Mapping
5452
}
5553

54+
// Produces returns the data attributes that are provided by the datalayer.metrics
55+
// package.
5656
func Produces() map[string]any {
5757
return map[string]any{
5858
metrics.WaitingQueueSizeKey: int(0),
@@ -64,27 +64,27 @@ func Produces() map[string]any {
6464
}
6565
}
6666

67-
// NewExtractor returns a new model server protocol (MSP) metrics extractor,
67+
// NewModelServerExtractor returns a new model server protocol (MSP) metrics extractor,
6868
// configured with the given metrics' specifications.
6969
// These are mandatory metrics per the MSP specification, and are used
7070
// as the basis for the built-in scheduling plugins.
71-
func NewExtractor(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec string) (*Extractor, error) {
71+
func NewModelServerExtractor(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec string) (*Extractor, error) {
7272
mapping, err := NewMapping(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec)
7373
if err != nil {
7474
return nil, fmt.Errorf("failed to create extractor metrics Mapping - %w", err)
7575
}
7676
return &Extractor{
77-
tn: plugins.TypedName{
78-
Type: extractorType,
79-
Name: extractorType,
77+
typedName: plugins.TypedName{
78+
Type: MetricsExtractorType,
79+
Name: MetricsExtractorType,
8080
},
8181
mapping: mapping,
8282
}, nil
8383
}
8484

8585
// TypedName returns the type and name of the metrics.Extractor.
8686
func (ext *Extractor) TypedName() plugins.TypedName {
87-
return ext.tn
87+
return ext.typedName
8888
}
8989

9090
// ExpectedType defines the type expected by the metrics.Extractor - a

pkg/epp/datalayer/metrics/extractor_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ const (
4040
func TestExtractorExtract(t *testing.T) {
4141
ctx := context.Background()
4242

43-
if _, err := NewExtractor("vllm: dummy", "", "", "", ""); err == nil {
43+
if _, err := NewModelServerExtractor("vllm: dummy", "", "", "", ""); err == nil {
4444
t.Error("expected to fail to create extractor with invalid specification")
4545
}
4646

47-
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, defaultTotalRunningRequestsMetric,
47+
extractor, err := NewModelServerExtractor(defaultTotalQueuedRequestsMetric, defaultTotalRunningRequestsMetric,
4848
defaultKvCacheUsagePercentageMetric, defaultLoraInfoMetric, defaultCacheInfoMetric)
4949
if err != nil {
5050
t.Fatalf("failed to create extractor: %v", err)
@@ -54,6 +54,10 @@ func TestExtractorExtract(t *testing.T) {
5454
t.Error("empty extractor type")
5555
}
5656

57+
if exName := extractor.TypedName().Name; exName == "" {
58+
t.Error("empty extractor name")
59+
}
60+
5761
if inputType := extractor.ExpectedInputType(); inputType != PrometheusMetricType {
5862
t.Errorf("incorrect expected input type: %v", inputType)
5963
}
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"encoding/json"
21+
"flag"
22+
"fmt"
23+
"strconv"
24+
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
26+
)
27+
28+
const (
29+
MetricsDataSourceType = "metrics-data-source"
30+
MetricsExtractorType = "model-server-protocol-metrics"
31+
)
32+
33+
// Configuration parameters for metrics data source and extractor.
34+
type (
35+
// Data source configuration parameters
36+
metricsDatasourceParams struct {
37+
// Scheme defines the protocol scheme used in metrics retrieval (e.g., "http").
38+
Scheme string // `json:"scheme"`
39+
// Path defines the URL path used in metrics retrieval (e.g., "/metrics").
40+
Path string // `json:"path"`
41+
// InsecureSkipVerify defines whether model server certificate should be verified or not.
42+
InsecureSkipVerify bool // `json:"insecureSkipVerify"`
43+
}
44+
45+
// Extractor configuration parameters
46+
modelServerExtractorParams struct {
47+
// QueueRequestsSpec defines the metric specification string for retrieving queued request count.
48+
QueueRequestsSpec string // `json:"queuedRequestsSpec"`
49+
// RunningRequestsSpec defines the metric specification string for retrieving running requests count.
50+
RunningRequestsSpec string // `json:"runningRequestsSpec"`
51+
// KVUsage defines the metric specification string for retrieving KV cache usage.
52+
KVUsageSpec string // `json:"kvUsageSpec"`
53+
// LoRASpec defines the metric specification string for retrieving LoRA availability.
54+
LoRASpec string // `json:"loraSpec"`
55+
// CacheInfoSpec defines the metrics specification string for retrieving KV cache configuration.
56+
CacheInfoSpec string // `json:"cacheInfoSpec"`
57+
}
58+
)
59+
60+
// MetricsDataSourceFactory is a factory function used to instantiate data layer's
61+
// metrics data source plugins specified in a configuration.
62+
func MetricsDataSourceFactory(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
63+
cfg, err := defaultDataSourceConfigParams()
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
if parameters != nil { // overlay the defaults with configured values
69+
if err := json.Unmarshal(parameters, cfg); err != nil {
70+
return nil, err
71+
}
72+
}
73+
74+
ds := NewMetricsDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify)
75+
ds.typedName.Name = name
76+
return ds, nil
77+
}
78+
79+
// ModelServerExtractorFactory is a factory function used to instantiate data layer's metrics
80+
// Extractor plugins specified in a configuration.
81+
func ModelServerExtractorFactory(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
82+
cfg, err := defaultExtractorConfigParams()
83+
if err != nil {
84+
return nil, err
85+
}
86+
87+
if parameters != nil { // overlay the defaults with configured values
88+
if err := json.Unmarshal(parameters, cfg); err != nil {
89+
return nil, err
90+
}
91+
}
92+
93+
extractor, err := NewModelServerExtractor(cfg.QueueRequestsSpec, cfg.RunningRequestsSpec, cfg.KVUsageSpec,
94+
cfg.LoRASpec, cfg.CacheInfoSpec)
95+
if err != nil {
96+
return nil, err
97+
}
98+
extractor.typedName.Name = name
99+
return extractor, nil
100+
}
101+
102+
// Names of CLI flags in main
103+
//
104+
// TODO:
105+
//
106+
// 1. Consider having a cli package with all flag names and constants?
107+
// Can't use values from runserver as this creates an import cycle with datalayer.
108+
// Given that relevant issues/PRs have been closed so may be able to remove the cycle?
109+
// Comment from runserver package (regarding TestPodMetricsClient *backendmetrics.FakePodMetricsClient)
110+
// This should only be used in tests. We won't need this once we do not inject metrics in the tests.
111+
// TODO:(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/432) Cleanup
112+
//
113+
// 2. Deprecation notice on these flags being moved to the configuration file
114+
const (
115+
totalQueuedRequestsMetricSpecFlag = "total-queued-requests-metric"
116+
totalRunningRequestsMetricSpecFlag = "total-running-requests-metric"
117+
kvCacheUsagePercentageMetricSpecFlags = "kv-cache-usage-percentage-metric"
118+
loraInfoMetricSpecFlag = "lora-info-metric"
119+
cacheInfoMetricSpecFlag = "cache-info-metric"
120+
modelServerMetricsPathFlag = "model-server-metrics-path"
121+
modelServerMetricsSchemeFlag = "model-server-metrics-scheme"
122+
modelServerMetricsInsecureSkipVerifyFlag = "model-server-metrics-https-insecure-skip-verify"
123+
)
124+
125+
// return the default configuration state. The defaults are populated from
126+
// existing command line flags.
127+
func defaultDataSourceConfigParams() (*metricsDatasourceParams, error) {
128+
var err error
129+
cfg := &metricsDatasourceParams{}
130+
131+
if cfg.Scheme, err = fromStringFlag(modelServerMetricsSchemeFlag); err != nil {
132+
return nil, err
133+
}
134+
if cfg.Path, err = fromStringFlag(modelServerMetricsPathFlag); err != nil {
135+
return nil, err
136+
}
137+
if cfg.InsecureSkipVerify, err = fromBoolFlag(modelServerMetricsInsecureSkipVerifyFlag); err != nil {
138+
return nil, err
139+
}
140+
return cfg, nil
141+
}
142+
143+
func defaultExtractorConfigParams() (*modelServerExtractorParams, error) {
144+
var err error
145+
cfg := &modelServerExtractorParams{}
146+
147+
if cfg.QueueRequestsSpec, err = fromStringFlag(totalQueuedRequestsMetricSpecFlag); err != nil {
148+
return nil, err
149+
}
150+
if cfg.RunningRequestsSpec, err = fromStringFlag(totalRunningRequestsMetricSpecFlag); err != nil {
151+
return nil, err
152+
}
153+
if cfg.KVUsageSpec, err = fromStringFlag(kvCacheUsagePercentageMetricSpecFlags); err != nil {
154+
return nil, err
155+
}
156+
if cfg.LoRASpec, err = fromStringFlag(loraInfoMetricSpecFlag); err != nil {
157+
return nil, err
158+
}
159+
if cfg.CacheInfoSpec, err = fromStringFlag(cacheInfoMetricSpecFlag); err != nil {
160+
return nil, err
161+
}
162+
163+
return cfg, nil
164+
}
165+
166+
func fromStringFlag(name string) (string, error) {
167+
f := flag.Lookup(name)
168+
if f == nil {
169+
return "", fmt.Errorf("flag not found: %s", name)
170+
}
171+
return f.Value.String(), nil
172+
}
173+
174+
func fromBoolFlag(name string) (bool, error) {
175+
f := flag.Lookup(name)
176+
if f == nil {
177+
return false, fmt.Errorf("flag not found: %s", name)
178+
}
179+
b, err := strconv.ParseBool(f.Value.String())
180+
if err != nil {
181+
return false, fmt.Errorf("invalid bool flag %q: %w", name, err)
182+
}
183+
return b, nil
184+
}

0 commit comments

Comments
 (0)