@@ -74,6 +74,9 @@ type Reflector struct {
74
74
// observed when doing a sync with the underlying store
75
75
// it is thread safe, but not synchronized with the underlying store
76
76
lastSyncResourceVersion string
77
+ // isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
78
+ // failed with an HTTP 410 (Gone) status code.
79
+ isLastSyncResourceVersionGone bool
77
80
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
78
81
lastSyncResourceVersionMutex sync.RWMutex
79
82
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
@@ -208,19 +211,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
208
211
if r .WatchListPageSize != 0 {
209
212
pager .PageSize = r .WatchListPageSize
210
213
}
211
- // Pager falls back to full list if paginated list calls fail due to an "Expired" error on the 2nd page or later,
212
- // but still my return an "Expired" error if the 1st page fails with "Expired" or the full list fails with "Expired".
214
+
213
215
list , err = pager .List (context .Background (), options )
214
- if apierrs .IsResourceExpired (err ) {
215
- // For Kubernetes 1.16 and earlier, if the watch cache is disabled for a resource, list requests
216
- // with LastSyncResourceVersion set to a non-zero ResourceVersion will fail if the exact ResourceVersion
217
- // requested is expired (e.g. an etcd compaction has remove it).
218
- // To prevent the reflector from getting stuck retrying a list for an expired resource version in this
219
- // case, we set ResourceVersion="" and list again to re-establish reflector to the latest available
220
- // ResourceVersion, using a consistent read from etcd. This is also safe to do if watch cache is enabled
221
- // and the list request returned a "Expired" error.
222
- options = metav1.ListOptions {ResourceVersion : "" }
223
- list , err = pager .List (context .Background (), options )
216
+ if isExpiredError (err ) {
217
+ r .setIsLastSyncResourceVersionExpired (true )
218
+ // Retry immediately if the resource version used to list is expired.
219
+ // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
220
+ // continuation pages, but the pager might not be enabled, or the full list might fail because the
221
+ // resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
222
+ // to recover and ensure the reflector makes forward progress.
223
+ list , err = pager .List (context .Background (), metav1.ListOptions {ResourceVersion : r .relistResourceVersion ()})
224
224
}
225
225
close (listCh )
226
226
}()
@@ -234,6 +234,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
234
234
if err != nil {
235
235
return fmt .Errorf ("%s: Failed to list %v: %v" , r .name , r .expectedTypeName , err )
236
236
}
237
+ r .setIsLastSyncResourceVersionExpired (false ) // list was successful
237
238
initTrace .Step ("Objects listed" )
238
239
listMetaInterface , err := meta .ListAccessor (list )
239
240
if err != nil {
@@ -307,10 +308,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
307
308
308
309
w , err := r .listerWatcher .Watch (options )
309
310
if err != nil {
310
- switch err {
311
- case io .EOF :
311
+ switch {
312
+ case isExpiredError (err ):
313
+ r .setIsLastSyncResourceVersionExpired (true )
314
+ klog .V (4 ).Infof ("%s: watch of %v closed with: %v" , r .name , r .expectedTypeName , err )
315
+ case err == io .EOF :
312
316
// watch closed normally
313
- case io .ErrUnexpectedEOF :
317
+ case err == io .ErrUnexpectedEOF :
314
318
klog .V (1 ).Infof ("%s: Watch for %v closed with unexpected EOF: %v" , r .name , r .expectedTypeName , err )
315
319
default :
316
320
utilruntime .HandleError (fmt .Errorf ("%s: Failed to watch %v: %v" , r .name , r .expectedTypeName , err ))
@@ -329,7 +333,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
329
333
if err := r .watchHandler (w , & resourceVersion , resyncerrc , stopCh ); err != nil {
330
334
if err != errorStopRequested {
331
335
switch {
332
- case apierrs .IsResourceExpired (err ):
336
+ case isExpiredError (err ):
337
+ r .setIsLastSyncResourceVersionExpired (true )
333
338
klog .V (4 ).Infof ("%s: watch of %v ended with: %v" , r .name , r .expectedTypeName , err )
334
339
default :
335
340
klog .Warningf ("%s: watch of %v ended with: %v" , r .name , r .expectedTypeName , err )
@@ -442,16 +447,41 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
442
447
r .lastSyncResourceVersion = v
443
448
}
444
449
445
- // relistResourceVersion is the resource version the reflector should list or relist from.
450
+ // relistResourceVersion determines the resource version the reflector should list or relist from.
451
+ // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
452
+ // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
453
+ // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
454
+ // etcd via a quorum read.
446
455
func (r * Reflector ) relistResourceVersion () string {
447
- lastSyncRV := r .LastSyncResourceVersion ()
448
- if lastSyncRV == "" {
449
- // Explicitly set resource version to have it list from cache for
450
- // performance reasons.
451
- // It's fine for the returned state to be stale (we will catch up via Watch()
452
- // eventually), but we need to be at least as new as the last resource version we
453
- // synced to avoid going back in time.
456
+ r .lastSyncResourceVersionMutex .RLock ()
457
+ defer r .lastSyncResourceVersionMutex .RUnlock ()
458
+
459
+ if r .isLastSyncResourceVersionGone {
460
+ // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
461
+ // if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
462
+ // to the latest available ResourceVersion, using a consistent read from etcd.
463
+ return ""
464
+ }
465
+ if r .lastSyncResourceVersion == "" {
466
+ // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
467
+ // be served from the watch cache if it is enabled.
454
468
return "0"
455
469
}
456
- return lastSyncRV
470
+ return r .lastSyncResourceVersion
471
+ }
472
+
473
+ // setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
474
+ // expired error: HTTP 410 (Gone) Status Code.
475
+ func (r * Reflector ) setIsLastSyncResourceVersionExpired (isExpired bool ) {
476
+ r .lastSyncResourceVersionMutex .Lock ()
477
+ defer r .lastSyncResourceVersionMutex .Unlock ()
478
+ r .isLastSyncResourceVersionGone = isExpired
479
+ }
480
+
481
+ func isExpiredError (err error ) bool {
482
+ // In Kubernetes 1.17 and earlier, the api server returns both apierrs.StatusReasonExpired and
483
+ // apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
484
+ // and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone
485
+ // check when we fully drop support for Kubernetes 1.17 servers from reflectors.
486
+ return apierrs .IsResourceExpired (err ) || apierrs .IsGone (err )
457
487
}
0 commit comments