diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 124ebd159..bf17d0aa5 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -209,7 +209,7 @@ type clusterCache struct {
 	// namespacedResources is a simple map which indicates a groupKind is namespaced
 	namespacedResources map[schema.GroupKind]bool
 
-	// maximum time we allow watches to run before relisting the group/kind and restarting the watch
+	// maximum time we allow watches to run before restarting them
 	watchResyncTimeout time.Duration
 	// sync retry timeout for cluster when sync error happens
 	clusterSyncRetryTimeout time.Duration
@@ -601,6 +601,10 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
 	}
 	listRetry.Steps = int(c.listRetryLimit)
 
+	// We set the resource version to 0 below to proactively prevent the
+	// list API call from reaching etcd and make the server fetch the data
+	// from the watch cache instead.
+	opts.ResourceVersion = "0"
 	err := retry.OnError(listRetry, c.listRetryFunc, func() error {
 		var ierr error
 		res, ierr = resClient.List(ctx, opts)
@@ -651,6 +655,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
 }
 
 func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
+	timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
 	kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
 		defer func() {
 			if r := recover(); r != nil {
@@ -668,6 +673,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 
 		w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				options.TimeoutSeconds = &timeoutSeconds
 				res, err := resClient.Watch(ctx, options)
 				if apierrors.IsNotFound(err) {
 					c.stopWatching(api.GroupKind, ns)
@@ -679,17 +685,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 			return err
 		}
 
-		defer func() {
-			w.Stop()
-			resourceVersion = ""
-		}()
-
-		var watchResyncTimeoutCh <-chan time.Time
-		if c.watchResyncTimeout > 0 {
-			shouldResync := time.NewTimer(c.watchResyncTimeout)
-			defer shouldResync.Stop()
-			watchResyncTimeoutCh = shouldResync.C
-		}
+		defer w.Stop()
 
 		for {
 			select {
@@ -697,12 +693,13 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 			case <-ctx.Done():
 				return nil
 
-			// re-synchronize API state and restart watch periodically
-			case <-watchResyncTimeoutCh:
-				return fmt.Errorf("resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
-
 			// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
 			case <-w.Done():
+				// The underlying retry watcher has stopped, possibly due to specifying an RV in
+				// the watch request that is stale (error code 410). This forces us to relist
+				// objects from the kube-apiserver to get a fresher RV and we invoke that relist
+				// by resetting the locally stored RV.
+				resourceVersion = ""
 				return fmt.Errorf("watch %s on %s has closed", api.GroupKind, c.config.Host)
 
 			case event, ok := <-w.ResultChan():
@@ -712,8 +709,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 
 				obj, ok := event.Object.(*unstructured.Unstructured)
 				if !ok {
+					// We failed to cast the object received in the watch event to something
+					// that contains a resource version field. Because of that, we don't know
+					// from what RV we should reinitialize the watch connection, so in order to
+					// avoid any inconsistencies due to accidental skipping of a potential RV,
+					// we reset the locally stored RV to forcefully invoke the list API call to
+					// get it from the kube-apiserver.
+					resourceVersion = ""
 					return fmt.Errorf("failed to convert to *unstructured.Unstructured: %v", event.Object)
 				}
+				resourceVersion = obj.GetResourceVersion()
 
 				c.recordEvent(event.Type, obj)
 				if kube.IsCRD(obj) {
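For context, below is a minimal, self-contained sketch of the pattern this diff adopts: the initial list is served from the apiserver's watch cache by requesting ResourceVersion "0", and the watch itself is bounded by a server-side timeout (ListOptions.TimeoutSeconds) handed to a RetryWatcher, replacing the old client-side resync timer. This is not the gitops-engine code itself; the package, function name, parameters, and messages are illustrative assumptions.

```go
// Package example sketches the list-from-watch-cache + server-side watch timeout
// pattern used in the diff above. Names here are illustrative, not from the real codebase.
package example

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/cache"
	watchutil "k8s.io/client-go/tools/watch"
)

// watchWithServerTimeout lists once from the apiserver's watch cache, then keeps a
// RetryWatcher running with a server-enforced timeout on each watch request.
func watchWithServerTimeout(ctx context.Context, client dynamic.Interface, gvr schema.GroupVersionResource, ns string, resyncTimeout time.Duration) error {
	resClient := client.Resource(gvr).Namespace(ns)

	// ResourceVersion "0" asks the apiserver to serve the list from its watch cache
	// rather than reading from etcd; the returned RV seeds the watch below.
	list, err := resClient.List(ctx, metav1.ListOptions{ResourceVersion: "0"})
	if err != nil {
		return err
	}
	resourceVersion := list.GetResourceVersion()

	// The server closes the watch after timeoutSeconds; the RetryWatcher transparently
	// re-establishes it from the last resource version it delivered.
	timeoutSeconds := int64(resyncTimeout.Seconds())
	w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.TimeoutSeconds = &timeoutSeconds
			return resClient.Watch(ctx, options)
		},
	})
	if err != nil {
		return err
	}
	defer w.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-w.Done():
			// The retry watcher gave up, e.g. because its RV expired (410 Gone);
			// the caller should relist to obtain a fresh RV and start over.
			return fmt.Errorf("watch %s closed, relist required", gvr)
		case event, ok := <-w.ResultChan():
			if !ok {
				return fmt.Errorf("watch %s result channel closed", gvr)
			}
			fmt.Printf("received %s event\n", event.Type)
		}
	}
}
```

Relying on TimeoutSeconds keeps the connection lifecycle on the server while the RetryWatcher resumes from its last seen RV, so a periodic restart no longer forces a full relist the way the removed client-side timer did.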