Skip to content

Commit eef6c60

Browse files
authored
Merge pull request kubernetes#124642 from wojtek-t/resilient_watchcache_initialization
Implement ResilientWatchCacheInitialization
2 parents f057f2d + 2854d84 commit eef6c60

File tree

7 files changed

+376
-136
lines changed

7 files changed

+376
-136
lines changed

pkg/features/kube_features.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
12361236

12371237
genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
12381238

1239+
genericfeatures.ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta},
1240+
12391241
genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
12401242

12411243
genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29

staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,74 @@ func TestChangeCRD(t *testing.T) {
5959
ns := "default"
6060
noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
6161

62-
stopChan := make(chan struct{})
62+
updateCRD := func() {
63+
noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{})
64+
if err != nil {
65+
t.Error(err)
66+
return
67+
}
68+
if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
69+
v2 := noxuDefinitionToUpdate.Spec.Versions[0]
70+
v2.Name = "v2"
71+
v2.Served = true
72+
v2.Storage = false
73+
noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
74+
} else {
75+
noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
76+
}
77+
if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
78+
t.Error(err)
79+
}
80+
}
6381

82+
// Set up 10 watchers for custom resource.
83+
// We can't exercise them in a loop the same way as get requests, as watchcache
84+
// can reject them with 429 and Retry-After: 1 if it is uninitialized and even
85+
// though 429 is automatically retried, with frequent watchcache terminations and
86+
// reinitializations they could either end up being rejected N times and fail or
87+
// not initialize until the last watchcache reinitialization and then not be
88+
// terminated. Thus we exercise their termination explicitly at the beginning.
6489
wg := &sync.WaitGroup{}
90+
for i := 0; i < 10; i++ {
91+
wg.Add(1)
92+
go func(i int) {
93+
defer wg.Done()
94+
95+
w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{})
96+
if err != nil {
97+
t.Errorf("unexpected error establishing watch: %v", err)
98+
return
99+
}
100+
for event := range w.ResultChan() {
101+
switch event.Type {
102+
case watch.Added, watch.Modified, watch.Deleted:
103+
// all expected
104+
default:
105+
t.Errorf("unexpected watch event: %#v", event)
106+
}
107+
}
108+
}(i)
109+
}
110+
111+
// Let all the established watches soak
112+
time.Sleep(5 * time.Second)
113+
114+
// Update CRD and ensure that all watches are gracefully terminated.
115+
updateCRD()
116+
117+
drained := make(chan struct{})
118+
go func() {
119+
defer close(drained)
120+
wg.Wait()
121+
}()
122+
123+
select {
124+
case <-drained:
125+
case <-time.After(wait.ForeverTestTimeout):
126+
t.Fatal("timed out waiting for watchers to be terminated")
127+
}
128+
129+
stopChan := make(chan struct{})
65130

66131
// Set up loop to modify CRD in the background
67132
wg.Add(1)
@@ -76,28 +141,11 @@ func TestChangeCRD(t *testing.T) {
76141

77142
time.Sleep(10 * time.Millisecond)
78143

79-
noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{})
80-
if err != nil {
81-
t.Error(err)
82-
continue
83-
}
84-
if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
85-
v2 := noxuDefinitionToUpdate.Spec.Versions[0]
86-
v2.Name = "v2"
87-
v2.Served = true
88-
v2.Storage = false
89-
noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
90-
} else {
91-
noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
92-
}
93-
if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
94-
t.Error(err)
95-
continue
96-
}
144+
updateCRD()
97145
}
98146
}()
99147

100-
// Set up 10 loops creating and reading and watching custom resources
148+
// Set up 10 loops creating and reading custom resources
101149
for i := 0; i < 10; i++ {
102150
wg.Add(1)
103151
go func(i int) {
@@ -120,32 +168,6 @@ func TestChangeCRD(t *testing.T) {
120168
}
121169
}
122170
}(i)
123-
124-
wg.Add(1)
125-
go func(i int) {
126-
defer wg.Done()
127-
for {
128-
time.Sleep(10 * time.Millisecond)
129-
select {
130-
case <-stopChan:
131-
return
132-
default:
133-
w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{})
134-
if err != nil {
135-
t.Errorf("unexpected error establishing watch: %v", err)
136-
continue
137-
}
138-
for event := range w.ResultChan() {
139-
switch event.Type {
140-
case watch.Added, watch.Modified, watch.Deleted:
141-
// all expected
142-
default:
143-
t.Errorf("unexpected watch event: %#v", event)
144-
}
145-
}
146-
}
147-
}
148-
}(i)
149171
}
150172

151173
// Let all the established get request loops soak
@@ -155,7 +177,7 @@ func TestChangeCRD(t *testing.T) {
155177
close(stopChan)
156178

157179
// Let loops drain
158-
drained := make(chan struct{})
180+
drained = make(chan struct{})
159181
go func() {
160182
defer close(drained)
161183
wg.Wait()
@@ -164,6 +186,6 @@ func TestChangeCRD(t *testing.T) {
164186
select {
165187
case <-drained:
166188
case <-time.After(wait.ForeverTestTimeout):
167-
t.Error("timed out waiting for clients to complete")
189+
t.Fatal("timed out waiting for clients to complete")
168190
}
169191
}

staging/src/k8s.io/apiserver/pkg/features/kube_features.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ const (
173173
// to a chunking list request.
174174
RemainingItemCount featuregate.Feature = "RemainingItemCount"
175175

176+
// owner: @wojtek-t
177+
// beta: v1.31
178+
//
179+
// Enables resilient watchcache initialization to avoid controlplane
180+
// overload.
181+
ResilientWatchCacheInitialization featuregate.Feature = "ResilientWatchCacheInitialization"
182+
176183
// owner: @serathius
177184
// beta: v1.30
178185
//
@@ -353,6 +360,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
353360

354361
RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
355362

363+
ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta},
364+
356365
RetryGenerateName: {Default: true, PreRelease: featuregate.Beta},
357366

358367
SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -532,9 +532,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
532532
return nil, err
533533
}
534534

535-
readyGeneration, err := c.ready.waitAndReadGeneration(ctx)
536-
if err != nil {
537-
return nil, errors.NewServiceUnavailable(err.Error())
535+
var readyGeneration int
536+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
537+
var ok bool
538+
readyGeneration, ok = c.ready.checkAndReadGeneration()
539+
if !ok {
540+
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
541+
}
542+
} else {
543+
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
544+
if err != nil {
545+
return nil, errors.NewServiceUnavailable(err.Error())
546+
}
538547
}
539548

540549
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
@@ -676,6 +685,14 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
676685
return c.storage.Get(ctx, key, opts, objPtr)
677686
}
678687

688+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
689+
if !c.ready.check() {
690+
// If Cache is not initialized, delegate Get requests to storage
691+
// as described in https://kep.k8s.io/4568
692+
return c.storage.Get(ctx, key, opts, objPtr)
693+
}
694+
}
695+
679696
// If resourceVersion is specified, serve it from cache.
680697
// It's guaranteed that the returned value is at least that
681698
// fresh as the given resourceVersion.
@@ -684,16 +701,18 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
684701
return err
685702
}
686703

687-
if getRV == 0 && !c.ready.check() {
688-
// If Cacher is not yet initialized and we don't require any specific
689-
// minimal resource version, simply forward the request to storage.
690-
return c.storage.Get(ctx, key, opts, objPtr)
691-
}
692-
693704
// Do not create a trace - it's not for free and there are tons
694705
// of Get requests. We can add it if it will be really needed.
695-
if err := c.ready.wait(ctx); err != nil {
696-
return errors.NewServiceUnavailable(err.Error())
706+
707+
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
708+
if getRV == 0 && !c.ready.check() {
709+
// If Cacher is not yet initialized and we don't require any specific
710+
// minimal resource version, simply forward the request to storage.
711+
return c.storage.Get(ctx, key, opts, objPtr)
712+
}
713+
if err := c.ready.wait(ctx); err != nil {
714+
return errors.NewServiceUnavailable(err.Error())
715+
}
697716
}
698717

699718
objVal, err := conversion.EnforcePtr(objPtr)
@@ -743,6 +762,14 @@ func shouldDelegateList(opts storage.ListOptions) bool {
743762
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
744763
}
745764

765+
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
766+
pred := opts.Predicate
767+
noLabelSelector := pred.Label == nil || pred.Label.Empty()
768+
noFieldSelector := pred.Field == nil || pred.Field.Empty()
769+
hasLimit := pred.Limit > 0
770+
return noLabelSelector && noFieldSelector && hasLimit
771+
}
772+
746773
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
747774
if !recursive {
748775
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
@@ -770,10 +797,19 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
770797
if err != nil {
771798
return err
772799
}
773-
if listRV == 0 && !c.ready.check() {
774-
// If Cacher is not yet initialized and we don't require any specific
775-
// minimal resource version, simply forward the request to storage.
776-
return c.storage.GetList(ctx, key, opts, listObj)
800+
801+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
802+
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
803+
// If Cacher is not initialized, delegate List requests to storage
804+
// as described in https://kep.k8s.io/4568
805+
return c.storage.GetList(ctx, key, opts, listObj)
806+
}
807+
} else {
808+
if listRV == 0 && !c.ready.check() {
809+
// If Cacher is not yet initialized and we don't require any specific
810+
// minimal resource version, simply forward the request to storage.
811+
return c.storage.GetList(ctx, key, opts, listObj)
812+
}
777813
}
778814
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
779815
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
@@ -788,8 +824,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
788824
attribute.Stringer("type", c.groupResource))
789825
defer span.End(500 * time.Millisecond)
790826

791-
if err := c.ready.wait(ctx); err != nil {
792-
return errors.NewServiceUnavailable(err.Error())
827+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
828+
if !c.ready.check() {
829+
// If Cacher is not initialized, reject List requests
830+
// as described in https://kep.k8s.io/4568
831+
return errors.NewTooManyRequests("storage is (re)initializing", 1)
832+
}
833+
} else {
834+
if err := c.ready.wait(ctx); err != nil {
835+
return errors.NewServiceUnavailable(err.Error())
836+
}
793837
}
794838
span.AddEvent("Ready")
795839

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,14 +464,25 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context
464464
t.Fatalf("Failed to inject list errors: %v", err)
465465
}
466466

467+
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
468+
// The tests assume that Get/GetList/Watch calls shouldn't fail.
469+
// However, 429 error can now be returned if watchcache is under initialization.
470+
// To avoid rewriting all tests, we wait for watchcache to initialize.
471+
if err := cacher.ready.wait(ctx); err != nil {
472+
t.Fatal(err)
473+
}
474+
}
475+
467476
return ctx, cacher, server, terminate
468477
}
469478

470479
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
471480
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
472481

473-
if err := cacher.ready.wait(context.TODO()); err != nil {
474-
t.Fatalf("unexpected error waiting for the cache to be ready")
482+
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
483+
if err := cacher.ready.wait(context.TODO()); err != nil {
484+
t.Fatalf("unexpected error waiting for the cache to be ready")
485+
}
475486
}
476487
return &createWrapper{Cacher: cacher}, tearDown
477488
}

0 commit comments

Comments
 (0)