diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 626c70e81..d3070cad3 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -7,6 +7,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -30,6 +31,7 @@ import ( "k8s.io/client-go/tools/pager" watchutil "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" + "k8s.io/client-go/util/watchlist" "k8s.io/klog/v2/textlogger" "k8s.io/kubectl/pkg/util/openapi" @@ -248,7 +250,9 @@ type clusterCache struct { openAPISchema openapi.Resources gvkParser *managedfields.GvkParser - respectRBAC int + respectRBAC int + listResourcesUsingWatchAPI atomic.Int32 + listResourcesUsingRegularAPI atomic.Int32 } type clusterCacheSync struct { @@ -600,10 +604,26 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso listRetry = retry.DefaultRetry } + if opts.ResourceVersion == "" { + opts.ResourceVersion = "0" + } + + watchListOpts, success, watchListErr := watchlist.PrepareWatchListOptionsFromListOptions(opts) + var listOpts metav1.ListOptions + if success { + listOpts = watchListOpts + c.listResourcesUsingWatchAPI.Add(1) + c.log.Info("Would try to use watch list options to list resources.") + } else { + listOpts = opts + c.listResourcesUsingRegularAPI.Add(1) + c.log.Info(fmt.Sprintf("Would use regular options to list resources. Watch list options couldn't be prepared. Optional error: %v", watchListErr)) + } + listRetry.Steps = int(c.listRetryLimit) err := retry.OnError(listRetry, c.listRetryFunc, func() error { var ierr error - res, ierr = resClient.List(ctx, opts) + res, ierr = resClient.List(ctx, listOpts) if ierr != nil { // Log out a retry if c.listRetryLimit > 1 && c.listRetryFunc(ierr) { diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 893c6b877..cfd31a53a 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic/fake" + clientfeatures "k8s.io/client-go/features" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" testcore "k8s.io/client-go/testing" @@ -189,41 +190,80 @@ func Benchmark_sync(t *testing.B) { } } +type AlwaysEnabledGates struct{} + +func (AlwaysEnabledGates) Enabled(clientfeatures.Feature) bool { + return true +} + func TestEnsureSynced(t *testing.T) { - obj1 := &appsv1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployment", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "helm-guestbook1", - Namespace: "default1", - }, - } - obj2 := &appsv1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployment", + tests := []struct { + name string + listUsingWatchAPIs bool + }{ + { + name: "WatchAPIUsed", + listUsingWatchAPIs: true, }, - ObjectMeta: metav1.ObjectMeta{ - Name: "helm-guestbook2", - Namespace: "default2", + { + name: "WatchAPINotUsed", + listUsingWatchAPIs: false, }, } - cluster := newCluster(t, obj1, obj2) - err := cluster.EnsureSynced() - require.NoError(t, err) + originalFeatureGates := clientfeatures.FeatureGates() - cluster.lock.Lock() - defer cluster.lock.Unlock() + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if tc.listUsingWatchAPIs { + // Enable WatchListClient in particular. Setting via env variable here is too late. + clientfeatures.ReplaceFeatureGates(AlwaysEnabledGates{}) + } + obj1 := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "helm-guestbook1", + Namespace: "default1", + }, + } + obj2 := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "helm-guestbook2", + Namespace: "default2", + }, + } - assert.Len(t, cluster.resources, 2) - var names []string - for k := range cluster.resources { - names = append(names, k.Name) + cluster := newCluster(t, obj1, obj2) + err := cluster.EnsureSynced() + require.NoError(t, err) + + cluster.lock.Lock() + defer cluster.lock.Unlock() + + assert.Len(t, cluster.resources, 2) + var names []string + for k := range cluster.resources { + names = append(names, k.Name) + } + assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names) + + if tc.listUsingWatchAPIs { + assert.Positive(t, cluster.listResourcesUsingWatchAPI.Load()) + assert.Equal(t, int32(0), cluster.listResourcesUsingRegularAPI.Load()) + } else { + assert.Equal(t, int32(0), cluster.listResourcesUsingWatchAPI.Load()) + assert.Positive(t, cluster.listResourcesUsingRegularAPI.Load()) + } + clientfeatures.ReplaceFeatureGates(originalFeatureGates) + }) } - assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names) } func TestStatefulSetOwnershipInferred(t *testing.T) {