Skip to content

Commit e5558a8

Browse files
authored
Merge pull request kubernetes#130899 from serathius/watchcache-error
Implement watchcache returning error from etcd that caused cache reinitialization
2 parents fba6365 + c09d87f commit e5558a8

File tree

5 files changed

+115
-91
lines changed

5 files changed

+115
-91
lines changed

staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion/conversion_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -686,18 +686,14 @@ func expectConversionFailureMessage(id, message string) func(t *testing.T, ctc *
686686
objv1beta2 := newConversionMultiVersionFixture(ns, id, "v1beta2")
687687
meta, _, _ := unstructured.NestedFieldCopy(obj.Object, "metadata")
688688
unstructured.SetNestedField(objv1beta2.Object, meta, "metadata")
689-
lastRV := objv1beta2.GetResourceVersion()
690689

691690
for _, verb := range []string{"get", "list", "create", "update", "patch", "delete", "deletecollection"} {
692691
t.Run(verb, func(t *testing.T) {
693692
switch verb {
694693
case "get":
695694
_, err = clients["v1beta2"].Get(context.TODO(), obj.GetName(), metav1.GetOptions{})
696695
case "list":
697-
// With ResilientWatchcCacheInitialization feature, List requests are rejected with 429 if watchcache is not initialized.
698-
// However, in some of these tests that install faulty converter webhook, watchcache will never initialize by definition (as list will never succeed due to faulty converter webook).
699-
// In such case, the returned error will differ from the one returned from the etcd, so we need to force the request to go to etcd.
700-
_, err = clients["v1beta2"].List(context.TODO(), metav1.ListOptions{ResourceVersion: lastRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact})
696+
_, err = clients["v1beta2"].List(context.TODO(), metav1.ListOptions{})
701697
case "create":
702698
_, err = clients["v1beta2"].Create(context.TODO(), newConversionMultiVersionFixture(ns, id, "v1beta2"), metav1.CreateOptions{})
703699
case "update":

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -463,31 +463,19 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
463463
}
464464

465465
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
466-
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
467-
// It is safe to use the cache after a successful list until a disconnection.
468-
// We start with usable (write) locked. The below OnReplace function will
469-
// unlock it after a successful list. The below defer will then re-lock
470-
// it when this function exits (always due to disconnection), only if
471-
// we actually got a successful list. This cycle will repeat as needed.
472-
successfulList := false
473466
c.watchCache.SetOnReplace(func() {
474-
successfulList = true
475-
c.ready.set(true)
467+
c.ready.setReady()
476468
klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
477469
metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
478470
})
471+
var err error
479472
defer func() {
480-
if successfulList {
481-
c.ready.set(false)
482-
}
473+
c.ready.setError(err)
483474
}()
484475

485476
c.terminateAllWatchers()
486-
// Note that since onReplace may be not called due to errors, we explicitly
487-
// need to retry it on errors under lock.
488-
// Also note that startCaching is called in a loop, so there's no need
489-
// to have another loop here.
490-
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
477+
err = c.reflector.ListAndWatch(stopChannel)
478+
if err != nil {
491479
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
492480
}
493481
}
@@ -506,11 +494,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
506494

507495
var readyGeneration int
508496
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
509-
var ok bool
497+
var err error
510498
var downtime time.Duration
511-
readyGeneration, downtime, ok = c.ready.checkAndReadGeneration()
512-
if !ok {
513-
return nil, errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
499+
readyGeneration, downtime, err = c.ready.checkAndReadGeneration()
500+
if err != nil {
501+
return nil, errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
514502
}
515503
} else {
516504
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
@@ -631,7 +619,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
631619
c.Lock()
632620
defer c.Unlock()
633621

634-
if generation, _, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
622+
if generation, _, err := c.ready.checkAndReadGeneration(); generation != readyGeneration || err != nil {
635623
// We went unready or are already on a different generation.
636624
// Avoid registering and starting the watch as it will have to be
637625
// terminated immediately anyway.
@@ -749,10 +737,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
749737
defer span.End(500 * time.Millisecond)
750738

751739
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
752-
if downtime, ok := c.ready.check(); !ok {
740+
if downtime, err := c.ready.check(); err != nil {
753741
// If Cacher is not initialized, reject List requests
754742
// as described in https://kep.k8s.io/4568
755-
return errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
743+
return errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
756744
}
757745
} else {
758746
if err := c.ready.wait(ctx); err != nil {
@@ -1304,8 +1292,8 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
13041292
}
13051293

13061294
func (c *Cacher) Ready() bool {
1307-
_, ok := c.ready.check()
1308-
return ok
1295+
_, err := c.ready.check()
1296+
return err == nil
13091297
}
13101298

13111299
// errWatcher implements watch.Interface to return a single error

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3213,7 +3213,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
32133213
t.Fatalf("Unexpected error waiting for the cache to be ready")
32143214
}
32153215

3216-
cacher.ready.set(false)
3216+
cacher.ready.setError(nil)
32173217
clock.Step(14 * time.Second)
32183218

32193219
opts := storage.ListOptions{

staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ const (
4141
// | ^
4242
// └---------------------------┘
4343
type ready struct {
44-
state status // represent the state of the variable
44+
state status // represent the state of the variable
45+
lastErr error
4546
generation int // represent the number of times we have transtioned to ready
4647
lock sync.RWMutex // protect the state and generation variables
4748
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
@@ -87,49 +88,69 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
8788
}
8889

8990
r.lock.RLock()
90-
switch r.state {
91-
case Pending:
91+
if r.state == Pending {
9292
// since we allow to switch between the states Pending and Ready
9393
// if there is a quick transition from Pending -> Ready -> Pending
9494
// a process that was waiting can get unblocked and see a Pending
9595
// state again. If the state is Pending we have to wait again to
9696
// avoid an inconsistent state on the system, with some processes not
9797
// waiting despite the state moved back to Pending.
9898
r.lock.RUnlock()
99-
case Ready:
100-
generation := r.generation
101-
r.lock.RUnlock()
102-
return generation, nil
103-
case Stopped:
104-
r.lock.RUnlock()
105-
return 0, fmt.Errorf("apiserver cacher is stopped")
106-
default:
107-
r.lock.RUnlock()
108-
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
99+
continue
109100
}
101+
generation, err := r.readGenerationLocked()
102+
r.lock.RUnlock()
103+
return generation, err
110104
}
111105
}
112106

113107
// check returns the time elapsed since the state was last changed and the current value.
114-
func (r *ready) check() (time.Duration, bool) {
115-
_, elapsed, ok := r.checkAndReadGeneration()
116-
return elapsed, ok
108+
func (r *ready) check() (time.Duration, error) {
109+
_, elapsed, err := r.checkAndReadGeneration()
110+
return elapsed, err
117111
}
118112

119113
// checkAndReadGeneration returns the current generation, the time elapsed since the state was last changed and the current value.
120-
func (r *ready) checkAndReadGeneration() (int, time.Duration, bool) {
114+
func (r *ready) checkAndReadGeneration() (int, time.Duration, error) {
121115
r.lock.RLock()
122116
defer r.lock.RUnlock()
123-
return r.generation, r.clock.Since(r.lastStateChangeTime), r.state == Ready
117+
generation, err := r.readGenerationLocked()
118+
return generation, r.clock.Since(r.lastStateChangeTime), err
119+
}
120+
121+
func (r *ready) readGenerationLocked() (int, error) {
122+
switch r.state {
123+
case Pending:
124+
if r.lastErr == nil {
125+
return 0, fmt.Errorf("storage is (re)initializing")
126+
} else {
127+
return 0, fmt.Errorf("storage is (re)initializing: %w", r.lastErr)
128+
}
129+
case Ready:
130+
return r.generation, nil
131+
case Stopped:
132+
return 0, fmt.Errorf("apiserver cacher is stopped")
133+
default:
134+
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
135+
}
136+
}
137+
138+
func (r *ready) setReady() {
139+
r.set(true, nil)
140+
}
141+
142+
func (r *ready) setError(err error) {
143+
r.set(false, err)
124144
}
125145

126146
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
127-
func (r *ready) set(ok bool) {
147+
func (r *ready) set(ok bool, err error) {
128148
r.lock.Lock()
129149
defer r.lock.Unlock()
130150
if r.state == Stopped {
131151
return
132152
}
153+
r.lastErr = err
133154
if ok && r.state == Pending {
134155
r.state = Ready
135156
r.generation++

0 commit comments

Comments
 (0)