Skip to content

Commit 3704174

Browse files
committed
Fix bug in reflector not recovering from "Too large resource version" errors
1 parent c5b3dba commit 3704174

File tree

4 files changed

+89
-30
lines changed

4 files changed

+89
-30
lines changed

staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,9 @@ const (
873873
// FieldManagerConflict is used to report when another client claims to manage this field,
874874
// It should only be returned for a request using server-side apply.
875875
CauseTypeFieldManagerConflict CauseType = "FieldManagerConflict"
876+
// CauseTypeResourceVersionTooLarge is used to report that the requested resource version
877+
// is newer than the data observed by the API server, so the request cannot be served.
878+
CauseTypeResourceVersionTooLarge CauseType = "ResourceVersionTooLarge"
876879
)
877880

878881
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

staging/src/k8s.io/apiserver/pkg/storage/errors.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,12 @@ var tooLargeResourceVersionCauseMsg = "Too large resource version"
177177
// a minimum resource version that is larger than the largest currently available resource version for a requested resource.
178178
func NewTooLargeResourceVersionError(minimumResourceVersion, currentRevision uint64, retrySeconds int) error {
179179
err := errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %d, current: %d", minimumResourceVersion, currentRevision), retrySeconds)
180-
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: tooLargeResourceVersionCauseMsg}}
180+
err.ErrStatus.Details.Causes = []metav1.StatusCause{
181+
{
182+
Type: metav1.CauseTypeResourceVersionTooLarge,
183+
Message: tooLargeResourceVersionCauseMsg,
184+
},
185+
}
181186
return err
182187
}
183188

@@ -186,15 +191,5 @@ func IsTooLargeResourceVersion(err error) bool {
186191
if !errors.IsTimeout(err) {
187192
return false
188193
}
189-
switch t := err.(type) {
190-
case errors.APIStatus:
191-
if d := t.Status().Details; d != nil {
192-
for _, cause := range d.Causes {
193-
if cause.Message == tooLargeResourceVersionCauseMsg {
194-
return true
195-
}
196-
}
197-
}
198-
}
199-
return false
194+
return errors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
200195
}

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ type Reflector struct {
8282
// observed when doing a sync with the underlying store
8383
// it is thread safe, but not synchronized with the underlying store
8484
lastSyncResourceVersion string
85-
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
86-
// failed with an HTTP 410 (Gone) status code.
87-
isLastSyncResourceVersionGone bool
85+
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
86+
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
87+
isLastSyncResourceVersionUnavailable bool
8888
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
8989
lastSyncResourceVersionMutex sync.RWMutex
9090
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
@@ -115,7 +115,7 @@ type WatchErrorHandler func(r *Reflector, err error)
115115
func DefaultWatchErrorHandler(r *Reflector, err error) {
116116
switch {
117117
case isExpiredError(err):
118-
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
118+
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
119119
// has a semantic that it returns data at least as fresh as provided RV.
120120
// So first try to LIST with setting RV to resource version of last observed object.
121121
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
@@ -288,13 +288,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
288288
}
289289

290290
list, paginatedResult, err = pager.List(context.Background(), options)
291-
if isExpiredError(err) {
292-
r.setIsLastSyncResourceVersionExpired(true)
293-
// Retry immediately if the resource version used to list is expired.
291+
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
292+
r.setIsLastSyncResourceVersionUnavailable(true)
293+
// Retry immediately if the resource version used to list is unavailable.
294294
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
295-
// continuation pages, but the pager might not be enabled, or the full list might fail because the
296-
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
297-
// to recover and ensure the reflector makes forward progress.
295+
// continuation pages, but the pager might not be enabled, the full list might fail because the
296+
// resource version it is listing at is expired, or the cache may not yet be synced to the provided
297+
// resource version. So we need to fall back to resourceVersion="" in all cases to recover and ensure
298+
// the reflector makes forward progress.
298299
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
299300
}
300301
close(listCh)
@@ -324,7 +325,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
324325
r.paginatedResult = true
325326
}
326327

327-
r.setIsLastSyncResourceVersionExpired(false) // list was successful
328+
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
328329
initTrace.Step("Objects listed")
329330
listMetaInterface, err := meta.ListAccessor(list)
330331
if err != nil {
@@ -415,7 +416,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
415416
if err != errorStopRequested {
416417
switch {
417418
case isExpiredError(err):
418-
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
419+
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
419420
// has a semantic that it returns data at least as fresh as provided RV.
420421
// So first try to LIST with setting RV to resource version of last observed object.
421422
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
@@ -538,9 +539,9 @@ func (r *Reflector) relistResourceVersion() string {
538539
r.lastSyncResourceVersionMutex.RLock()
539540
defer r.lastSyncResourceVersionMutex.RUnlock()
540541

541-
if r.isLastSyncResourceVersionGone {
542+
if r.isLastSyncResourceVersionUnavailable {
542543
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
543-
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
544+
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
544545
// to the latest available ResourceVersion, using a consistent read from etcd.
545546
return ""
546547
}
@@ -552,12 +553,12 @@ func (r *Reflector) relistResourceVersion() string {
552553
return r.lastSyncResourceVersion
553554
}
554555

555-
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
556-
// expired error: HTTP 410 (Gone) Status Code.
557-
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
556+
// setIsLastSyncResourceVersionUnavailable records whether the last list or watch request with lastSyncResourceVersion returned
557+
// an "expired" or "too large resource version" error.
558+
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
558559
r.lastSyncResourceVersionMutex.Lock()
559560
defer r.lastSyncResourceVersionMutex.Unlock()
560-
r.isLastSyncResourceVersionGone = isExpired
561+
r.isLastSyncResourceVersionUnavailable = isUnavailable
561562
}
562563

563564
func isExpiredError(err error) bool {
@@ -567,3 +568,7 @@ func isExpiredError(err error) bool {
567568
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
568569
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
569570
}
571+
572+
func isTooLargeResourceVersionError(err error) bool {
573+
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
574+
}

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,62 @@ func TestReflectorFullListIfExpired(t *testing.T) {
714714
}
715715
}
716716

717+
func TestReflectorFullListIfTooLarge(t *testing.T) {
718+
stopCh := make(chan struct{})
719+
s := NewStore(MetaNamespaceKeyFunc)
720+
listCallRVs := []string{}
721+
722+
lw := &testLW{
723+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
724+
// Stop once the reflector begins watching since we're only interested in the list.
725+
close(stopCh)
726+
fw := watch.NewFake()
727+
return fw, nil
728+
},
729+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
730+
listCallRVs = append(listCallRVs, options.ResourceVersion)
731+
732+
switch options.ResourceVersion {
733+
// initial list
734+
case "0":
735+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
736+
// relist after the initial list
737+
case "20":
738+
err := apierrors.NewTimeoutError("too large resource version", 1)
739+
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
740+
return nil, err
741+
// relist from etcd after "too large" error
742+
case "":
743+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
744+
default:
745+
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
746+
}
747+
},
748+
}
749+
r := NewReflector(lw, &v1.Pod{}, s, 0)
750+
751+
// Initial list should use RV=0
752+
if err := r.ListAndWatch(stopCh); err != nil {
753+
t.Fatal(err)
754+
}
755+
756+
// Relist from the future version.
757+
// This may happen because the watch cache is initialized from the "current global etcd resource version"
758+
// when kube-apiserver is starting. If no objects change after that, each kube-apiserver
759+
// may be synced to a different version, and they will never converge.
760+
// TODO: We should use the etcd progress-notify feature to avoid this behavior, but until this is
761+
// done, we simply try to relist from now to avoid continuous errors on relists.
762+
stopCh = make(chan struct{})
763+
if err := r.ListAndWatch(stopCh); err != nil {
764+
t.Fatal(err)
765+
}
766+
767+
expectedRVs := []string{"0", "20", ""}
768+
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
769+
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
770+
}
771+
}
772+
717773
func TestReflectorSetExpectedType(t *testing.T) {
718774
obj := &unstructured.Unstructured{}
719775
gvk := schema.GroupVersionKind{

0 commit comments

Comments
 (0)