Skip to content

Commit 1f3dc14

Browse files
committed
Handle expired errors with RV>0 in pager, don't full list if 1st page is expired
1 parent b2b285a commit 1f3dc14

File tree

4 files changed

+223
-15
lines changed

4 files changed

+223
-15
lines changed

staging/src/k8s.io/client-go/tools/cache/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ go_test(
2727
race = "off",
2828
deps = [
2929
"//staging/src/k8s.io/api/core/v1:go_default_library",
30+
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
3031
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
3132
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
3233
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,6 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
114114
period: time.Second,
115115
resyncPeriod: resyncPeriod,
116116
clock: &clock.RealClock{},
117-
// We set lastSyncResourceVersion to "0", because it's the value which
118-
// we set as ResourceVersion to the first List() request.
119-
lastSyncResourceVersion: "0",
120117
}
121118
r.setExpectedType(expectedType)
122119
return r
@@ -188,16 +185,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
188185
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
189186
var resourceVersion string
190187

191-
// Explicitly set resource version to have it list from cache for
192-
// performance reasons.
193-
// It's fine for the returned state to be stale (we will catch up via
194-
// Watch() eventually), but can't set "0" to avoid going back in time
195-
// if we hit apiserver that is significantly delayed compared to the
196-
// state we already had.
197-
// TODO: There is still a potential to go back in time after component
198-
// restart when we set ResourceVersion: "0". For more details see:
199-
// https://github.com/kubernetes/kubernetes/issues/59848
200-
options := metav1.ListOptions{ResourceVersion: r.LastSyncResourceVersion()}
188+
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
201189

202190
if err := func() error {
203191
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
@@ -220,8 +208,20 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
220208
if r.WatchListPageSize != 0 {
221209
pager.PageSize = r.WatchListPageSize
222210
}
223-
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
211+
// Pager falls back to full list if paginated list calls fail due to an "Expired" error on the 2nd page or later,
212+
// but may still return an "Expired" error if the 1st page fails with "Expired" or the full list fails with "Expired".
224213
list, err = pager.List(context.Background(), options)
214+
if apierrs.IsResourceExpired(err) {
215+
// For Kubernetes 1.16 and earlier, if the watch cache is disabled for a resource, list requests
216+
// with LastSyncResourceVersion set to a non-zero ResourceVersion will fail if the exact ResourceVersion
217+
// requested is expired (e.g. an etcd compaction has removed it).
218+
// To prevent the reflector from getting stuck retrying a list for an expired resource version in this
219+
// case, we set ResourceVersion="" and list again to re-establish reflector to the latest available
220+
// ResourceVersion, using a consistent read from etcd. This is also safe to do if watch cache is enabled
221+
// and the list request returned an "Expired" error.
222+
options = metav1.ListOptions{ResourceVersion: ""}
223+
list, err = pager.List(context.Background(), options)
224+
}
225225
close(listCh)
226226
}()
227227
select {
@@ -441,3 +441,17 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
441441
defer r.lastSyncResourceVersionMutex.Unlock()
442442
r.lastSyncResourceVersion = v
443443
}
444+
445+
// relistResourceVersion is the resource version the reflector should list or relist from.
446+
func (r *Reflector) relistResourceVersion() string {
447+
lastSyncRV := r.LastSyncResourceVersion()
448+
if lastSyncRV == "" {
449+
// Explicitly set resource version to have it list from cache for
450+
// performance reasons.
451+
// It's fine for the returned state to be stale (we will catch up via Watch()
452+
// eventually), but we need to be at least as new as the last resource version we
453+
// synced to avoid going back in time.
454+
return "0"
455+
}
456+
return lastSyncRV
457+
}

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"k8s.io/api/core/v1"
29+
apierrs "k8s.io/apimachinery/pkg/api/errors"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3132
"k8s.io/apimachinery/pkg/runtime"
@@ -434,6 +435,194 @@ func TestReflectorWatchListPageSize(t *testing.T) {
434435
}
435436
}
436437

438+
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
439+
// it in relist requests to prevent the reflector from traveling back in time if the relist is to an api-server or
440+
// etcd that is partitioned and serving older data than the reflector has already processed.
441+
func TestReflectorResyncWithResourceVersion(t *testing.T) {
442+
stopCh := make(chan struct{})
443+
s := NewStore(MetaNamespaceKeyFunc)
444+
listCallRVs := []string{}
445+
446+
lw := &testLW{
447+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
448+
// Stop once the reflector begins watching since we're only interested in the list.
449+
close(stopCh)
450+
fw := watch.NewFake()
451+
return fw, nil
452+
},
453+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
454+
listCallRVs = append(listCallRVs, options.ResourceVersion)
455+
pods := make([]v1.Pod, 8)
456+
for i := 0; i < 8; i++ {
457+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
458+
}
459+
switch options.ResourceVersion {
460+
case "0":
461+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
462+
case "10":
463+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
464+
default:
465+
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
466+
}
467+
return nil, nil
468+
},
469+
}
470+
r := NewReflector(lw, &v1.Pod{}, s, 0)
471+
472+
// Initial list should use RV=0
473+
r.ListAndWatch(stopCh)
474+
475+
results := s.List()
476+
if len(results) != 4 {
477+
t.Errorf("Expected 4 results, got %d", len(results))
478+
}
479+
480+
// relist should use lastSyncResourceVersions (RV=10)
481+
stopCh = make(chan struct{})
482+
r.ListAndWatch(stopCh)
483+
484+
results = s.List()
485+
if len(results) != 8 {
486+
t.Errorf("Expected 8 results, got %d", len(results))
487+
}
488+
489+
expectedRVs := []string{"0", "10"}
490+
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
491+
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
492+
}
493+
}
494+
495+
// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of kubernetes 1.16 and earlier
496+
// where if the exact ResourceVersion requested is not available for a List request for a non-zero ResourceVersion,
497+
// an "Expired" error is returned if the ResourceVersion has expired (etcd has compacted it).
498+
// (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than
499+
// the requested ResourceVersion).
500+
func TestReflectorExpiredExactResourceVersion(t *testing.T) {
501+
stopCh := make(chan struct{})
502+
s := NewStore(MetaNamespaceKeyFunc)
503+
listCallRVs := []string{}
504+
505+
lw := &testLW{
506+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
507+
// Stop once the reflector begins watching since we're only interested in the list.
508+
close(stopCh)
509+
fw := watch.NewFake()
510+
return fw, nil
511+
},
512+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
513+
listCallRVs = append(listCallRVs, options.ResourceVersion)
514+
pods := make([]v1.Pod, 8)
515+
for i := 0; i < 8; i++ {
516+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
517+
}
518+
switch options.ResourceVersion {
519+
case "0":
520+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
521+
case "10":
522+
// When watch cache is disabled, if the exact ResourceVersion requested is not available, an "Expired" error is returned.
523+
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
524+
case "":
525+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
526+
default:
527+
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
528+
}
529+
return nil, nil
530+
},
531+
}
532+
r := NewReflector(lw, &v1.Pod{}, s, 0)
533+
534+
// Initial list should use RV=0
535+
r.ListAndWatch(stopCh)
536+
537+
results := s.List()
538+
if len(results) != 4 {
539+
t.Errorf("Expected 4 results, got %d", len(results))
540+
}
541+
542+
// relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="".
543+
stopCh = make(chan struct{})
544+
r.ListAndWatch(stopCh)
545+
546+
results = s.List()
547+
if len(results) != 8 {
548+
t.Errorf("Expected 8 results, got %d", len(results))
549+
}
550+
551+
expectedRVs := []string{"0", "10", ""}
552+
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
553+
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
554+
}
555+
}
556+
557+
func TestReflectorFullListIfExpired(t *testing.T) {
558+
stopCh := make(chan struct{})
559+
s := NewStore(MetaNamespaceKeyFunc)
560+
listCallRVs := []string{}
561+
562+
lw := &testLW{
563+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
564+
// Stop once the reflector begins watching since we're only interested in the list.
565+
close(stopCh)
566+
fw := watch.NewFake()
567+
return fw, nil
568+
},
569+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
570+
listCallRVs = append(listCallRVs, options.ResourceVersion)
571+
pods := make([]v1.Pod, 8)
572+
for i := 0; i < 8; i++ {
573+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
574+
}
575+
switch options.ResourceVersion {
576+
case "0":
577+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
578+
case "10":
579+
switch options.Limit {
580+
case 4:
581+
switch options.Continue {
582+
case "":
583+
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
584+
case "C1":
585+
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
586+
default:
587+
t.Fatalf("Unrecognized Continue: %s", options.Continue)
588+
}
589+
case 0:
590+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
591+
default:
592+
t.Fatalf("Unrecognized Limit: %d", options.Limit)
593+
}
594+
default:
595+
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
596+
}
597+
return nil, nil
598+
},
599+
}
600+
r := NewReflector(lw, &v1.Pod{}, s, 0)
601+
r.WatchListPageSize = 4
602+
603+
// Initial list should use RV=0
604+
r.ListAndWatch(stopCh)
605+
606+
results := s.List()
607+
if len(results) != 4 {
608+
t.Errorf("Expected 4 results, got %d", len(results))
609+
}
610+
611+
// relist should use lastSyncResourceVersions (RV=10) and since second page of RV=10 is expired, it should full list with RV=10
612+
stopCh = make(chan struct{})
613+
r.ListAndWatch(stopCh)
614+
615+
results = s.List()
616+
if len(results) != 8 {
617+
t.Errorf("Expected 8 results, got %d", len(results))
618+
}
619+
620+
expectedRVs := []string{"0", "10", "10", "10"}
621+
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
622+
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
623+
}
624+
}
625+
437626
func TestReflectorSetExpectedType(t *testing.T) {
438627
obj := &unstructured.Unstructured{}
439628
gvk := schema.GroupVersionKind{

staging/src/k8s.io/client-go/tools/pager/pager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
8787

8888
obj, err := p.PageFn(ctx, options)
8989
if err != nil {
90-
if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
90+
// Only fall back to full list if an "Expired" error is returned, FullListIfExpired is true, and
91+
// the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from
92+
// failing when the resource version established by the first page request falls out of the compaction window
93+
// during the subsequent list requests).
94+
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
9195
return nil, err
9296
}
9397
// the list expired while we were processing, fall back to a full list

0 commit comments

Comments
 (0)