Skip to content

Commit b986fef

Browse files
committed
enable v2 data layer
Signed-off-by: Etai Lev Ran <[email protected]>
1 parent 3bcb68a commit b986fef

File tree

6 files changed

+199
-40
lines changed

6 files changed

+199
-40
lines changed

cmd/epp/runner/runner.go

Lines changed: 80 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ import (
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
4747
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4848
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
49+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
50+
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
4951
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
5052
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
5153
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
@@ -245,40 +247,11 @@ func (r *Runner) Run(ctx context.Context) error {
245247
}
246248

247249
// --- Setup Datastore ---
248-
mapping, err := backendmetrics.NewMetricMapping(
249-
*totalQueuedRequestsMetric,
250-
*kvCacheUsagePercentageMetric,
251-
*loraInfoMetric,
252-
)
250+
epf, err := r.setupMetricsCollection(setupLog)
253251
if err != nil {
254-
setupLog.Error(err, "Failed to create metric mapping from flags.")
255252
return err
256253
}
257-
verifyMetricMapping(*mapping, setupLog)
258-
259-
var metricsHttpClient *http.Client
260-
if *modelServerMetricsScheme == "https" {
261-
metricsHttpClient = &http.Client{
262-
Transport: &http.Transport{
263-
TLSClientConfig: &tls.Config{
264-
InsecureSkipVerify: *modelServerMetricsHttpsInsecureSkipVerify,
265-
},
266-
},
267-
}
268-
} else {
269-
metricsHttpClient = http.DefaultClient
270-
}
271-
272-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{
273-
MetricMapping: mapping,
274-
ModelServerMetricsPort: int32(*modelServerMetricsPort),
275-
ModelServerMetricsPath: *modelServerMetricsPath,
276-
ModelServerMetricsScheme: *modelServerMetricsScheme,
277-
Client: metricsHttpClient,
278-
},
279-
*refreshMetricsInterval)
280-
281-
datastore := datastore.NewDatastore(ctx, pmf)
254+
datastore := datastore.NewDatastore(ctx, epf)
282255

283256
// --- Setup Metrics Server ---
284257
customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
@@ -446,6 +419,82 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
446419
return nil
447420
}
448421

422+
func (r *Runner) setupMetricsCollection(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
423+
if datalayer.Enabled(setupLog) {
424+
return setupDatalayer(setupLog)
425+
}
426+
427+
if len(datalayer.GetSources()) != 0 {
428+
setupLog.Info("data sources registered but pluggable datalayer is disabled")
429+
}
430+
return setupMetricsV1(setupLog)
431+
}
432+
433+
func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
434+
mapping, err := backendmetrics.NewMetricMapping(
435+
*totalQueuedRequestsMetric,
436+
*kvCacheUsagePercentageMetric,
437+
*loraInfoMetric,
438+
)
439+
if err != nil {
440+
setupLog.Error(err, "Failed to create metric mapping from flags.")
441+
return nil, err
442+
}
443+
verifyMetricMapping(*mapping, setupLog)
444+
445+
var metricsHttpClient *http.Client
446+
if *modelServerMetricsScheme == "https" {
447+
metricsHttpClient = &http.Client{
448+
Transport: &http.Transport{
449+
TLSClientConfig: &tls.Config{
450+
InsecureSkipVerify: *modelServerMetricsHttpsInsecureSkipVerify,
451+
},
452+
},
453+
}
454+
} else {
455+
metricsHttpClient = http.DefaultClient
456+
}
457+
458+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{
459+
MetricMapping: mapping,
460+
ModelServerMetricsPort: int32(*modelServerMetricsPort),
461+
ModelServerMetricsPath: *modelServerMetricsPath,
462+
ModelServerMetricsScheme: *modelServerMetricsScheme,
463+
Client: metricsHttpClient,
464+
},
465+
*refreshMetricsInterval)
466+
return pmf, nil
467+
}
468+
469+
func setupDatalayer(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
470+
// create and register a metrics data source and extractor. In the future,
471+
// data sources and extractors might be configured via a file. Once done,
472+
// this (and registering the sources with the endpoint factory) should
473+
// be moved accordingly.
474+
source := dlmetrics.NewDataSource(*modelServerMetricsScheme,
475+
int32(*modelServerMetricsPort),
476+
*modelServerMetricsPath,
477+
*modelServerMetricsHttpsInsecureSkipVerify,
478+
nil)
479+
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
480+
*kvCacheUsagePercentageMetric,
481+
*loraInfoMetric)
482+
483+
if err != nil {
484+
return nil, err
485+
}
486+
if err := source.AddExtractor(extractor); err != nil {
487+
return nil, err
488+
}
489+
if err := datalayer.RegisterSource(source); err != nil {
490+
return nil, err
491+
}
492+
493+
factory := datalayer.NewEndpointFactory(setupLog, *refreshMetricsInterval)
494+
factory.SetSources(datalayer.GetSources())
495+
return factory, nil
496+
}
497+
449498
func initLogging(opts *zap.Options) {
450499
// Unless -zap-log-level is explicitly set, use -v
451500
useV := true

pkg/epp/datalayer/collector.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sync"
2323
"time"
2424

25+
"github.com/go-logr/logr"
2526
"sigs.k8s.io/controller-runtime/pkg/log"
2627

2728
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -73,6 +74,8 @@ type Collector struct {
7374
startOnce sync.Once
7475
stopOnce sync.Once
7576

77+
logger logr.Logger
78+
7679
// TODO: optional metrics tracking collection (e.g., errors, invocations, ...)
7780
}
7881

@@ -82,11 +85,13 @@ func NewCollector() *Collector {
8285
}
8386

8487
// Start initiates data source collection for the endpoint.
88+
// TODO: pass PoolInfo for backward compatibility
8589
func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error {
8690
var ready chan struct{}
8791
started := false
8892

8993
c.startOnce.Do(func() {
94+
c.logger = log.FromContext(ctx)
9095
c.ctx, c.cancel = context.WithCancel(ctx)
9196
started = true
9297
ready = make(chan struct{})
@@ -107,6 +112,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
107112
case <-c.ctx.Done(): // per endpoint context cancelled
108113
return
109114
case <-ticker.Channel():
115+
// TODO: do not collect if there's no pool specified?
110116
for _, src := range sources {
111117
ctx, cancel := context.WithTimeout(c.ctx, defaultCollectionTimeout)
112118
_ = src.Collect(ctx, endpoint) // TODO: track errors per collector?

pkg/epp/datalayer/factory.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@ package datalayer
1818

1919
import (
2020
"context"
21+
"sync"
22+
"time"
2123

24+
"github.com/go-logr/logr"
2225
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/types"
2327

28+
"sigs.k8s.io/controller-runtime/pkg/log"
2429
v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
2530
)
2631

@@ -44,3 +49,83 @@ type EndpointFactory interface {
4449
NewEndpoint(parent context.Context, inpod *corev1.Pod, poolinfo PoolInfo) Endpoint
4550
ReleaseEndpoint(ep Endpoint)
4651
}
52+
53+
// EndpointLifecycle manages the life cycle (creation and termination) of
// endpoints, owning one metrics collector per endpoint.
type EndpointLifecycle struct {
	sources         []DataSource  // data sources for collectors
	collectors      sync.Map      // collectors map. key: Pod namespaced name, value: *Collector
	refreshInterval time.Duration // metrics refresh interval
}

// NewEndpointFactory returns a new endpoint factory, managing collectors for
// its endpoints.
// NOTE(review): the log parameter is currently unused by this constructor —
// confirm whether it should be stored on the struct for later use.
// TODO: consider making a config object? Only caveat is that sources might not be
// known at creation time (e.g., loaded from configuration file).
func NewEndpointFactory(log logr.Logger, refreshMetricsInterval time.Duration) *EndpointLifecycle {
	return &EndpointLifecycle{
		sources:         []DataSource{},
		collectors:      sync.Map{},
		refreshInterval: refreshMetricsInterval,
	}
}
72+
73+
// NewEndpoint implements EndpointFactory.NewEndpoint.
// Creates a new endpoint and starts its associated collector with its own ticker.
// Guards against multiple concurrent calls for the same endpoint.
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *corev1.Pod, _ PoolInfo) Endpoint {
	key := types.NamespacedName{Namespace: inpod.Namespace, Name: inpod.Name}
	logger := log.FromContext(parent).WithValues("pod", key)

	// Fast path: a collector already exists for this pod, so do not build a
	// new endpoint at all.
	if _, ok := lc.collectors.Load(key); ok {
		logger.Info("collector already running for endpoint", "endpoint", key)
		return nil
	}

	endpoint := NewEndpoint()
	endpoint.UpdatePod(inpod)
	collector := NewCollector() // for full backward compatibility, set the logger and poolinfo

	// LoadOrStore closes the race window left by the Load above: if another
	// goroutine already created and stored a collector for this endpoint,
	// there is no need to start the new collector.
	if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded {
		logger.Info("collector already running for endpoint", "endpoint", key)
		return nil
	}

	ticker := NewTimeTicker(lc.refreshInterval)
	if err := collector.Start(parent, ticker, endpoint, lc.sources); err != nil {
		logger.Error(err, "failed to start collector for endpoint", "endpoint", key)
		lc.collectors.Delete(key)
		// NOTE(review): the endpoint is still returned below even though its
		// collector failed to start — confirm this fallback is intentional.
	}

	return endpoint
}
104+
105+
// ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint
106+
// Stops the collector and cleans up resources for the endpoint
107+
func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) {
108+
key := ep.GetPod().GetNamespacedName()
109+
110+
if value, ok := lc.collectors.LoadAndDelete(key); ok {
111+
collector := value.(*Collector)
112+
_ = collector.Stop()
113+
}
114+
}
115+
116+
// Shutdown gracefully stops all collectors and cleans up all resources.
117+
func (lc *EndpointLifecycle) Shutdown() {
118+
lc.collectors.Range(func(key, value interface{}) bool {
119+
collector := value.(*Collector)
120+
_ = collector.Stop()
121+
lc.collectors.Delete(key)
122+
return true
123+
})
124+
}
125+
126+
// SetSources configures the data sources available for collection.
127+
// This should be called after all data sources have been configured and before
128+
// any endpoint is created.
129+
func (lc *EndpointLifecycle) SetSources(sources []DataSource) {
130+
_ = copy(lc.sources, sources)
131+
}

pkg/epp/datalayer/metrics/client.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ const (
4141
)
4242

4343
var (
	// baseTransport is the shared HTTP transport for metrics scraping; the
	// HTTPS path clones it before installing a TLS config (see datasource.go).
	baseTransport = &http.Transport{
		MaxIdleConns:        maxIdleConnections,
		MaxIdleConnsPerHost: 4, // host is defined as scheme://host:port
		// TODO: set additional timeouts, transport options, etc.
	}
	// defaultClient is the package-wide fallback metrics client, used when a
	// caller does not supply its own.
	defaultClient = &client{
		Client: http.Client{
			Timeout:   timeout,
			Transport: baseTransport,
		},
	}
)

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package metrics
1818

1919
import (
2020
"context"
21+
"crypto/tls"
2122
"errors"
2223
"fmt"
2324
"net"
@@ -30,7 +31,7 @@ import (
3031
)
3132

3233
const (
33-
dataSourceName = "metrics-data-source"
34+
DataSourceName = "metrics-data-source"
3435
)
3536

3637
// DataSource is a Model Server Protocol (MSP) compliant metrics data source,
@@ -46,7 +47,15 @@ type DataSource struct {
4647

4748
// NewDataSource returns a new MSP compliant metrics data source, configured with the provided
4849
// client factory. If ClientFactory is nil, a default factory is used.
49-
func NewDataSource(metricsScheme string, metricsPort int32, metricsPath string, cl Client) *DataSource {
50+
func NewDataSource(metricsScheme string, metricsPort int32, metricsPath string, skipCertVerification bool, cl Client) *DataSource {
51+
if metricsScheme == "https" {
52+
httpsTransport := baseTransport.Clone()
53+
httpsTransport.TLSClientConfig = &tls.Config{
54+
InsecureSkipVerify: skipCertVerification,
55+
}
56+
defaultClient.Transport = httpsTransport
57+
}
58+
5059
if cl == nil {
5160
cl = defaultClient
5261
}
@@ -68,7 +77,7 @@ func (dataSrc *DataSource) SetPort(metricsPort int32) {
6877

6978
// Name returns the metrics data source name.
func (dataSrc *DataSource) Name() string {
	return DataSourceName
}
7382

7483
// AddExtractor adds an extractor to the data source, validating it can process

pkg/epp/datastore/datastore.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
3434
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3535
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
36+
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
3637
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3738
podutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pod"
3839
)
@@ -116,6 +117,14 @@ func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *v1
116117

117118
oldPool := ds.pool
118119
ds.pool = pool
120+
if oldPool == nil || pool.Spec.TargetPorts[0] != oldPool.Spec.TargetPorts[0] {
121+
if datalayer.Enabled(logger) {
122+
if source, found := datalayer.GetNamedSource(dlmetrics.DataSourceName); found {
123+
metricsSource := source.(*dlmetrics.DataSource)
124+
metricsSource.SetPort(int32(pool.Spec.TargetPorts[0].Number))
125+
}
126+
}
127+
}
119128
if oldPool == nil || !reflect.DeepEqual(pool.Spec.Selector, oldPool.Spec.Selector) {
120129
logger.V(logutil.DEFAULT).Info("Updating inference pool endpoints", "selector", pool.Spec.Selector)
121130
// A full resync is required to address two cases:

0 commit comments

Comments
 (0)