Skip to content

Commit 49f419e

Browse files
authored
Merge pull request kubernetes#129657 from p0lyn0mial/upstream-cacher-resilient-init-back-off
storage/cacher/ready: dynamically calculate the retryAfterSeconds
2 parents 4b12e89 + 04f0bd4 commit 49f419e

File tree

6 files changed

+173
-35
lines changed

6 files changed

+173
-35
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
374374
objType := reflect.TypeOf(obj)
375375
cacher := &Cacher{
376376
resourcePrefix: config.ResourcePrefix,
377-
ready: newReady(),
377+
ready: newReady(config.Clock),
378378
storage: config.Storage,
379379
objectType: objType,
380380
groupResource: config.GroupResource,
@@ -506,9 +506,10 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
506506
var readyGeneration int
507507
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
508508
var ok bool
509-
readyGeneration, ok = c.ready.checkAndReadGeneration()
509+
var downtime time.Duration
510+
readyGeneration, downtime, ok = c.ready.checkAndReadGeneration()
510511
if !ok {
511-
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
512+
return nil, errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
512513
}
513514
} else {
514515
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
@@ -629,7 +630,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
629630
c.Lock()
630631
defer c.Unlock()
631632

632-
if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
633+
if generation, _, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
633634
// We went unready or are already on a different generation.
634635
// Avoid registering and starting the watch as it will have to be
635636
// terminated immediately anyway.
@@ -783,10 +784,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
783784
defer span.End(500 * time.Millisecond)
784785

785786
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
786-
if !c.ready.check() {
787+
if downtime, ok := c.ready.check(); !ok {
787788
// If Cacher is not initialized, reject List requests
788789
// as described in https://kep.k8s.io/4568
789-
return errors.NewTooManyRequests("storage is (re)initializing", 1)
790+
return errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
790791
}
791792
} else {
792793
if err := c.ready.wait(ctx); err != nil {
@@ -1338,7 +1339,8 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
13381339
}
13391340

13401341
func (c *Cacher) Ready() bool {
1341-
return c.ready.check()
1342+
_, ok := c.ready.check()
1343+
return ok
13421344
}
13431345

13441346
// errWatcher implements watch.Interface to return a single error

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) {
750750
// constantly failing lists to the underlying storage.
751751
dummyErr := fmt.Errorf("dummy")
752752
backingStorage := &dummyStorage{err: dummyErr}
753-
cacher, _, err := newTestCacherWithoutSyncing(backingStorage, clock.RealClock{})
753+
cacher, _, err := newTestCacherWithoutSyncing(backingStorage, testingclock.NewFakeClock(time.Now()))
754754
if err != nil {
755755
t.Fatalf("Couldn't create cacher: %v", err)
756756
}
@@ -3155,3 +3155,44 @@ func TestListIndexer(t *testing.T) {
31553155
})
31563156
}
31573157
}
3158+
3159+
func TestRetryAfterForUnreadyCache(t *testing.T) {
3160+
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
3161+
t.Skipf("the test requires %v to be enabled", features.ResilientWatchCacheInitialization)
3162+
}
3163+
backingStorage := &dummyStorage{}
3164+
clock := testingclock.NewFakeClock(time.Now())
3165+
cacher, _, err := newTestCacherWithoutSyncing(backingStorage, clock)
3166+
if err != nil {
3167+
t.Fatalf("Couldn't create cacher: %v", err)
3168+
}
3169+
defer cacher.Stop()
3170+
if err = cacher.ready.wait(context.Background()); err != nil {
3171+
t.Fatalf("Unexpected error waiting for the cache to be ready")
3172+
}
3173+
3174+
cacher.ready.set(false)
3175+
clock.Step(14 * time.Second)
3176+
3177+
opts := storage.ListOptions{
3178+
ResourceVersion: "0",
3179+
Predicate: storage.Everything,
3180+
}
3181+
result := &example.PodList{}
3182+
proxy := NewCacheProxy(cacher, backingStorage)
3183+
err = proxy.GetList(context.TODO(), "/pods/ns", opts, result)
3184+
3185+
if !apierrors.IsTooManyRequests(err) {
3186+
t.Fatalf("Unexpected GetList error: %v", err)
3187+
}
3188+
var statusError apierrors.APIStatus
3189+
if !errors.As(err, &statusError) {
3190+
t.Fatalf("Unexpected error: %v, expected apierrors.APIStatus", err)
3191+
}
3192+
if statusError.Status().Details == nil {
3193+
t.Fatalf("Expected to get status details, got none")
3194+
}
3195+
if statusError.Status().Details.RetryAfterSeconds != 2 {
3196+
t.Fatalf("Unexpected retry after: %v", statusError.Status().Details.RetryAfterSeconds)
3197+
}
3198+
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
24+
25+
"k8s.io/utils/clock"
2326
)
2427

2528
type status int
@@ -43,13 +46,20 @@ type ready struct {
4346
lock sync.RWMutex // protect the state and generation variables
4447
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
4548
waitCh chan struct{} // blocks until is ready or stopped
49+
50+
clock clock.Clock
51+
lastStateChangeTime time.Time
4652
}
4753

48-
func newReady() *ready {
49-
return &ready{
54+
func newReady(c clock.Clock) *ready {
55+
r := &ready{
5056
waitCh: make(chan struct{}),
5157
state: Pending,
58+
clock: c,
5259
}
60+
r.updateLastStateChangeTimeLocked()
61+
62+
return r
5363
}
5464

5565
// done close the channel once the state is Ready or Stopped
@@ -100,17 +110,17 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
100110
}
101111
}
102112

103-
// check returns true only if it is Ready.
104-
func (r *ready) check() bool {
105-
_, ok := r.checkAndReadGeneration()
106-
return ok
113+
// check returns the time elapsed since the state was last changed and the current value.
114+
func (r *ready) check() (time.Duration, bool) {
115+
_, elapsed, ok := r.checkAndReadGeneration()
116+
return elapsed, ok
107117
}
108118

109-
// checkAndReadGeneration returns the current generation and whether it is Ready.
110-
func (r *ready) checkAndReadGeneration() (int, bool) {
119+
// checkAndReadGeneration returns the current generation, the time elapsed since the state was last changed and the current value.
120+
func (r *ready) checkAndReadGeneration() (int, time.Duration, bool) {
111121
r.lock.RLock()
112122
defer r.lock.RUnlock()
113-
return r.generation, r.state == Ready
123+
return r.generation, r.clock.Since(r.lastStateChangeTime), r.state == Ready
114124
}
115125

116126
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
@@ -123,6 +133,7 @@ func (r *ready) set(ok bool) {
123133
if ok && r.state == Pending {
124134
r.state = Ready
125135
r.generation++
136+
r.updateLastStateChangeTimeLocked()
126137
select {
127138
case <-r.waitCh:
128139
default:
@@ -139,6 +150,7 @@ func (r *ready) set(ok bool) {
139150
default:
140151
}
141152
r.state = Pending
153+
r.updateLastStateChangeTimeLocked()
142154
}
143155
}
144156

@@ -148,10 +160,15 @@ func (r *ready) stop() {
148160
defer r.lock.Unlock()
149161
if r.state != Stopped {
150162
r.state = Stopped
163+
r.updateLastStateChangeTimeLocked()
151164
}
152165
select {
153166
case <-r.waitCh:
154167
default:
155168
close(r.waitCh)
156169
}
157170
}
171+
172+
func (r *ready) updateLastStateChangeTimeLocked() {
173+
r.lastStateChangeTime = r.clock.Now()
174+
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ import (
2121
"sync"
2222
"testing"
2323
"time"
24+
25+
testingclock "k8s.io/utils/clock/testing"
2426
)
2527

2628
func Test_newReady(t *testing.T) {
2729
errCh := make(chan error, 10)
28-
ready := newReady()
30+
ready := newReady(testingclock.NewFakeClock(time.Now()))
2931
ready.set(false)
3032
// create 10 goroutines waiting for ready
3133
for i := 0; i < 10; i++ {
@@ -48,20 +50,20 @@ func Test_newReady(t *testing.T) {
4850

4951
func Test_newReadySetIdempotent(t *testing.T) {
5052
errCh := make(chan error, 10)
51-
ready := newReady()
53+
ready := newReady(testingclock.NewFakeClock(time.Now()))
5254
ready.set(false)
5355
ready.set(false)
5456
ready.set(false)
55-
if generation, ok := ready.checkAndReadGeneration(); generation != 0 || ok {
57+
if generation, _, ok := ready.checkAndReadGeneration(); generation != 0 || ok {
5658
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
5759
}
5860
ready.set(true)
59-
if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
61+
if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
6062
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
6163
}
6264
ready.set(true)
6365
ready.set(true)
64-
if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
66+
if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
6567
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
6668
}
6769
ready.set(false)
@@ -77,7 +79,7 @@ func Test_newReadySetIdempotent(t *testing.T) {
7779
t.Errorf("ready should be blocking")
7880
}
7981
ready.set(true)
80-
if generation, ok := ready.checkAndReadGeneration(); generation != 2 || !ok {
82+
if generation, _, ok := ready.checkAndReadGeneration(); generation != 2 || !ok {
8183
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
8284
}
8385
for i := 0; i < 10; i++ {
@@ -92,7 +94,7 @@ func Test_newReadySetIdempotent(t *testing.T) {
9294
func Test_newReadyRacy(t *testing.T) {
9395
concurrency := 1000
9496
errCh := make(chan error, concurrency)
95-
ready := newReady()
97+
ready := newReady(testingclock.NewFakeClock(time.Now()))
9698
ready.set(false)
9799

98100
wg := sync.WaitGroup{}
@@ -123,7 +125,7 @@ func Test_newReadyRacy(t *testing.T) {
123125

124126
func Test_newReadyStop(t *testing.T) {
125127
errCh := make(chan error, 10)
126-
ready := newReady()
128+
ready := newReady(testingclock.NewFakeClock(time.Now()))
127129
ready.set(false)
128130
// create 10 goroutines waiting for ready and stop
129131
for i := 0; i < 10; i++ {
@@ -145,24 +147,24 @@ func Test_newReadyStop(t *testing.T) {
145147
}
146148

147149
func Test_newReadyCheck(t *testing.T) {
148-
ready := newReady()
150+
ready := newReady(testingclock.NewFakeClock(time.Now()))
149151
// it starts as false
150-
if ready.check() {
151-
t.Errorf("unexpected ready state %v", ready.check())
152+
if _, ok := ready.check(); ok {
153+
t.Errorf("unexpected ready state %v", ok)
152154
}
153155
ready.set(true)
154-
if !ready.check() {
155-
t.Errorf("unexpected ready state %v", ready.check())
156+
if _, ok := ready.check(); !ok {
157+
t.Errorf("unexpected ready state %v", ok)
156158
}
157159
// stop sets ready to false
158160
ready.stop()
159-
if ready.check() {
160-
t.Errorf("unexpected ready state %v", ready.check())
161+
if _, ok := ready.check(); ok {
162+
t.Errorf("unexpected ready state %v", ok)
161163
}
162164
// can not set to true if is stopped
163165
ready.set(true)
164-
if ready.check() {
165-
t.Errorf("unexpected ready state %v", ready.check())
166+
if _, ok := ready.check(); ok {
167+
t.Errorf("unexpected ready state %v", ok)
166168
}
167169
err := ready.wait(context.Background())
168170
if err == nil {
@@ -172,7 +174,7 @@ func Test_newReadyCheck(t *testing.T) {
172174

173175
func Test_newReadyCancelPending(t *testing.T) {
174176
errCh := make(chan error, 10)
175-
ready := newReady()
177+
ready := newReady(testingclock.NewFakeClock(time.Now()))
176178
ready.set(false)
177179
ctx, cancel := context.WithCancel(context.Background())
178180
// create 10 goroutines stuck on pending
@@ -193,3 +195,40 @@ func Test_newReadyCancelPending(t *testing.T) {
193195
}
194196
}
195197
}
198+
199+
func Test_newReadyStateChangeTimestamp(t *testing.T) {
200+
fakeClock := testingclock.NewFakeClock(time.Now())
201+
fakeClock.SetTime(time.Now())
202+
203+
ready := newReady(fakeClock)
204+
fakeClock.Step(time.Minute)
205+
checkReadyTransitionTime(t, ready, time.Minute)
206+
207+
ready.set(true)
208+
fakeClock.Step(time.Minute)
209+
checkReadyTransitionTime(t, ready, time.Minute)
210+
fakeClock.Step(time.Minute)
211+
checkReadyTransitionTime(t, ready, 2*time.Minute)
212+
213+
ready.set(false)
214+
fakeClock.Step(time.Minute)
215+
checkReadyTransitionTime(t, ready, time.Minute)
216+
fakeClock.Step(time.Minute)
217+
checkReadyTransitionTime(t, ready, 2*time.Minute)
218+
219+
ready.set(true)
220+
fakeClock.Step(time.Minute)
221+
checkReadyTransitionTime(t, ready, time.Minute)
222+
223+
ready.stop()
224+
fakeClock.Step(time.Minute)
225+
checkReadyTransitionTime(t, ready, time.Minute)
226+
fakeClock.Step(time.Minute)
227+
checkReadyTransitionTime(t, ready, 2*time.Minute)
228+
}
229+
230+
func checkReadyTransitionTime(t *testing.T, r *ready, expectedLastStateChangeDuration time.Duration) {
231+
if lastStateChangeDuration, _ := r.check(); lastStateChangeDuration != expectedLastStateChangeDuration {
232+
t.Errorf("unexpected last state change duration: %v, expected: %v", lastStateChangeDuration, expectedLastStateChangeDuration)
233+
}
234+
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ limitations under the License.
1717
package cacher
1818

1919
import (
20+
"math"
2021
"strings"
22+
"time"
2123
)
2224

2325
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
@@ -44,3 +46,11 @@ func hasPathPrefix(s, pathPrefix string) bool {
4446
}
4547
return false
4648
}
49+
50+
// calculateRetryAfterForUnreadyCache calculates the retry duration based on the cache downtime.
51+
func calculateRetryAfterForUnreadyCache(downtime time.Duration) int {
52+
factor := 0.06
53+
result := math.Exp(factor * downtime.Seconds())
54+
result = math.Min(30, math.Max(1, result))
55+
return int(result)
56+
}

0 commit comments

Comments
 (0)