Skip to content

Commit 8ed2f47

Browse files
authored
Merge pull request kubernetes#83520 from jpbetz/reflector-relist-rv
Avoid going back in time in Reflector relist (revived)
2 parents 695c306 + 57b451c commit 8ed2f47

File tree

7 files changed

+263
-15
lines changed

7 files changed

+263
-15
lines changed

staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ func NewApplyConflict(causes []metav1.StatusCause, message string) *StatusError
223223
}
224224

225225
// NewGone returns an error indicating the item no longer available at the server and no forwarding address is known.
226+
// Deprecated: Please use NewResourceExpired instead.
226227
func NewGone(message string) *StatusError {
227228
return &StatusError{metav1.Status{
228229
Status: metav1.StatusFailure,

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
468468
return result, nil
469469
}
470470
if resourceVersion < oldest-1 {
471-
return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
471+
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
472472
}
473473

474474
// Binary search the smallest index at which resourceVersion is greater than the given one.

staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,8 @@ func TestWatch(t *testing.T) {
444444
}
445445
defer tooOldWatcher.Stop()
446446
// Ensure we get a "Gone" error
447-
expectedGoneError := errors.NewGone("").ErrStatus
448-
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)
447+
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
448+
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
449449

450450
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
451451
if err != nil {
@@ -668,8 +668,8 @@ func TestEmptyWatchEventCache(t *testing.T) {
668668
t.Fatalf("Unexpected error: %v", err)
669669
}
670670
defer watcher.Stop()
671-
expectedGoneError := errors.NewGone("").ErrStatus
672-
verifyWatchEvent(t, watcher, watch.Error, &expectedGoneError)
671+
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
672+
verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError)
673673
}
674674

675675
{

staging/src/k8s.io/client-go/tools/cache/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ go_test(
2727
race = "off",
2828
deps = [
2929
"//staging/src/k8s.io/api/core/v1:go_default_library",
30+
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
3031
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
3132
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
3233
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ type Reflector struct {
7474
// observed when doing a sync with the underlying store
7575
// it is thread safe, but not synchronized with the underlying store
7676
lastSyncResourceVersion string
77+
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
78+
// failed with an HTTP 410 (Gone) status code.
79+
isLastSyncResourceVersionGone bool
7780
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion and isLastSyncResourceVersionGone
7881
lastSyncResourceVersionMutex sync.RWMutex
7982
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
@@ -185,10 +188,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
185188
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
186189
var resourceVersion string
187190

188-
// Explicitly set "0" as resource version - it's fine for the List()
189-
// to be served from cache and potentially be delayed relative to
190-
// etcd contents. Reflector framework will catch up via Watch() eventually.
191-
options := metav1.ListOptions{ResourceVersion: "0"}
191+
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
192192

193193
if err := func() error {
194194
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
@@ -211,8 +211,17 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
211211
if r.WatchListPageSize != 0 {
212212
pager.PageSize = r.WatchListPageSize
213213
}
214-
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
214+
215215
list, err = pager.List(context.Background(), options)
216+
if isExpiredError(err) {
217+
r.setIsLastSyncResourceVersionExpired(true)
218+
// Retry immediately if the resource version used to list is expired.
219+
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
220+
// continuation pages, but the pager might not be enabled, or the full list might fail because the
221+
// resource version it is listing at is expired, so we need to fall back to resourceVersion="" in all cases
222+
// to recover and ensure the reflector makes forward progress.
223+
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
224+
}
216225
close(listCh)
217226
}()
218227
select {
@@ -225,6 +234,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
225234
if err != nil {
226235
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
227236
}
237+
r.setIsLastSyncResourceVersionExpired(false) // list was successful
228238
initTrace.Step("Objects listed")
229239
listMetaInterface, err := meta.ListAccessor(list)
230240
if err != nil {
@@ -298,10 +308,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
298308

299309
w, err := r.listerWatcher.Watch(options)
300310
if err != nil {
301-
switch err {
302-
case io.EOF:
311+
switch {
312+
case isExpiredError(err):
313+
r.setIsLastSyncResourceVersionExpired(true)
314+
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
315+
case err == io.EOF:
303316
// watch closed normally
304-
case io.ErrUnexpectedEOF:
317+
case err == io.ErrUnexpectedEOF:
305318
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
306319
default:
307320
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
@@ -320,7 +333,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
320333
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
321334
if err != errorStopRequested {
322335
switch {
323-
case apierrs.IsResourceExpired(err):
336+
case isExpiredError(err):
337+
r.setIsLastSyncResourceVersionExpired(true)
324338
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
325339
default:
326340
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
@@ -432,3 +446,42 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
432446
defer r.lastSyncResourceVersionMutex.Unlock()
433447
r.lastSyncResourceVersion = v
434448
}
449+
450+
// relistResourceVersion determines the resource version the reflector should list or relist from.
451+
// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
452+
// version no older than what has already been observed in relist results or watch events, or, if the last relist resulted
453+
// in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
454+
// etcd via a quorum read.
455+
func (r *Reflector) relistResourceVersion() string {
456+
r.lastSyncResourceVersionMutex.RLock()
457+
defer r.lastSyncResourceVersionMutex.RUnlock()
458+
459+
if r.isLastSyncResourceVersionGone {
460+
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
461+
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
462+
// to the latest available ResourceVersion, using a consistent read from etcd.
463+
return ""
464+
}
465+
if r.lastSyncResourceVersion == "" {
466+
// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
467+
// be served from the watch cache if it is enabled.
468+
return "0"
469+
}
470+
return r.lastSyncResourceVersion
471+
}
472+
473+
// setIsLastSyncResourceVersionExpired sets whether the last list or watch request with lastSyncResourceVersion returned an
474+
// expired error: HTTP 410 (Gone) Status Code.
475+
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
476+
r.lastSyncResourceVersionMutex.Lock()
477+
defer r.lastSyncResourceVersionMutex.Unlock()
478+
r.isLastSyncResourceVersionGone = isExpired
479+
}
480+
481+
func isExpiredError(err error) bool {
482+
// In Kubernetes 1.17 and earlier, the api server returns both apierrs.StatusReasonExpired and
483+
// apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
484+
// and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone
485+
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
486+
return apierrs.IsResourceExpired(err) || apierrs.IsGone(err)
487+
}

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"k8s.io/api/core/v1"
29+
apierrs "k8s.io/apimachinery/pkg/api/errors"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3132
"k8s.io/apimachinery/pkg/runtime"
@@ -434,6 +435,194 @@ func TestReflectorWatchListPageSize(t *testing.T) {
434435
}
435436
}
436437

438+
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
439+
// it in relist requests to prevent the reflector from traveling back in time if the relist is to an api-server or
440+
// etcd that is partitioned and serving older data than the reflector has already processed.
441+
func TestReflectorResyncWithResourceVersion(t *testing.T) {
442+
stopCh := make(chan struct{})
443+
s := NewStore(MetaNamespaceKeyFunc)
444+
listCallRVs := []string{}
445+
446+
lw := &testLW{
447+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
448+
// Stop once the reflector begins watching since we're only interested in the list.
449+
close(stopCh)
450+
fw := watch.NewFake()
451+
return fw, nil
452+
},
453+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
454+
listCallRVs = append(listCallRVs, options.ResourceVersion)
455+
pods := make([]v1.Pod, 8)
456+
for i := 0; i < 8; i++ {
457+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
458+
}
459+
switch options.ResourceVersion {
460+
case "0":
461+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
462+
case "10":
463+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
464+
default:
465+
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
466+
}
467+
return nil, nil
468+
},
469+
}
470+
r := NewReflector(lw, &v1.Pod{}, s, 0)
471+
472+
// Initial list should use RV=0
473+
r.ListAndWatch(stopCh)
474+
475+
results := s.List()
476+
if len(results) != 4 {
477+
t.Errorf("Expected 4 results, got %d", len(results))
478+
}
479+
480+
// relist should use lastSyncResourceVersion (RV=10)
481+
stopCh = make(chan struct{})
482+
r.ListAndWatch(stopCh)
483+
484+
results = s.List()
485+
if len(results) != 8 {
486+
t.Errorf("Expected 8 results, got %d", len(results))
487+
}
488+
489+
expectedRVs := []string{"0", "10"}
490+
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
491+
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
492+
}
493+
}
494+
495+
// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of kubernetes 1.16 and earlier
496+
// where if the exact ResourceVersion requested is not available for a List request for a non-zero ResourceVersion,
497+
// an "Expired" error is returned if the ResourceVersion has expired (etcd has compacted it).
498+
// (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than
499+
// the requested ResourceVersion).
500+
func TestReflectorExpiredExactResourceVersion(t *testing.T) {
501+
stopCh := make(chan struct{})
502+
s := NewStore(MetaNamespaceKeyFunc)
503+
listCallRVs := []string{}
504+
505+
lw := &testLW{
506+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
507+
// Stop once the reflector begins watching since we're only interested in the list.
508+
close(stopCh)
509+
fw := watch.NewFake()
510+
return fw, nil
511+
},
512+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
513+
listCallRVs = append(listCallRVs, options.ResourceVersion)
514+
pods := make([]v1.Pod, 8)
515+
for i := 0; i < 8; i++ {
516+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
517+
}
518+
switch options.ResourceVersion {
519+
case "0":
520+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
521+
case "10":
522+
// When watch cache is disabled, if the exact ResourceVersion requested is not available, an "Expired" error is returned.
523+
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
524+
case "":
525+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
526+
default:
527+
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
528+
}
529+
return nil, nil
530+
},
531+
}
532+
r := NewReflector(lw, &v1.Pod{}, s, 0)
533+
534+
// Initial list should use RV=0
535+
r.ListAndWatch(stopCh)
536+
537+
results := s.List()
538+
if len(results) != 4 {
539+
t.Errorf("Expected 4 results, got %d", len(results))
540+
}
541+
542+
// relist should use lastSyncResourceVersion (RV=10) and since RV=10 is expired, it should retry with RV="".
543+
stopCh = make(chan struct{})
544+
r.ListAndWatch(stopCh)
545+
546+
results = s.List()
547+
if len(results) != 8 {
548+
t.Errorf("Expected 8 results, got %d", len(results))
549+
}
550+
551+
expectedRVs := []string{"0", "10", ""}
552+
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
553+
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
554+
}
555+
}
556+
557+
func TestReflectorFullListIfExpired(t *testing.T) {
558+
stopCh := make(chan struct{})
559+
s := NewStore(MetaNamespaceKeyFunc)
560+
listCallRVs := []string{}
561+
562+
lw := &testLW{
563+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
564+
// Stop once the reflector begins watching since we're only interested in the list.
565+
close(stopCh)
566+
fw := watch.NewFake()
567+
return fw, nil
568+
},
569+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
570+
listCallRVs = append(listCallRVs, options.ResourceVersion)
571+
pods := make([]v1.Pod, 8)
572+
for i := 0; i < 8; i++ {
573+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
574+
}
575+
switch options.ResourceVersion {
576+
case "0":
577+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
578+
case "10":
579+
switch options.Limit {
580+
case 4:
581+
switch options.Continue {
582+
case "":
583+
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
584+
case "C1":
585+
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
586+
default:
587+
t.Fatalf("Unrecognized Continue: %s", options.Continue)
588+
}
589+
case 0:
590+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
591+
default:
592+
t.Fatalf("Unrecognized Limit: %d", options.Limit)
593+
}
594+
default:
595+
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
596+
}
597+
return nil, nil
598+
},
599+
}
600+
r := NewReflector(lw, &v1.Pod{}, s, 0)
601+
r.WatchListPageSize = 4
602+
603+
// Initial list should use RV=0
604+
r.ListAndWatch(stopCh)
605+
606+
results := s.List()
607+
if len(results) != 4 {
608+
t.Errorf("Expected 4 results, got %d", len(results))
609+
}
610+
611+
// relist should use lastSyncResourceVersion (RV=10) and since second page of RV=10 is expired, it should fall back to a full list with RV=10
612+
stopCh = make(chan struct{})
613+
r.ListAndWatch(stopCh)
614+
615+
results = s.List()
616+
if len(results) != 8 {
617+
t.Errorf("Expected 8 results, got %d", len(results))
618+
}
619+
620+
expectedRVs := []string{"0", "10", "10", "10"}
621+
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
622+
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
623+
}
624+
}
625+
437626
func TestReflectorSetExpectedType(t *testing.T) {
438627
obj := &unstructured.Unstructured{}
439628
gvk := schema.GroupVersionKind{

staging/src/k8s.io/client-go/tools/pager/pager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
8787

8888
obj, err := p.PageFn(ctx, options)
8989
if err != nil {
90-
if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
90+
// Only fall back to a full list if an "Expired" error is returned, FullListIfExpired is true, and
91+
// the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from
92+
// failing when the resource version established by the first page request falls out of the compaction window
93+
// during the subsequent list requests).
94+
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
9195
return nil, err
9296
}
9397
// the list expired while we were processing, fall back to a full list

0 commit comments

Comments
 (0)