Skip to content

Commit a8ef6e9

Browse files
committed
Implement ResilientWatchCacheInitialization
1 parent 9fc0315 commit a8ef6e9

File tree

6 files changed

+306
-88
lines changed

6 files changed

+306
-88
lines changed

pkg/features/kube_features.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,6 +1235,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
12351235

12361236
genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
12371237

1238+
genericfeatures.ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta},
1239+
12381240
genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
12391241

12401242
genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29

staging/src/k8s.io/apiserver/pkg/features/kube_features.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ const (
173173
// to a chunking list request.
174174
RemainingItemCount featuregate.Feature = "RemainingItemCount"
175175

176+
// owner: @wojtek-t
177+
// beta: v1.31
178+
//
179+
// Enables resilient watchcache initialization to avoid controlplane
180+
// overload.
181+
ResilientWatchCacheInitialization featuregate.Feature = "ResilientWatchCacheInitialization"
182+
176183
// owner: @serathius
177184
// beta: v1.30
178185
//
@@ -353,6 +360,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
353360

354361
RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
355362

363+
ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta},
364+
356365
RetryGenerateName: {Default: true, PreRelease: featuregate.Beta},
357366

358367
SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -532,9 +532,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
532532
return nil, err
533533
}
534534

535-
readyGeneration, err := c.ready.waitAndReadGeneration(ctx)
536-
if err != nil {
537-
return nil, errors.NewServiceUnavailable(err.Error())
535+
var readyGeneration int
536+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
537+
var ok bool
538+
readyGeneration, ok = c.ready.checkAndReadGeneration()
539+
if !ok {
540+
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
541+
}
542+
} else {
543+
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
544+
if err != nil {
545+
return nil, errors.NewServiceUnavailable(err.Error())
546+
}
538547
}
539548

540549
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
@@ -676,6 +685,14 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
676685
return c.storage.Get(ctx, key, opts, objPtr)
677686
}
678687

688+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
689+
if !c.ready.check() {
690+
// If Cache is not initialized, delegate Get requests to storage
691+
// as described in https://kep.k8s.io/4568
692+
return c.storage.Get(ctx, key, opts, objPtr)
693+
}
694+
}
695+
679696
// If resourceVersion is specified, serve it from cache.
680697
// It's guaranteed that the returned value is at least that
681698
// fresh as the given resourceVersion.
@@ -684,16 +701,18 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
684701
return err
685702
}
686703

687-
if getRV == 0 && !c.ready.check() {
688-
// If Cacher is not yet initialized and we don't require any specific
689-
// minimal resource version, simply forward the request to storage.
690-
return c.storage.Get(ctx, key, opts, objPtr)
691-
}
692-
693704
// Do not create a trace - it's not for free and there are tons
694705
// of Get requests. We can add it if it will be really needed.
695-
if err := c.ready.wait(ctx); err != nil {
696-
return errors.NewServiceUnavailable(err.Error())
706+
707+
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
708+
if getRV == 0 && !c.ready.check() {
709+
// If Cacher is not yet initialized and we don't require any specific
710+
// minimal resource version, simply forward the request to storage.
711+
return c.storage.Get(ctx, key, opts, objPtr)
712+
}
713+
if err := c.ready.wait(ctx); err != nil {
714+
return errors.NewServiceUnavailable(err.Error())
715+
}
697716
}
698717

699718
objVal, err := conversion.EnforcePtr(objPtr)
@@ -743,6 +762,14 @@ func shouldDelegateList(opts storage.ListOptions) bool {
743762
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
744763
}
745764

765+
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
766+
pred := opts.Predicate
767+
noLabelSelector := pred.Label == nil || pred.Label.Empty()
768+
noFieldSelector := pred.Field == nil || pred.Field.Empty()
769+
hasLimit := pred.Limit > 0
770+
return noLabelSelector && noFieldSelector && hasLimit
771+
}
772+
746773
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
747774
if !recursive {
748775
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
@@ -770,10 +797,19 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
770797
if err != nil {
771798
return err
772799
}
773-
if listRV == 0 && !c.ready.check() {
774-
// If Cacher is not yet initialized and we don't require any specific
775-
// minimal resource version, simply forward the request to storage.
776-
return c.storage.GetList(ctx, key, opts, listObj)
800+
801+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
802+
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
803+
// If Cacher is not initialized, delegate List requests to storage
804+
// as described in https://kep.k8s.io/4568
805+
return c.storage.GetList(ctx, key, opts, listObj)
806+
}
807+
} else {
808+
if listRV == 0 && !c.ready.check() {
809+
// If Cacher is not yet initialized and we don't require any specific
810+
// minimal resource version, simply forward the request to storage.
811+
return c.storage.GetList(ctx, key, opts, listObj)
812+
}
777813
}
778814
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
779815
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
@@ -788,8 +824,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
788824
attribute.Stringer("type", c.groupResource))
789825
defer span.End(500 * time.Millisecond)
790826

791-
if err := c.ready.wait(ctx); err != nil {
792-
return errors.NewServiceUnavailable(err.Error())
827+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
828+
if !c.ready.check() {
829+
// If Cacher is not initialized, reject List requests
830+
// as described in https://kep.k8s.io/4568
831+
return errors.NewTooManyRequests("storage is (re)initializing", 1)
832+
}
833+
} else {
834+
if err := c.ready.wait(ctx); err != nil {
835+
return errors.NewServiceUnavailable(err.Error())
836+
}
793837
}
794838
span.AddEvent("Ready")
795839

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,14 +464,25 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context
464464
t.Fatalf("Failed to inject list errors: %v", err)
465465
}
466466

467+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
468+
// The tests assume that Get/GetList/Watch calls shouldn't fail.
469+
// However, 429 error can now be returned if watchcache is under initialization.
470+
// To avoid rewriting all tests, we wait for watcache to initialize.
471+
if err := cacher.ready.wait(ctx); err != nil {
472+
t.Fatal(err)
473+
}
474+
}
475+
467476
return ctx, cacher, server, terminate
468477
}
469478

470479
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
471480
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
472481

473-
if err := cacher.ready.wait(context.TODO()); err != nil {
474-
t.Fatalf("unexpected error waiting for the cache to be ready")
482+
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
483+
if err := cacher.ready.wait(context.TODO()); err != nil {
484+
t.Fatalf("unexpected error waiting for the cache to be ready")
485+
}
475486
}
476487
return &createWrapper{Cacher: cacher}, tearDown
477488
}

0 commit comments

Comments
 (0)