From 808a91f46f796fca92342d16d765b396abb665fa Mon Sep 17 00:00:00 2001 From: Andrii Korotkov Date: Sun, 4 May 2025 11:34:49 -0700 Subject: [PATCH 1/3] chore: Use watch APIs to list k8s resources Signed-off-by: Andrii Korotkov --- pkg/cache/cluster.go | 24 ++++++++++++++++++++++-- pkg/cache/cluster_test.go | 14 +++++++++++++- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 626c70e81..70b0ddc78 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -7,6 +7,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -30,6 +31,7 @@ import ( "k8s.io/client-go/tools/pager" watchutil "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" + "k8s.io/client-go/util/watchlist" "k8s.io/klog/v2/textlogger" "k8s.io/kubectl/pkg/util/openapi" @@ -248,7 +250,9 @@ type clusterCache struct { openAPISchema openapi.Resources gvkParser *managedfields.GvkParser - respectRBAC int + respectRBAC int + listResourcesUsingWatchAPI atomic.Int32 + listResourcesUsingRegularAPI atomic.Int32 } type clusterCacheSync struct { @@ -600,10 +604,26 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso listRetry = retry.DefaultRetry } + if opts.ResourceVersion == "" { + opts.ResourceVersion = "0" + } + + watchListOpts, success, watchListErr := watchlist.PrepareWatchListOptionsFromListOptions(opts) + var listOpts metav1.ListOptions + if success { + listOpts = watchListOpts + c.listResourcesUsingWatchAPI.Add(1) + c.log.Info("Would use watch list options to list resources.") + } else { + listOpts = opts + c.listResourcesUsingRegularAPI.Add(1) + c.log.Info(fmt.Sprintf("Would use regular options to list resources. Watch list options couldn't be prepared. Optional error: %v", watchListErr)) + } + listRetry.Steps = int(c.listRetryLimit) err := retry.OnError(listRetry, c.listRetryFunc, func() error { var ierr error - res, ierr = resClient.List(ctx, opts) + res, ierr = resClient.List(ctx, listOpts) if ierr != nil { // Log out a retry if c.listRetryLimit > 1 && c.listRetryFunc(ierr) { diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 893c6b877..86c36bc83 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic/fake" + clientfeatures "k8s.io/client-go/features" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" testcore "k8s.io/client-go/testing" @@ -189,7 +190,15 @@ func Benchmark_sync(t *testing.B) { } } -func TestEnsureSynced(t *testing.T) { +type AlwaysEnabledGates struct{} + +func (AlwaysEnabledGates) Enabled(clientfeatures.Feature) bool { + return true +} + +func TestEnsureSyncedAndUsingWatchAPIs(t *testing.T) { + // Enable WatchListClient in particular. Setting via env variable here is too late. + clientfeatures.ReplaceFeatureGates(AlwaysEnabledGates{}) obj1 := &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", @@ -224,6 +233,9 @@ func TestEnsureSynced(t *testing.T) { names = append(names, k.Name) } assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names) + + assert.Greater(t, cluster.listResourcesUsingWatchAPI.Load(), int32(0)) + assert.Equal(t, cluster.listResourcesUsingRegularAPI.Load(), int32(0)) } func TestStatefulSetOwnershipInferred(t *testing.T) { From 349dfe58816ffce04ac997de4e79a1f785e7de84 Mon Sep 17 00:00:00 2001 From: Andrii Korotkov Date: Sun, 4 May 2025 11:44:40 -0700 Subject: [PATCH 2/3] Fix lint errors Signed-off-by: Andrii Korotkov --- pkg/cache/cluster_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 86c36bc83..dafa65933 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -234,8 +234,8 @@ func TestEnsureSyncedAndUsingWatchAPIs(t *testing.T) { } assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names) - assert.Greater(t, cluster.listResourcesUsingWatchAPI.Load(), int32(0)) - assert.Equal(t, cluster.listResourcesUsingRegularAPI.Load(), int32(0)) + assert.Positive(t, cluster.listResourcesUsingWatchAPI.Load()) + assert.Equal(t, int32(0), cluster.listResourcesUsingRegularAPI.Load()) } func TestStatefulSetOwnershipInferred(t *testing.T) { From d6d89f4f7a748456902b903936d5f43a7b7d31ac Mon Sep 17 00:00:00 2001 From: Andrii Korotkov Date: Sun, 11 May 2025 15:34:44 -0700 Subject: [PATCH 3/3] Update tests Signed-off-by: Andrii Korotkov --- pkg/cache/cluster.go | 2 +- pkg/cache/cluster_test.go | 94 +++++++++++++++++++++++++-------------- 2 files changed, 62 insertions(+), 34 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 70b0ddc78..d3070cad3 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -613,7 +613,7 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso if success { listOpts = watchListOpts c.listResourcesUsingWatchAPI.Add(1) - c.log.Info("Would use watch list options to list resources.") + c.log.Info("Would try to use watch list options to list resources.") } else { listOpts = opts c.listResourcesUsingRegularAPI.Add(1) diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index dafa65933..cfd31a53a 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -196,46 +196,74 @@ func (AlwaysEnabledGates) Enabled(clientfeatures.Feature) bool { return true } -func TestEnsureSyncedAndUsingWatchAPIs(t *testing.T) { - // Enable WatchListClient in particular. Setting via env variable here is too late. - clientfeatures.ReplaceFeatureGates(AlwaysEnabledGates{}) - obj1 := &appsv1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployment", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "helm-guestbook1", - Namespace: "default1", - }, - } - obj2 := &appsv1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployment", +func TestEnsureSynced(t *testing.T) { + tests := []struct { + name string + listUsingWatchAPIs bool + }{ + { + name: "WatchAPIUsed", + listUsingWatchAPIs: true, }, - ObjectMeta: metav1.ObjectMeta{ - Name: "helm-guestbook2", - Namespace: "default2", + { + name: "WatchAPINotUsed", + listUsingWatchAPIs: false, }, } - cluster := newCluster(t, obj1, obj2) - err := cluster.EnsureSynced() - require.NoError(t, err) + originalFeatureGates := clientfeatures.FeatureGates() - cluster.lock.Lock() - defer cluster.lock.Unlock() + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if tc.listUsingWatchAPIs { + // Enable WatchListClient in particular. Setting via env variable here is too late. + clientfeatures.ReplaceFeatureGates(AlwaysEnabledGates{}) + } + obj1 := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "helm-guestbook1", + Namespace: "default1", + }, + } + obj2 := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "helm-guestbook2", + Namespace: "default2", + }, + } - assert.Len(t, cluster.resources, 2) - var names []string - for k := range cluster.resources { - names = append(names, k.Name) - } - assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names) + cluster := newCluster(t, obj1, obj2) + err := cluster.EnsureSynced() + require.NoError(t, err) + + cluster.lock.Lock() + defer cluster.lock.Unlock() - assert.Positive(t, cluster.listResourcesUsingWatchAPI.Load()) - assert.Equal(t, int32(0), cluster.listResourcesUsingRegularAPI.Load()) + assert.Len(t, cluster.resources, 2) + var names []string + for k := range cluster.resources { + names = append(names, k.Name) + } + assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names) + + if tc.listUsingWatchAPIs { + assert.Positive(t, cluster.listResourcesUsingWatchAPI.Load()) + assert.Equal(t, int32(0), cluster.listResourcesUsingRegularAPI.Load()) + } else { + assert.Equal(t, int32(0), cluster.listResourcesUsingWatchAPI.Load()) + assert.Positive(t, cluster.listResourcesUsingRegularAPI.Load()) + } + clientfeatures.ReplaceFeatureGates(originalFeatureGates) + }) + } } func TestStatefulSetOwnershipInferred(t *testing.T) {