Skip to content

Commit ea6b123

Browse files
committed
fix: close reflectors once their corresponding CRDs are dropped
Signed-off-by: Pranshu Srivastava <[email protected]>
1 parent dbf6e9b commit ea6b123

File tree

4 files changed

+20
-3
lines changed

4 files changed

+20
-3
lines changed

internal/discovery/discovery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config)
6464
},
6565
Plural: p,
6666
}
67-
r.AppendToMap(gotGVKP)
6867
r.SafeWrite(func() {
68+
r.AppendToMap(gotGVKP)
6969
r.WasUpdated = true
7070
})
7171
}
@@ -89,8 +89,8 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config)
8989
},
9090
Plural: p,
9191
}
92-
r.RemoveFromMap(gotGVKP)
9392
r.SafeWrite(func() {
93+
r.RemoveFromMap(gotGVKP)
9494
r.WasUpdated = true
9595
})
9696
}

internal/discovery/types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type CRDiscoverer struct {
4545
CRDsCacheCountGauge prometheus.Gauge
4646
// Map is a cache of the collected GVKs.
4747
Map map[string]map[string][]kindPlural
48+
// GVKToReflectorStopChanMap is a map of GVKs to channels that can be used to stop their corresponding reflector.
49+
GVKToReflectorStopChanMap map[string]chan struct{}
4850
// m is a mutex to protect the cache.
4951
m sync.RWMutex
5052
// ShouldUpdate is a flag that indicates whether the cache was updated.
@@ -70,6 +72,9 @@ func (r *CRDiscoverer) AppendToMap(gvkps ...groupVersionKindPlural) {
7072
if r.Map == nil {
7173
r.Map = map[string]map[string][]kindPlural{}
7274
}
75+
if r.GVKToReflectorStopChanMap == nil {
76+
r.GVKToReflectorStopChanMap = map[string]chan struct{}{}
77+
}
7378
for _, gvkp := range gvkps {
7479
if _, ok := r.Map[gvkp.Group]; !ok {
7580
r.Map[gvkp.Group] = map[string][]kindPlural{}
@@ -78,6 +83,7 @@ func (r *CRDiscoverer) AppendToMap(gvkps ...groupVersionKindPlural) {
7883
r.Map[gvkp.Group][gvkp.Version] = []kindPlural{}
7984
}
8085
r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version], kindPlural{Kind: gvkp.Kind, Plural: gvkp.Plural})
86+
r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()] = make(chan struct{})
8187
}
8288
}
8389

@@ -92,6 +98,8 @@ func (r *CRDiscoverer) RemoveFromMap(gvkps ...groupVersionKindPlural) {
9298
}
9399
for i, el := range r.Map[gvkp.Group][gvkp.Version] {
94100
if el.Kind == gvkp.Kind {
101+
close(r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()])
102+
delete(r.GVKToReflectorStopChanMap, gvkp.GroupVersionKind.String())
95103
if len(r.Map[gvkp.Group][gvkp.Version]) == 1 {
96104
delete(r.Map[gvkp.Group], gvkp.Version)
97105
if len(r.Map[gvkp.Group]) == 0 {

internal/store/builder.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
policyv1 "k8s.io/api/policy/v1"
3939
rbacv1 "k8s.io/api/rbac/v1"
4040
storagev1 "k8s.io/api/storage/v1"
41+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
4142
clientset "k8s.io/client-go/kubernetes"
4243
"k8s.io/client-go/tools/cache"
4344
"k8s.io/klog/v2"
@@ -84,6 +85,8 @@ type Builder struct {
8485
shard int32
8586
useAPIServerCache bool
8687
objectLimit int64
88+
89+
GVKToReflectorStopChanMap *map[string]chan struct{}
8790
}
8891

8992
// NewBuilder returns a new builder.
@@ -617,7 +620,11 @@ func (b *Builder) startReflector(
617620
) {
618621
instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit)
619622
reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0})
620-
go reflector.Run(b.ctx.Done())
623+
if cr, ok := expectedType.(*unstructured.Unstructured); ok {
624+
go reflector.Run((*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()])
625+
} else {
626+
go reflector.Run(b.ctx.Done())
627+
}
621628
}
622629

623630
// cacheStoresToMetricStores converts []cache.Store into []*metricsstore.MetricsStore

pkg/app/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
296296
CRDsDeleteEventsCounter: crdsDeleteEventsCounter,
297297
CRDsCacheCountGauge: crdsCacheCountGauge,
298298
}
299+
// storeBuilder starts reflectors for the discovered GVKs, and as such, should close them too.
300+
storeBuilder.GVKToReflectorStopChanMap = &discovererInstance.GVKToReflectorStopChanMap
299301
// This starts a goroutine that will watch for any new GVKs to extract from CRDs.
300302
err = discovererInstance.StartDiscovery(ctx, kubeConfig)
301303
if err != nil {

0 commit comments

Comments
 (0)