Skip to content

refactor: Replace ListerWatcher with ListerWatcherWithContext #2724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions internal/store/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,6 @@
metricFamilies = generator.FilterFamilyGenerators(b.familyGeneratorFilter, metricFamilies)
composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies)
familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies)
var listerWatcher func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher

if b.namespaces.IsAllNamespaces() {
store := metricsstore.NewMetricsStore(
Expand All @@ -532,8 +531,8 @@
if b.fieldSelectorFilter != "" {
klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter)
}
listWatcher := listerWatcher(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit)
listWatcherWithContext := listWatchWithContextFunc(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit)
return []cache.Store{store}
}

Expand All @@ -546,8 +545,8 @@
if b.fieldSelectorFilter != "" {
klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter)
}
listWatcher := listerWatcher(b.kubeClient, ns, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit)
listWatcherWithContext := listWatchWithContextFunc(b.kubeClient, ns, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit)
stores = append(stores, store)
}

Expand All @@ -566,8 +565,6 @@

familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies)

var listerWatcher func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher

gvr, err := util.GVRFromType(resourceName, expectedType)
if err != nil {
klog.ErrorS(err, "Failed to get GVR from type", "resourceName", resourceName, "expectedType", expectedType)
Expand All @@ -592,8 +589,8 @@
if b.fieldSelectorFilter != "" {
klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter)
}
listWatcher := listerWatcher(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit)
listWatcherWithContext := listWatchWithContextFunc(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit)
return []cache.Store{store}
}

Expand All @@ -604,8 +601,8 @@
composedMetricGenFuncs,
)
klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter)
listWatcher := listerWatcher(customResourceClient, ns, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit)
listWatcherWithContext := listWatchWithContextFunc(customResourceClient, ns, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit)
stores = append(stores, store)
}

Expand All @@ -617,12 +614,12 @@
func (b *Builder) startReflector(
expectedType interface{},
store cache.Store,
listWatcher cache.ListerWatcher,
listWatcherWithContext cache.ListerWatcherWithContext,
useAPIServerCache bool,
objectLimit int64,
) {
instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit)
reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0})
instrumentedListWatchWithContext := watch.NewInstrumentedListerWatcher(listWatcherWithContext, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit)
reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0})

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-go-lint

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List)) (typecheck)

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-go-lint

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List)) (typecheck)

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-go-lint

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List)) (typecheck)

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-go-lint

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List) (typecheck)

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-go-lint

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List)) (typecheck)

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-unit-tests

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List)

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-benchmark-tests

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List)

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-build-kube-state-metrics

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List)

Check failure on line 622 in internal/store/builder.go

View workflow job for this annotation

GitHub Actions / ci-validate-docs

cannot use sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext) (value of interface type "k8s.io/client-go/tools/cache".ListerWatcherWithContext) as "k8s.io/client-go/tools/cache".ListerWatcher value in argument to cache.NewReflectorWithOptions: "k8s.io/client-go/tools/cache".ListerWatcherWithContext does not implement "k8s.io/client-go/tools/cache".ListerWatcher (missing method List)
Copy link
Author

@onasser1 onasser1 Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue:
newReflectorWithOptions which is used from client-go is taking a ListerWatcher as an argument and inside its logic converts it to ListerWatcherWithContext
https://github.com/kubernetes/client-go/blob/v0.33.3/tools/cache/reflector.go#L259

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good catch. Do you want to work with client-go folks to update the implementation to support ListerWatcherWithContext as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely Yes! but my question is should we update the parameter list to receive ListerWatcherWithContext but wouldn't this make a dependency conflict? Or we should support another version of the function with different parameter list as we would do an override.
or Let's move this question to a new issue at client-go repo? WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since lister watcher without context is deprecated, I think we need a new NewReflector function in client-go. Yeah please open up an issue with client-go

if cr, ok := expectedType.(*unstructured.Unstructured); ok {
go reflector.Run((*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()])
} else {
Expand Down
17 changes: 9 additions & 8 deletions pkg/sharding/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sharding

import (
"context"
"hash/fnv"

jump "github.com/dgryski/go-jump"
Expand All @@ -29,23 +30,23 @@ import (

type shardedListWatch struct {
sharding *sharding
lw cache.ListerWatcher
lwc cache.ListerWatcherWithContext
}

// NewShardedListWatch returns a new shardedListWatch via the cache.ListerWatcher interface.
// In the case of no sharding needed, it returns the provided cache.ListerWatcher
func NewShardedListWatch(shard int32, totalShards int, lw cache.ListerWatcher) cache.ListerWatcher {
func NewShardedListWatch(shard int32, totalShards int, lwc cache.ListerWatcherWithContext) cache.ListerWatcherWithContext {
// This is an "optimization" as this configuration means no sharding is to
// be performed.
if shard == 0 && totalShards == 1 {
return lw
return lwc
}

return &shardedListWatch{sharding: &sharding{shard: shard, totalShards: totalShards}, lw: lw}
return &shardedListWatch{sharding: &sharding{shard: shard, totalShards: totalShards}, lwc: lwc}
}

func (s *shardedListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
list, err := s.lw.List(options)
func (s *shardedListWatch) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
list, err := s.lwc.ListWithContext(ctx, options)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -74,8 +75,8 @@ func (s *shardedListWatch) List(options metav1.ListOptions) (runtime.Object, err
return res, nil
}

func (s *shardedListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
w, err := s.lw.Watch(options)
func (s *shardedListWatch) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
w, err := s.lwc.WatchWithContext(ctx, options)
if err != nil {
return nil, err
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package watch

import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -73,17 +74,17 @@ func NewListWatchMetrics(r prometheus.Registerer) *ListWatchMetrics {
// InstrumentedListerWatcher provides the kube_state_metrics_watch_total metric
// with a cache.ListerWatcher obj and the related resource.
type InstrumentedListerWatcher struct {
lw cache.ListerWatcher
lwc cache.ListerWatcherWithContext
metrics *ListWatchMetrics
resource string
useAPIServerCache bool
limit int64
}

// NewInstrumentedListerWatcher returns a new InstrumentedListerWatcher.
func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcher {
func NewInstrumentedListerWatcher(lwc cache.ListerWatcherWithContext, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcherWithContext {
return &InstrumentedListerWatcher{
lw: lw,
lwc: lwc,
metrics: metrics,
resource: resource,
useAPIServerCache: useAPIServerCache,
Expand All @@ -95,7 +96,7 @@ func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetr
// / counters based on the outcome of the List operation it instruments.
// It supports setting object limits, this means if it is set it will only list and process
// n objects of the same resource type.
func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
func (i *InstrumentedListerWatcher) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {

if i.useAPIServerCache {
options.ResourceVersion = "0"
Expand All @@ -106,7 +107,7 @@ func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Ob
i.metrics.ListObjectsLimit.WithLabelValues(i.resource).Set(float64(i.limit))
}

res, err := i.lw.List(options)
res, err := i.lwc.ListWithContext(ctx, options)

if err != nil {
i.metrics.ListRequestsTotal.WithLabelValues("error", i.resource).Inc()
Expand Down Expand Up @@ -134,8 +135,8 @@ func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Ob

// Watch is a wrapper func around the cache.ListerWatcher.Watch func. It increases the success/error
// counters based on the outcome of the Watch operation it instruments.
func (i *InstrumentedListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
res, err := i.lw.Watch(options)
func (i *InstrumentedListerWatcher) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
res, err := i.lwc.WatchWithContext(ctx, options)
if err != nil {
i.metrics.WatchRequestsTotal.WithLabelValues("error", i.resource).Inc()
return nil, err
Expand Down
Loading