Skip to content

Commit 9511b93

Browse files
committed
Add a metrics to expose informer cache's length
metrics explain controller_runtime_cache_resources{group="webapp.my.domain",kind="Guestbook",version="v1"} 1 Signed-off-by: Yan Zhu <[email protected]>
1 parent d8b6793 commit 9511b93

File tree

10 files changed

+1249
-0
lines changed

10 files changed

+1249
-0
lines changed

pkg/cache/internal/informers.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,12 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
392392
return nil, false, err
393393
}
394394

395+
// Add metric event handler to track cache resources count
396+
metricsHandler := NewMetricsResourceEventHandler(gvk, sharedIndexInformer)
397+
if _, err := sharedIndexInformer.AddEventHandler(metricsHandler); err != nil {
398+
return nil, false, err
399+
}
400+
395401
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
396402
if err != nil {
397403
return nil, false, err
@@ -614,3 +620,22 @@ func restrictNamespaceBySelector(namespaceOpt string, s Selector) string {
614620
}
615621
return ""
616622
}
623+
624+
// VisitInformers calls the given function for each informer in the cache
625+
func (ip *Informers) VisitInformers(visitor func(gvk schema.GroupVersionKind, informer cache.SharedIndexInformer)) {
626+
ip.mu.RLock()
627+
defer ip.mu.RUnlock()
628+
629+
// Visit each tracked informer
630+
for gvk, _cache := range ip.tracker.Structured {
631+
visitor(gvk, _cache.Informer)
632+
}
633+
634+
for gvk, _cache := range ip.tracker.Unstructured {
635+
visitor(gvk, _cache.Informer)
636+
}
637+
638+
for gvk, _cache := range ip.tracker.Metadata {
639+
visitor(gvk, _cache.Informer)
640+
}
641+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"k8s.io/apimachinery/pkg/runtime/schema"
21+
"k8s.io/client-go/tools/cache"
22+
23+
"sigs.k8s.io/controller-runtime/pkg/metrics"
24+
)
25+
26+
// NewMetricsResourceEventHandler creates a new metrics-collecting event handler for an informer.
27+
// It counts resource additions, updates, and deletions and records them in metrics.
28+
func NewMetricsResourceEventHandler(gvk schema.GroupVersionKind, informer cache.SharedIndexInformer) cache.ResourceEventHandler {
29+
handler := &metricsResourceEventHandler{
30+
gvk: gvk,
31+
informer: informer,
32+
}
33+
34+
// Initialize the initial count
35+
handler.updateCount()
36+
37+
return handler
38+
}
39+
40+
// metricsResourceEventHandler implements cache.ResourceEventHandler interface
41+
// to collect metrics about resources in the cache
42+
type metricsResourceEventHandler struct {
43+
gvk schema.GroupVersionKind
44+
informer cache.SharedIndexInformer
45+
}
46+
47+
// OnAdd is called when an object is added.
48+
func (h *metricsResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
49+
h.updateCount()
50+
}
51+
52+
// OnUpdate is called when an object is modified.
53+
func (h *metricsResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
54+
// No need to update counts on update as the total count hasn't changed
55+
}
56+
57+
// OnDelete is called when an object is deleted.
58+
func (h *metricsResourceEventHandler) OnDelete(obj interface{}) {
59+
h.updateCount()
60+
}
61+
62+
// updateCount updates the metrics with the current count of resources.
63+
func (h *metricsResourceEventHandler) updateCount() {
64+
count := len(h.informer.GetIndexer().ListKeys())
65+
metrics.RecordCacheResourceCount(h.gvk, count)
66+
}
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
. "github.com/onsi/ginkgo/v2"
24+
. "github.com/onsi/gomega"
25+
"github.com/prometheus/client_golang/prometheus"
26+
dto "github.com/prometheus/client_model/go"
27+
"k8s.io/apimachinery/pkg/runtime/schema"
28+
toolscache "k8s.io/client-go/tools/cache"
29+
30+
"sigs.k8s.io/controller-runtime/pkg/metrics"
31+
)
32+
33+
var _ = Describe("Metrics Handler", func() {
34+
35+
Describe("RecordCacheResourceCount", func() {
36+
var (
37+
podGVK schema.GroupVersionKind
38+
)
39+
40+
BeforeEach(func() {
41+
podGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
42+
})
43+
44+
DescribeTable("recording different resource counts",
45+
func(count int) {
46+
// Directly call RecordCacheResourceCount to record metrics
47+
metrics.RecordCacheResourceCount(podGVK, count)
48+
// Since we cannot directly verify prometheus metric values in tests
49+
// we can only ensure the function doesn't panic
50+
Expect(true).To(BeTrue()) // Simple assertion to show test passed
51+
},
52+
Entry("empty", 0),
53+
Entry("one pod", 1),
54+
Entry("multiple pods", 5),
55+
)
56+
})
57+
58+
Describe("MetricsResourceEventHandler", func() {
59+
var (
60+
podGVK schema.GroupVersionKind
61+
objects []interface{}
62+
indexer *mockIndexer
63+
informer *mockSharedIndexInformer
64+
handler *metricsResourceEventHandler
65+
metricRegistry *prometheus.Registry
66+
)
67+
68+
BeforeEach(func() {
69+
podGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
70+
objects = []interface{}{}
71+
indexer = &mockIndexer{getListFunc: func() []interface{} { return objects }}
72+
informer = &mockSharedIndexInformer{indexer: indexer}
73+
74+
// Reset metrics Registry
75+
metricRegistry = prometheus.NewRegistry()
76+
metrics.Registry = metricRegistry
77+
metrics.Registry.MustRegister(metrics.CacheResourceCount)
78+
79+
handler = NewMetricsResourceEventHandler(podGVK, informer)
80+
})
81+
82+
verifyMetricValue := func(gvk schema.GroupVersionKind, expectedValue float64) {
83+
gauge := metrics.CacheResourceCount.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind)
84+
var metric dto.Metric
85+
err := gauge.Write(&metric)
86+
Expect(err).NotTo(HaveOccurred(), "Failed to write metric")
87+
88+
actualValue := metric.GetGauge().GetValue()
89+
Expect(actualValue).To(Equal(expectedValue), "Metric value does not match expected")
90+
}
91+
92+
It("should update metrics on events", func() {
93+
// Verify initial state - empty list, count should be 0
94+
verifyMetricValue(podGVK, 0)
95+
96+
// Test OnAdd - adding a pod should update the count
97+
objects = append(objects, "pod-1")
98+
handler.OnAdd("pod-1", false)
99+
verifyMetricValue(podGVK, 1)
100+
101+
// Test OnUpdate - should not change the count since total object count hasn't changed
102+
handler.OnUpdate("pod-1", "pod-1-updated")
103+
verifyMetricValue(podGVK, 1)
104+
105+
// Add another pod
106+
objects = append(objects, "pod-2")
107+
handler.OnAdd("pod-2", false)
108+
verifyMetricValue(podGVK, 2)
109+
110+
// Test OnDelete - deleting a pod should update the count
111+
objects = objects[:1] // Only keep the first pod
112+
handler.OnDelete("pod-2")
113+
verifyMetricValue(podGVK, 1)
114+
115+
// Delete all pods
116+
objects = []interface{}{}
117+
handler.OnDelete("pod-1")
118+
verifyMetricValue(podGVK, 0)
119+
})
120+
})
121+
})
122+
123+
// mockIndexer is a simple Indexer implementation for testing
124+
type mockIndexer struct {
125+
getListFunc func() []interface{}
126+
}
127+
128+
func (m *mockIndexer) Add(obj interface{}) error {
129+
return nil
130+
}
131+
132+
func (m *mockIndexer) Update(obj interface{}) error {
133+
return nil
134+
}
135+
136+
func (m *mockIndexer) Delete(obj interface{}) error {
137+
return nil
138+
}
139+
140+
func (m *mockIndexer) List() []interface{} {
141+
return m.getListFunc()
142+
}
143+
144+
func (m *mockIndexer) ListKeys() []string {
145+
return nil
146+
}
147+
148+
func (m *mockIndexer) Get(obj interface{}) (item interface{}, exists bool, err error) {
149+
return nil, false, nil
150+
}
151+
152+
func (m *mockIndexer) GetByKey(key string) (item interface{}, exists bool, err error) {
153+
return nil, false, nil
154+
}
155+
156+
func (m *mockIndexer) Replace(list []interface{}, resourceVersion string) error {
157+
return nil
158+
}
159+
160+
func (m *mockIndexer) Resync() error {
161+
return nil
162+
}
163+
164+
func (m *mockIndexer) Index(indexName string, obj interface{}) ([]interface{}, error) {
165+
return nil, nil
166+
}
167+
168+
func (m *mockIndexer) IndexKeys(indexName, indexedValue string) ([]string, error) {
169+
return nil, nil
170+
}
171+
172+
func (m *mockIndexer) ListIndexFuncValues(indexName string) []string {
173+
return nil
174+
}
175+
176+
func (m *mockIndexer) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
177+
return nil, nil
178+
}
179+
180+
func (m *mockIndexer) GetIndexers() toolscache.Indexers {
181+
return nil
182+
}
183+
184+
func (m *mockIndexer) AddIndexers(newIndexers toolscache.Indexers) error {
185+
return nil
186+
}
187+
188+
// mockSharedIndexInformer is a simple SharedIndexInformer implementation for testing
189+
type mockSharedIndexInformer struct {
190+
indexer toolscache.Indexer
191+
}
192+
193+
func (m *mockSharedIndexInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) {
194+
return nil, nil
195+
}
196+
197+
func (m *mockSharedIndexInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) {
198+
return nil, nil
199+
}
200+
201+
func (m *mockSharedIndexInformer) AddEventHandlerWithOptions(handler toolscache.ResourceEventHandler, options toolscache.HandlerOptions) (toolscache.ResourceEventHandlerRegistration, error) {
202+
return nil, nil
203+
}
204+
205+
func (m *mockSharedIndexInformer) RemoveEventHandler(registration toolscache.ResourceEventHandlerRegistration) error {
206+
return nil
207+
}
208+
209+
func (m *mockSharedIndexInformer) GetStore() toolscache.Store {
210+
return m.indexer
211+
}
212+
213+
func (m *mockSharedIndexInformer) GetController() toolscache.Controller {
214+
return nil
215+
}
216+
217+
func (m *mockSharedIndexInformer) Run(stopCh <-chan struct{}) {
218+
}
219+
220+
func (m *mockSharedIndexInformer) RunWithContext(ctx context.Context) {
221+
}
222+
223+
func (m *mockSharedIndexInformer) HasSynced() bool {
224+
return true
225+
}
226+
227+
func (m *mockSharedIndexInformer) LastSyncResourceVersion() string {
228+
return ""
229+
}
230+
231+
func (m *mockSharedIndexInformer) SetWatchErrorHandler(handler toolscache.WatchErrorHandler) error {
232+
return nil
233+
}
234+
235+
func (m *mockSharedIndexInformer) SetWatchErrorHandlerWithContext(handler toolscache.WatchErrorHandlerWithContext) error {
236+
return nil
237+
}
238+
239+
func (m *mockSharedIndexInformer) SetTransform(transformer toolscache.TransformFunc) error {
240+
return nil
241+
}
242+
243+
func (m *mockSharedIndexInformer) GetIndexer() toolscache.Indexer {
244+
return m.indexer
245+
}
246+
247+
func (m *mockSharedIndexInformer) AddIndexers(indexers toolscache.Indexers) error {
248+
return nil
249+
}
250+
251+
func (m *mockSharedIndexInformer) IsStopped() bool {
252+
return false
253+
}

pkg/cache/internal/suite_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"testing"
21+
22+
. "github.com/onsi/ginkgo/v2"
23+
. "github.com/onsi/gomega"
24+
)
25+
26+
func TestCacheInternal(t *testing.T) {
27+
RegisterFailHandler(Fail)
28+
RunSpecs(t, "Cache Internal Suite")
29+
}

0 commit comments

Comments
 (0)