diff --git a/internal/store/builder.go b/internal/store/builder.go index 8cdf11dcf2..8c364ce375 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -516,7 +516,7 @@ func (b *Builder) buildIngressClassStores() []cache.Store { func (b *Builder) buildStores( metricFamilies []generator.FamilyGenerator, expectedType interface{}, - listWatchFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, + listWatchWithContextFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext, useAPIServerCache bool, objectLimit int64, ) []cache.Store { metricFamilies = generator.FilterFamilyGenerators(b.familyGeneratorFilter, metricFamilies) @@ -531,8 +531,8 @@ func (b *Builder) buildStores( if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listWatchFunc(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) + listWatcherWithContext := listWatchWithContextFunc(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter) + b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit) return []cache.Store{store} } @@ -545,8 +545,8 @@ func (b *Builder) buildStores( if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listWatchFunc(b.kubeClient, ns, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) + listWatcherWithContext := listWatchWithContextFunc(b.kubeClient, ns, b.fieldSelectorFilter) + b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit) stores = append(stores, store) } @@ -557,7 +557,7 @@ func (b *Builder) buildStores( func (b *Builder) buildCustomResourceStores(resourceName string, metricFamilies []generator.FamilyGenerator, expectedType interface{}, - listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher, + listWatchWithContextFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext, useAPIServerCache bool, objectLimit int64, ) []cache.Store { metricFamilies = generator.FilterFamilyGenerators(b.familyGeneratorFilter, metricFamilies) @@ -589,8 +589,8 @@ func (b *Builder) buildCustomResourceStores(resourceName string, if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listWatchFunc(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) + listWatcherWithContext := listWatchWithContextFunc(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter) + b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit) return []cache.Store{store} } @@ -601,8 +601,8 @@ func (b *Builder) buildCustomResourceStores(resourceName string, composedMetricGenFuncs, ) klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) - listWatcher := listWatchFunc(customResourceClient, ns, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) + listWatcherWithContext := listWatchWithContextFunc(customResourceClient, ns, b.fieldSelectorFilter) + b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit) stores = append(stores, store) } @@ -614,12 +614,12 @@ func (b *Builder) buildCustomResourceStores(resourceName string, func (b *Builder) startReflector( expectedType interface{}, store cache.Store, - listWatcher cache.ListerWatcher, + listWatcherWithContext cache.ListerWatcherWithContext, useAPIServerCache bool, objectLimit int64, ) { - instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit) - reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) + instrumentedListWatchWithContext := watch.NewInstrumentedListerWatcher(listWatcherWithContext, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit) + reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) if cr, ok := expectedType.(*unstructured.Unstructured); ok { go reflector.Run((*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()]) } else { diff --git a/internal/store/certificatesigningrequest.go b/internal/store/certificatesigningrequest.go index ea287e1490..452b0517fc 100644 --- a/internal/store/certificatesigningrequest.go +++ b/internal/store/certificatesigningrequest.go @@ -153,13 +153,13 @@ func wrapCSRFunc(f func(*certv1.CertificateSigningRequest) *metric.Family) func( } } -func createCSRListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createCSRListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CertificatesV1().CertificateSigningRequests().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.CertificatesV1().CertificateSigningRequests().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CertificatesV1().CertificateSigningRequests().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.CertificatesV1().CertificateSigningRequests().Watch(ctx, opts) }, } } diff --git a/internal/store/clusterrole.go b/internal/store/clusterrole.go index 374a48a10e..0108513303 100644 --- a/internal/store/clusterrole.go +++ b/internal/store/clusterrole.go @@ -138,13 +138,13 @@ func clusterRoleMetricFamilies(allowAnnotationsList, allowLabelsList []string) [ } } -func createClusterRoleListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createClusterRoleListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.RbacV1().ClusterRoles().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.RbacV1().ClusterRoles().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.RbacV1().ClusterRoles().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.RbacV1().ClusterRoles().Watch(ctx, opts) }, } } diff --git a/internal/store/clusterrolebinding.go b/internal/store/clusterrolebinding.go index 302a6b40f4..1adc40191f 100644 --- a/internal/store/clusterrolebinding.go +++ b/internal/store/clusterrolebinding.go @@ -140,13 +140,13 @@ func clusterRoleBindingMetricFamilies(allowAnnotationsList, allowLabelsList []st } } -func createClusterRoleBindingListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createClusterRoleBindingListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.RbacV1().ClusterRoleBindings().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.RbacV1().ClusterRoleBindings().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.RbacV1().ClusterRoleBindings().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.RbacV1().ClusterRoleBindings().Watch(ctx, opts) }, } } diff --git a/internal/store/configmap.go b/internal/store/configmap.go index 7fc7f246b0..66935516b3 100644 --- a/internal/store/configmap.go +++ b/internal/store/configmap.go @@ -134,15 +134,15 @@ func configMapMetricFamilies(allowAnnotationsList, allowLabelsList []string) []g } } -func createConfigMapListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createConfigMapListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ConfigMaps(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().ConfigMaps(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ConfigMaps(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().ConfigMaps(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/cronjob.go b/internal/store/cronjob.go index eae698fafb..bdbba39e76 100644 --- a/internal/store/cronjob.go +++ b/internal/store/cronjob.go @@ -338,15 +338,15 @@ func wrapCronJobFunc(f func(*batchv1.CronJob) *metric.Family) func(interface{}) } } -func createCronJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createCronJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.BatchV1().CronJobs(ns).List(context.TODO(), opts) + return kubeClient.BatchV1().CronJobs(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.BatchV1().CronJobs(ns).Watch(context.TODO(), opts) + return kubeClient.BatchV1().CronJobs(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/daemonset.go b/internal/store/daemonset.go index 2ef2bb0c4b..196f5d4d00 100644 --- a/internal/store/daemonset.go +++ b/internal/store/daemonset.go @@ -284,15 +284,15 @@ func wrapDaemonSetFunc(f func(*v1.DaemonSet) *metric.Family) func(interface{}) * } } -func createDaemonSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createDaemonSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().DaemonSets(ns).List(context.TODO(), opts) + return kubeClient.AppsV1().DaemonSets(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().DaemonSets(ns).Watch(context.TODO(), opts) + return kubeClient.AppsV1().DaemonSets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/deployment.go b/internal/store/deployment.go index 7f902b0c16..302a5c41c3 100644 --- a/internal/store/deployment.go +++ b/internal/store/deployment.go @@ -344,15 +344,15 @@ func wrapDeploymentFunc(f func(*v1.Deployment) *metric.Family) func(interface{}) } } -func createDeploymentListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createDeploymentListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().Deployments(ns).List(context.TODO(), opts) + return kubeClient.AppsV1().Deployments(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().Deployments(ns).Watch(context.TODO(), opts) + return kubeClient.AppsV1().Deployments(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/endpoint.go b/internal/store/endpoint.go index fddfa4fde5..5388e045dc 100644 --- a/internal/store/endpoint.go +++ b/internal/store/endpoint.go @@ -199,15 +199,15 @@ func wrapEndpointFunc(f func(*v1.Endpoints) *metric.Family) func(interface{}) *m } } -func createEndpointsListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createEndpointsListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Endpoints(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().Endpoints(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Endpoints(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().Endpoints(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/endpointslice.go b/internal/store/endpointslice.go index 6f5eeae751..a70d47cc85 100644 --- a/internal/store/endpointslice.go +++ b/internal/store/endpointslice.go @@ -255,15 +255,15 @@ func wrapEndpointSliceFunc(f func(*discoveryv1.EndpointSlice) *metric.Family) fu } } -func createEndpointSliceListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createEndpointSliceListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), opts) + return kubeClient.DiscoveryV1().EndpointSlices(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.DiscoveryV1().EndpointSlices(ns).Watch(context.TODO(), opts) + return kubeClient.DiscoveryV1().EndpointSlices(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/horizontalpodautoscaler.go b/internal/store/horizontalpodautoscaler.go index d0bc3f28b9..afc49e2e37 100644 --- a/internal/store/horizontalpodautoscaler.go +++ b/internal/store/horizontalpodautoscaler.go @@ -83,15 +83,15 @@ func wrapHPAFunc(f func(*autoscaling.HorizontalPodAutoscaler) *metric.Family) fu } } -func createHPAListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createHPAListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).List(context.TODO(), opts) + return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).Watch(context.TODO(), opts) + return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/ingress.go b/internal/store/ingress.go index 00edd6f181..560030975e 100644 --- a/internal/store/ingress.go +++ b/internal/store/ingress.go @@ -223,15 +223,15 @@ func wrapIngressFunc(f func(*networkingv1.Ingress) *metric.Family) func(interfac } } -func createIngressListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createIngressListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.NetworkingV1().Ingresses(ns).List(context.TODO(), opts) + return kubeClient.NetworkingV1().Ingresses(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.NetworkingV1().Ingresses(ns).Watch(context.TODO(), opts) + return kubeClient.NetworkingV1().Ingresses(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/ingressclass.go b/internal/store/ingressclass.go index 0724c9110a..7b4bea2d80 100644 --- a/internal/store/ingressclass.go +++ b/internal/store/ingressclass.go @@ -134,13 +134,13 @@ func wrapIngressClassFunc(f func(*networkingv1.IngressClass) *metric.Family) fun } } -func createIngressClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createIngressClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.NetworkingV1().IngressClasses().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.NetworkingV1().IngressClasses().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.NetworkingV1().IngressClasses().Watch(ctx, opts) }, } } diff --git a/internal/store/job.go b/internal/store/job.go index bac4806384..0240019045 100644 --- a/internal/store/job.go +++ b/internal/store/job.go @@ -440,15 +440,15 @@ func wrapJobFunc(f func(*v1batch.Job) *metric.Family) func(interface{}) *metric. } } -func createJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.BatchV1().Jobs(ns).List(context.TODO(), opts) + return kubeClient.BatchV1().Jobs(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.BatchV1().Jobs(ns).Watch(context.TODO(), opts) + return kubeClient.BatchV1().Jobs(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/lease.go b/internal/store/lease.go index 3df4251da0..eac54d56ab 100644 --- a/internal/store/lease.go +++ b/internal/store/lease.go @@ -116,15 +116,15 @@ func wrapLeaseFunc(f func(*coordinationv1.Lease) *metric.Family) func(interface{ } } -func createLeaseListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createLeaseListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoordinationV1().Leases(ns).List(context.TODO(), opts) + return kubeClient.CoordinationV1().Leases(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoordinationV1().Leases(ns).Watch(context.TODO(), opts) + return kubeClient.CoordinationV1().Leases(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/limitrange.go b/internal/store/limitrange.go index 1be7c1c370..a717e687e2 100644 --- a/internal/store/limitrange.go +++ b/internal/store/limitrange.go @@ -129,15 +129,15 @@ func wrapLimitRangeFunc(f func(*v1.LimitRange) *metric.Family) func(interface{}) } } -func createLimitRangeListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createLimitRangeListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().LimitRanges(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().LimitRanges(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().LimitRanges(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().LimitRanges(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/mutatingwebhookconfiguration.go b/internal/store/mutatingwebhookconfiguration.go index 8380346ace..4d8a604ed8 100644 --- a/internal/store/mutatingwebhookconfiguration.go +++ b/internal/store/mutatingwebhookconfiguration.go @@ -111,13 +111,13 @@ var ( } ) -func createMutatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createMutatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Watch(ctx, opts) }, } } diff --git a/internal/store/namespace.go b/internal/store/namespace.go index e46c73cf28..48cf5e7efd 100644 --- a/internal/store/namespace.go +++ b/internal/store/namespace.go @@ -176,13 +176,13 @@ func wrapNamespaceFunc(f func(*v1.Namespace) *metric.Family) func(interface{}) * } } -func createNamespaceListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createNamespaceListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CoreV1().Namespaces().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.CoreV1().Namespaces().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CoreV1().Namespaces().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.CoreV1().Namespaces().Watch(ctx, opts) }, } } diff --git a/internal/store/networkpolicy.go b/internal/store/networkpolicy.go index 7d546b8c29..c36c451dc9 100644 --- a/internal/store/networkpolicy.go +++ b/internal/store/networkpolicy.go @@ -156,15 +156,15 @@ func wrapNetworkPolicyFunc(f func(*networkingv1.NetworkPolicy) *metric.Family) f } } -func createNetworkPolicyListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createNetworkPolicyListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), opts) + return kubeClient.NetworkingV1().NetworkPolicies(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.NetworkingV1().NetworkPolicies(ns).Watch(context.TODO(), opts) + return kubeClient.NetworkingV1().NetworkPolicies(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/node.go b/internal/store/node.go index 7bd6e4aa8f..85f1e579fd 100644 --- a/internal/store/node.go +++ b/internal/store/node.go @@ -520,13 +520,13 @@ func wrapNodeFunc(f func(*v1.Node) *metric.Family) func(interface{}) *metric.Fam } } -func createNodeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createNodeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CoreV1().Nodes().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.CoreV1().Nodes().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CoreV1().Nodes().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.CoreV1().Nodes().Watch(ctx, opts) }, } } diff --git a/internal/store/persistentvolume.go b/internal/store/persistentvolume.go index c24ea76da1..d6a1a92723 100644 --- a/internal/store/persistentvolume.go +++ b/internal/store/persistentvolume.go @@ -76,13 +76,13 @@ func wrapPersistentVolumeFunc(f func(*v1.PersistentVolume) *metric.Family) func( } } -func createPersistentVolumeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createPersistentVolumeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CoreV1().PersistentVolumes().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.CoreV1().PersistentVolumes().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CoreV1().PersistentVolumes().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.CoreV1().PersistentVolumes().Watch(ctx, opts) }, } } diff --git a/internal/store/persistentvolumeclaim.go b/internal/store/persistentvolumeclaim.go index a8f7cdedc2..4fee337ea1 100644 --- a/internal/store/persistentvolumeclaim.go +++ b/internal/store/persistentvolumeclaim.go @@ -282,15 +282,15 @@ func wrapPersistentVolumeClaimFunc(f func(*v1.PersistentVolumeClaim) *metric.Fam } } -func createPersistentVolumeClaimListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createPersistentVolumeClaimListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().PersistentVolumeClaims(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().PersistentVolumeClaims(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().PersistentVolumeClaims(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/pod.go b/internal/store/pod.go index d4744dbb2d..8e0bd231be 100644 --- a/internal/store/pod.go +++ b/internal/store/pod.go @@ -1819,15 +1819,15 @@ func wrapPodFunc(f func(*v1.Pod) *metric.Family) func(interface{}) *metric.Famil } } -func createPodListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createPodListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Pods(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().Pods(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Pods(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().Pods(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/poddisruptionbudget.go b/internal/store/poddisruptionbudget.go index c4fa0cb5b0..378e9c9ada 100644 --- a/internal/store/poddisruptionbudget.go +++ b/internal/store/poddisruptionbudget.go @@ -202,15 +202,15 @@ func wrapPodDisruptionBudgetFunc(f func(*policyv1.PodDisruptionBudget) *metric.F } } -func createPodDisruptionBudgetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createPodDisruptionBudgetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.PolicyV1().PodDisruptionBudgets(ns).List(context.TODO(), opts) + return kubeClient.PolicyV1().PodDisruptionBudgets(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.PolicyV1().PodDisruptionBudgets(ns).Watch(context.TODO(), opts) + return kubeClient.PolicyV1().PodDisruptionBudgets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/replicaset.go b/internal/store/replicaset.go index 192299ff6d..05b686ccc8 100644 --- a/internal/store/replicaset.go +++ b/internal/store/replicaset.go @@ -270,15 +270,15 @@ func wrapReplicaSetFunc(f func(*v1.ReplicaSet) *metric.Family) func(interface{}) } } -func createReplicaSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createReplicaSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().ReplicaSets(ns).List(context.TODO(), opts) + return kubeClient.AppsV1().ReplicaSets(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().ReplicaSets(ns).Watch(context.TODO(), opts) + return kubeClient.AppsV1().ReplicaSets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/replicationcontroller.go b/internal/store/replicationcontroller.go index f29b1a9226..6c8ef5bece 100644 --- a/internal/store/replicationcontroller.go +++ b/internal/store/replicationcontroller.go @@ -226,15 +226,15 @@ func wrapReplicationControllerFunc(f func(*v1.ReplicationController) *metric.Fam } } -func createReplicationControllerListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createReplicationControllerListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ReplicationControllers(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().ReplicationControllers(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ReplicationControllers(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().ReplicationControllers(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/resourcequota.go b/internal/store/resourcequota.go index 6fc017cfc6..40dfb8c1be 100644 --- a/internal/store/resourcequota.go +++ b/internal/store/resourcequota.go @@ -154,15 +154,15 @@ func wrapResourceQuotaFunc(f func(*v1.ResourceQuota) *metric.Family) func(interf } } -func createResourceQuotaListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createResourceQuotaListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ResourceQuotas(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().ResourceQuotas(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ResourceQuotas(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().ResourceQuotas(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/role.go b/internal/store/role.go index 1ba5707c92..e1f2267fb1 100644 --- a/internal/store/role.go +++ b/internal/store/role.go @@ -138,15 +138,15 @@ func roleMetricFamilies(allowAnnotationsList, allowLabelsList []string) []genera } } -func createRoleListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createRoleListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.RbacV1().Roles(ns).List(context.TODO(), opts) + return kubeClient.RbacV1().Roles(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.RbacV1().Roles(ns).Watch(context.TODO(), opts) + return kubeClient.RbacV1().Roles(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/rolebinding.go b/internal/store/rolebinding.go index f371581dfe..4327a4d52c 100644 --- a/internal/store/rolebinding.go +++ b/internal/store/rolebinding.go @@ -140,15 +140,15 @@ func roleBindingMetricFamilies(allowAnnotationsList, allowLabelsList []string) [ } } -func createRoleBindingListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createRoleBindingListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.RbacV1().RoleBindings(ns).List(context.TODO(), opts) + return kubeClient.RbacV1().RoleBindings(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.RbacV1().RoleBindings(ns).Watch(context.TODO(), opts) + return kubeClient.RbacV1().RoleBindings(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/secret.go b/internal/store/secret.go index 644cbb7798..0f49935b8b 100644 --- a/internal/store/secret.go +++ b/internal/store/secret.go @@ -218,15 +218,15 @@ func wrapSecretFunc(f func(*v1.Secret) *metric.Family) func(interface{}) *metric } } -func createSecretListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createSecretListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Secrets(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().Secrets(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Secrets(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().Secrets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/service.go b/internal/store/service.go index e4a8282e58..bcdb59977a 100644 --- a/internal/store/service.go +++ b/internal/store/service.go @@ -201,15 +201,15 @@ func wrapSvcFunc(f func(*v1.Service) *metric.Family) func(interface{}) *metric.F } } -func createServiceListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createServiceListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Services(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().Services(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Services(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().Services(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/serviceaccount.go b/internal/store/serviceaccount.go index f7b7f9394f..4f8529a188 100644 --- a/internal/store/serviceaccount.go +++ b/internal/store/serviceaccount.go @@ -235,15 +235,15 @@ func wrapServiceAccountFunc(f func(*v1.ServiceAccount) *metric.Family) func(inte } } -func createServiceAccountListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createServiceAccountListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ServiceAccounts(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().ServiceAccounts(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ServiceAccounts(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().ServiceAccounts(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/statefulset.go b/internal/store/statefulset.go index 2cb6329d7c..3268ea7925 100644 --- a/internal/store/statefulset.go +++ b/internal/store/statefulset.go @@ -338,15 +338,15 @@ func wrapStatefulSetFunc(f func(*v1.StatefulSet) *metric.Family) func(interface{ } } -func createStatefulSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createStatefulSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().StatefulSets(ns).List(context.TODO(), opts) + return kubeClient.AppsV1().StatefulSets(ns).List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().StatefulSets(ns).Watch(context.TODO(), opts) + return kubeClient.AppsV1().StatefulSets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/storageclass.go b/internal/store/storageclass.go index 15c76d7965..bdd0fc16cd 100644 --- a/internal/store/storageclass.go +++ b/internal/store/storageclass.go @@ -146,13 +146,13 @@ func wrapStorageClassFunc(f func(*storagev1.StorageClass) *metric.Family) func(i } } -func createStorageClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createStorageClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.StorageV1().StorageClasses().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.StorageV1().StorageClasses().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.StorageV1().StorageClasses().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.StorageV1().StorageClasses().Watch(ctx, opts) }, } } diff --git a/internal/store/validatingwebhookconfiguration.go b/internal/store/validatingwebhookconfiguration.go index 070daba51a..9ebc814cc2 100644 --- a/internal/store/validatingwebhookconfiguration.go +++ b/internal/store/validatingwebhookconfiguration.go @@ -111,13 +111,13 @@ var ( } ) -func createValidatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createValidatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(ctx, opts) }, } } diff --git a/internal/store/volumeattachment.go b/internal/store/volumeattachment.go index 22cdd8e963..0f3783633d 100644 --- a/internal/store/volumeattachment.go +++ b/internal/store/volumeattachment.go @@ -167,13 +167,13 @@ func wrapVolumeAttachmentFunc(f func(*storagev1.VolumeAttachment) *metric.Family } } -func createVolumeAttachmentListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createVolumeAttachmentListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.StorageV1().VolumeAttachments().List(context.TODO(), opts) + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.StorageV1().VolumeAttachments().List(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.StorageV1().VolumeAttachments().Watch(context.TODO(), opts) + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeClient.StorageV1().VolumeAttachments().Watch(ctx, opts) }, } } diff --git a/pkg/app/server_test.go b/pkg/app/server_test.go index afc7d82e03..eb4fee3bfb 100644 --- a/pkg/app/server_test.go +++ b/pkg/app/server_test.go @@ -627,9 +627,6 @@ func TestCustomResourceExtension(t *testing.T) { customResourceClients[f.Name()] = customResourceClient } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - reg := prometheus.NewRegistry() builder := store.NewBuilder() builder.WithCustomResourceStoreFactories(factories...) @@ -659,7 +656,7 @@ func TestCustomResourceExtension(t *testing.T) { }) handler := metricshandler.New(&options.Options{}, kubeClient, builder, false) - handler.ConfigureSharding(ctx, 0, 1) + handler.ConfigureSharding(builder.ctx, 0, 1) // Wait for caches to fill time.Sleep(time.Second) @@ -968,14 +965,14 @@ func (f *fooFactory) ExpectedType() interface{} { return &samplev1alpha1.Foo{} } -func (f *fooFactory) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher { +func (f *fooFactory) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext { client := customResourceClient.(*samplefake.Clientset) return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return client.SamplecontrollerV1alpha1().Foos(ns).List(context.Background(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return client.SamplecontrollerV1alpha1().Foos(ns).Watch(context.Background(), opts) }, diff --git a/pkg/builder/builder_test.go b/pkg/builder/builder_test.go index 4ecd59a29e..cde6f5493e 100644 --- a/pkg/builder/builder_test.go +++ b/pkg/builder/builder_test.go @@ -65,7 +65,7 @@ func TestBuilderWithCustomStore(t *testing.T) { func customStore(_ []generator.FamilyGenerator, _ interface{}, - _ func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, + _ func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext, _ bool, _ int64, ) []cache.Store { diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index bc13ca3c3e..a9d31bc913 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -56,7 +56,7 @@ type BuilderInterface interface { // BuildStoresFunc function signature that is used to return a list of cache.Store type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator, expectedType interface{}, - listWatchFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, + listWatchWithContextFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext, useAPIServerCache bool, limit int64, ) []cache.Store @@ -64,7 +64,7 @@ type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator, type BuildCustomResourceStoresFunc func(resourceName string, metricFamilies []generator.FamilyGenerator, expectedType interface{}, - listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher, + listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext, useAPIServerCache bool, limit int64, ) []cache.Store diff --git a/pkg/customresource/registry_factory.go b/pkg/customresource/registry_factory.go index 2edc0fe1eb..5650b62268 100644 --- a/pkg/customresource/registry_factory.go +++ b/pkg/customresource/registry_factory.go @@ -114,5 +114,5 @@ type RegistryFactory interface { // }, // } // } - ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher + ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext } diff --git a/pkg/customresourcestate/custom_resource_metrics.go b/pkg/customresourcestate/custom_resource_metrics.go index 9ca2700835..ed707489cd 100644 --- a/pkg/customresourcestate/custom_resource_metrics.go +++ b/pkg/customresourcestate/custom_resource_metrics.go @@ -90,15 +90,14 @@ func (s customResourceMetrics) ExpectedType() interface{} { return &u } -func (s customResourceMetrics) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher { +func (s customResourceMetrics) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext { api := customResourceClient.(dynamic.NamespaceableResourceInterface).Namespace(ns) - ctx := context.Background() return &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fieldSelector return api.List(ctx, options) }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { options.FieldSelector = fieldSelector return api.Watch(ctx, options) }, diff --git a/pkg/sharding/listwatch.go b/pkg/sharding/listwatch.go index 99ca091a8c..05e9a714e7 100644 --- a/pkg/sharding/listwatch.go +++ b/pkg/sharding/listwatch.go @@ -17,6 +17,7 @@ limitations under the License. package sharding import ( + "context" "hash/fnv" jump "github.com/dgryski/go-jump" @@ -29,23 +30,23 @@ import ( type shardedListWatch struct { sharding *sharding - lw cache.ListerWatcher + lwc cache.ListerWatcherWithContext } // NewShardedListWatch returns a new shardedListWatch via the cache.ListerWatcher interface. // In the case of no sharding needed, it returns the provided cache.ListerWatcher -func NewShardedListWatch(shard int32, totalShards int, lw cache.ListerWatcher) cache.ListerWatcher { +func NewShardedListWatch(shard int32, totalShards int, lwc cache.ListerWatcherWithContext) cache.ListerWatcherWithContext { // This is an "optimization" as this configuration means no sharding is to // be performed. if shard == 0 && totalShards == 1 { - return lw + return lwc } - return &shardedListWatch{sharding: &sharding{shard: shard, totalShards: totalShards}, lw: lw} + return &shardedListWatch{sharding: &sharding{shard: shard, totalShards: totalShards}, lwc: lwc} } -func (s *shardedListWatch) List(options metav1.ListOptions) (runtime.Object, error) { - list, err := s.lw.List(options) +func (s *shardedListWatch) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + list, err := s.lwc.ListWithContext(ctx, options) if err != nil { return nil, err } @@ -74,8 +75,8 @@ func (s *shardedListWatch) List(options metav1.ListOptions) (runtime.Object, err return res, nil } -func (s *shardedListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { - w, err := s.lw.Watch(options) +func (s *shardedListWatch) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + w, err := s.lwc.WatchWithContext(ctx, options) if err != nil { return nil, err } diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index d65b8a3d74..07f894eed5 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -17,6 +17,7 @@ limitations under the License. package watch import ( + "context" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "k8s.io/apimachinery/pkg/api/meta" @@ -73,7 +74,7 @@ func NewListWatchMetrics(r prometheus.Registerer) *ListWatchMetrics { // InstrumentedListerWatcher provides the kube_state_metrics_watch_total metric // with a cache.ListerWatcher obj and the related resource. type InstrumentedListerWatcher struct { - lw cache.ListerWatcher + lwc cache.ListerWatcherWithContext metrics *ListWatchMetrics resource string useAPIServerCache bool @@ -81,9 +82,9 @@ type InstrumentedListerWatcher struct { } // NewInstrumentedListerWatcher returns a new InstrumentedListerWatcher. -func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcher { +func NewInstrumentedListerWatcher(lwc cache.ListerWatcherWithContext, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcherWithContext { return &InstrumentedListerWatcher{ - lw: lw, + lwc: lwc, metrics: metrics, resource: resource, useAPIServerCache: useAPIServerCache, @@ -95,7 +96,7 @@ func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetr // / counters based on the outcome of the List operation it instruments. // It supports setting object limits, this means if it is set it will only list and process // n objects of the same resource type. -func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { +func (i *InstrumentedListerWatcher) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { if i.useAPIServerCache { options.ResourceVersion = "0" @@ -106,7 +107,7 @@ func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Ob i.metrics.ListObjectsLimit.WithLabelValues(i.resource).Set(float64(i.limit)) } - res, err := i.lw.List(options) + res, err := i.lwc.ListWithContext(ctx, options) if err != nil { i.metrics.ListRequestsTotal.WithLabelValues("error", i.resource).Inc() @@ -134,8 +135,8 @@ func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Ob // Watch is a wrapper func around the cache.ListerWatcher.Watch func. It increases the success/error // counters based on the outcome of the Watch operation it instruments. -func (i *InstrumentedListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - res, err := i.lw.Watch(options) +func (i *InstrumentedListerWatcher) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + res, err := i.lwc.WatchWithContext(ctx, options) if err != nil { i.metrics.WatchRequestsTotal.WithLabelValues("error", i.resource).Inc() return nil, err