Skip to content

Commit 5bb70f6

Browse files
authored
Pluggable metrics collection (#1237)
* pluggable metrics collection Signed-off-by: Etai Lev Ran <[email protected]> * use the provided context Signed-off-by: Etai Lev Ran <[email protected]> * derive new context outside anonymous function Signed-off-by: Etai Lev Ran <[email protected]> * make fatcontext happy? Signed-off-by: Etai Lev Ran <[email protected]> * mock ticker Signed-off-by: Etai Lev Ran <[email protected]> * split LoRA metric Spec from standard Spec Signed-off-by: Etai Lev Ran <[email protected]> * add collector test Signed-off-by: Etai Lev Ran <[email protected]> * address review comments Signed-off-by: Etai Lev Ran <[email protected]> * review comments on use of client Signed-off-by: Etai Lev Ran <[email protected]> --------- Signed-off-by: Etai Lev Ran <[email protected]>
1 parent 69dfa9b commit 5bb70f6

17 files changed

+1645
-44
lines changed

go.mod

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/prometheus/client_golang v1.23.0
1616
github.com/prometheus/client_model v0.6.2
1717
github.com/prometheus/common v0.65.0
18+
github.com/prometheus/prometheus v0.305.0
1819
github.com/stretchr/testify v1.10.0
1920
go.uber.org/multierr v1.11.0
2021
go.uber.org/zap v1.27.0
@@ -43,14 +44,15 @@ require (
4344
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
4445
github.com/beorn7/perks v1.0.1 // indirect
4546
github.com/blang/semver/v4 v4.0.0 // indirect
46-
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
47+
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
4748
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
4849
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
50+
github.com/dennwc/varint v1.0.0 // indirect
4951
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
5052
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
5153
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
5254
github.com/felixge/httpsnoop v1.0.4 // indirect
53-
github.com/fsnotify/fsnotify v1.7.0 // indirect
55+
github.com/fsnotify/fsnotify v1.8.0 // indirect
5456
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
5557
github.com/go-logr/stdr v1.2.2 // indirect
5658
github.com/go-logr/zapr v1.3.0 // indirect
@@ -64,9 +66,10 @@ require (
6466
github.com/google/btree v1.1.3 // indirect
6567
github.com/google/cel-go v0.23.2 // indirect
6668
github.com/google/gnostic-models v0.6.9 // indirect
67-
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
69+
github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a // indirect
6870
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
69-
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
71+
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
72+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
7073
github.com/huandu/xstrings v1.3.3 // indirect
7174
github.com/imdario/mergo v0.3.16 // indirect
7275
github.com/inconshreveable/mousetrap v1.1.0 // indirect
@@ -90,30 +93,31 @@ require (
9093
github.com/stoewer/go-strcase v1.3.0 // indirect
9194
github.com/x448/float16 v0.8.4 // indirect
9295
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
93-
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
96+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
9497
go.opentelemetry.io/otel v1.36.0 // indirect
95-
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect
96-
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 // indirect
98+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 // indirect
99+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.36.0 // indirect
97100
go.opentelemetry.io/otel/metric v1.36.0 // indirect
98101
go.opentelemetry.io/otel/sdk v1.36.0 // indirect
99102
go.opentelemetry.io/otel/trace v1.36.0 // indirect
100-
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
103+
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
104+
go.uber.org/atomic v1.11.0 // indirect
101105
go.uber.org/automaxprocs v1.6.0 // indirect
102106
go.yaml.in/yaml/v2 v2.4.2 // indirect
103107
golang.org/x/crypto v0.40.0 // indirect
104-
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
108+
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 // indirect
105109
golang.org/x/mod v0.26.0 // indirect
106110
golang.org/x/net v0.42.0 // indirect
107111
golang.org/x/oauth2 v0.30.0 // indirect
108112
golang.org/x/sys v0.34.0 // indirect
109113
golang.org/x/term v0.33.0 // indirect
110114
golang.org/x/text v0.27.0 // indirect
111-
golang.org/x/time v0.9.0 // indirect
115+
golang.org/x/time v0.12.0 // indirect
112116
golang.org/x/tools v0.35.0 // indirect
113117
golang.org/x/tools/go/expect v0.1.1-deprecated // indirect
114118
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
115-
google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a // indirect
116-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a // indirect
119+
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect
120+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
117121
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
118122
gopkg.in/inf.v0 v0.9.1 // indirect
119123
gopkg.in/yaml.v2 v2.4.0 // indirect

go.sum

Lines changed: 99 additions & 24 deletions
Large diffs are not rendered by default.

pkg/epp/datalayer/attributemap.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@ type AttributeMap interface {
3232
Put(string, Cloneable)
3333
Get(string) (Cloneable, bool)
3434
Keys() []string
35-
Clone() *Attributes
3635
}
3736

3837
// Attributes provides a goroutine-safe implementation of AttributeMap.
3938
type Attributes struct {
40-
data sync.Map
39+
data sync.Map // key: attribute name (string), value: attribute value (opaque, Cloneable)
4140
}
4241

4342
// NewAttributes returns a new instance of Attributes.
@@ -76,7 +75,7 @@ func (a *Attributes) Keys() []string {
7675
return keys
7776
}
7877

79-
// Clone creates a deep copy of the entire Attributes map.
78+
// Clone creates a deep copy of the entire attribute map.
8079
func (a *Attributes) Clone() *Attributes {
8180
clone := NewAttributes()
8281
a.data.Range(func(key, value any) bool {

pkg/epp/datalayer/collector.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
import (
20+
"context"
21+
"errors"
22+
"sync"
23+
"time"
24+
25+
"sigs.k8s.io/controller-runtime/pkg/log"
26+
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
28+
)
29+
30+
// TODO:
31+
// currently the data store is expected to manage the state of multiple
32+
// Collectors (e.g., using sync.Map mapping pod to its Collector). Alternatively,
33+
// this can be encapsulated in this file, providing the data store with an interface
34+
// to only update on endpoint addition/change and deletion. This can also be used
35+
// to centrally track statistics such errors, active routines, etc.
36+
37+
const (
38+
defaultCollectionTimeout = time.Second
39+
)
40+
41+
// Ticker implements a time source for periodic invocation.
42+
// The Ticker is passed in as parameter a Collector to allow control over time
43+
// progress in tests, ensuring tests are deterministic and fast.
44+
type Ticker interface {
45+
Channel() <-chan time.Time
46+
Stop()
47+
}
48+
49+
// TimeTicker implements a Ticker based on time.Ticker.
50+
type TimeTicker struct {
51+
*time.Ticker
52+
}
53+
54+
// NewTimeTicker returns a new time.Ticker with the configured duration.
55+
func NewTimeTicker(d time.Duration) Ticker {
56+
return &TimeTicker{
57+
Ticker: time.NewTicker(d),
58+
}
59+
}
60+
61+
// Channel exposes the ticker's channel.
62+
func (t *TimeTicker) Channel() <-chan time.Time {
63+
return t.C
64+
}
65+
66+
// Collector runs the data collection for a single endpoint.
67+
type Collector struct {
68+
// per-endpoint context and cancellation
69+
ctx context.Context
70+
cancel context.CancelFunc
71+
72+
// goroutine management
73+
startOnce sync.Once
74+
stopOnce sync.Once
75+
76+
// TODO: optional metrics tracking collection (e.g., errors, invocations, ...)
77+
}
78+
79+
// NewCollector returns a new collector.
80+
func NewCollector() *Collector {
81+
return &Collector{}
82+
}
83+
84+
// Start initiates data source collection for the endpoint.
85+
func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error {
86+
started := false
87+
c.startOnce.Do(func() {
88+
c.ctx, c.cancel = context.WithCancel(ctx)
89+
started = true
90+
91+
go func(endpoint Endpoint, sources []DataSource) {
92+
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
93+
logger.V(logging.DEFAULT).Info("starting collection")
94+
95+
defer func() {
96+
logger.V(logging.DEFAULT).Info("terminating collection")
97+
ticker.Stop()
98+
}()
99+
100+
for {
101+
select {
102+
case <-c.ctx.Done(): // per endpoint context cancelled
103+
return
104+
case <-ticker.Channel():
105+
for _, src := range sources {
106+
ctx, cancel := context.WithTimeout(c.ctx, defaultCollectionTimeout)
107+
_ = src.Collect(ctx, endpoint) // TODO: track errors per collector?
108+
cancel() // release the ctx timeout resources
109+
}
110+
}
111+
}
112+
}(ep, sources)
113+
})
114+
115+
if !started {
116+
return errors.New("collector start called multiple times")
117+
}
118+
return nil
119+
}
120+
121+
// Stop terminates the collector.
122+
func (c *Collector) Stop() error {
123+
if c.ctx == nil || c.cancel == nil {
124+
return errors.New("collector stop called before start")
125+
}
126+
127+
stopped := false
128+
c.stopOnce.Do(func() {
129+
stopped = true
130+
c.cancel()
131+
})
132+
133+
if !stopped {
134+
return errors.New("collector stop called multiple times")
135+
}
136+
return nil
137+
}

pkg/epp/datalayer/collector_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
import (
20+
"context"
21+
"sync/atomic"
22+
"testing"
23+
"time"
24+
25+
"github.com/stretchr/testify/assert"
26+
"github.com/stretchr/testify/require"
27+
corev1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/mocks"
31+
)
32+
33+
// --- Test Stubs ---
34+
35+
type DummySource struct {
36+
callCount int64
37+
}
38+
39+
func (d *DummySource) Name() string { return "test-dummy-data-source" }
40+
func (d *DummySource) AddExtractor(_ Extractor) error { return nil }
41+
func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error {
42+
atomic.AddInt64(&d.callCount, 1)
43+
return nil
44+
}
45+
46+
func defaultEndpoint() Endpoint {
47+
ms := NewEndpoint()
48+
pod := &corev1.Pod{
49+
ObjectMeta: metav1.ObjectMeta{
50+
Name: "pod-name",
51+
Namespace: "default",
52+
},
53+
Status: corev1.PodStatus{
54+
PodIP: "1.2.3.4",
55+
},
56+
}
57+
ms.UpdatePod(pod)
58+
return ms
59+
}
60+
61+
// --- Tests ---
62+
63+
var (
64+
endpoint = defaultEndpoint()
65+
sources = []DataSource{&DummySource{}}
66+
)
67+
68+
func TestCollectorCanStartOnlyOnce(t *testing.T) {
69+
c := NewCollector()
70+
ctx := context.Background()
71+
ticker := mocks.NewTicker()
72+
73+
err := c.Start(ctx, ticker, endpoint, sources)
74+
require.NoError(t, err, "first Start call should succeed")
75+
76+
err = c.Start(ctx, ticker, endpoint, sources)
77+
assert.Error(t, err, "multiple collector start should error")
78+
}
79+
80+
func TestCollectorStopBeforeStartIsAnError(t *testing.T) {
81+
c := NewCollector()
82+
err := c.Stop()
83+
assert.Error(t, err, "collector stop called before start should error")
84+
}
85+
86+
func TestCollectorCanStopOnlyOnce(t *testing.T) {
87+
c := NewCollector()
88+
ctx := context.Background()
89+
ticker := mocks.NewTicker()
90+
91+
require.NoError(t, c.Start(ctx, ticker, endpoint, sources))
92+
require.NoError(t, c.Stop(), "first Stop should succeed")
93+
assert.Error(t, c.Stop(), "second Stop should fail")
94+
}
95+
96+
func TestCollectorCollectsOnTicks(t *testing.T) {
97+
source := &DummySource{}
98+
c := NewCollector()
99+
ticker := mocks.NewTicker()
100+
ctx := context.Background()
101+
require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source}))
102+
103+
ticker.Tick()
104+
ticker.Tick()
105+
time.Sleep(20 * time.Millisecond) // let collector process the ticks
106+
107+
got := atomic.LoadInt64(&source.callCount)
108+
want := int64(2)
109+
assert.Equal(t, want, got, "call count mismatch")
110+
require.NoError(t, c.Stop())
111+
}
112+
113+
func TestCollectorStopCancelsContext(t *testing.T) {
114+
source := &DummySource{}
115+
c := NewCollector()
116+
ticker := mocks.NewTicker()
117+
ctx := context.Background()
118+
119+
require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source}))
120+
ticker.Tick() // should be processed
121+
time.Sleep(20 * time.Millisecond)
122+
123+
require.NoError(t, c.Stop())
124+
before := atomic.LoadInt64(&source.callCount)
125+
126+
ticker.Tick()
127+
time.Sleep(20 * time.Millisecond) // let collector run again
128+
after := atomic.LoadInt64(&source.callCount)
129+
assert.Equal(t, before, after, "call count changed after stop")
130+
}

pkg/epp/datalayer/datasource.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package datalayer
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"reflect"
@@ -36,7 +37,7 @@ type DataSource interface {
3637
// Collect is triggered by the data layer framework to fetch potentially new
3738
// data for an endpoint. Collect calls registered Extractors to convert the
3839
// raw data into structured attributes.
39-
Collect(ep Endpoint)
40+
Collect(ctx context.Context, ep Endpoint) error
4041
}
4142

4243
// Extractor transforms raw data into structured attributes.
@@ -46,7 +47,7 @@ type Extractor interface {
4647
ExpectedInputType() reflect.Type
4748
// Extract transforms the raw data source output into a concrete structured
4849
// attribute, stored on the given endpoint.
49-
Extract(data any, ep Endpoint)
50+
Extract(ctx context.Context, data any, ep Endpoint) error
5051
}
5152

5253
var defaultDataSources = DataSourceRegistry{}

0 commit comments

Comments
 (0)