Skip to content

Commit b4258ec

Browse files
authored
Merge pull request kubernetes#95869 from wojtek-t/debug_watchcache_test
Fix cacher test flakiness
2 parents 4df8d97 + 6eb71c4 commit b4258ec

File tree

4 files changed

+34
-11
lines changed

4 files changed

+34
-11
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ type Cacher struct {
263263

264264
// Defines a time budget that can be spend on waiting for not-ready watchers
265265
// while dispatching event before shutting them down.
266-
dispatchTimeoutBudget *timeBudget
266+
dispatchTimeoutBudget timeBudget
267267

268268
// Handling graceful termination.
269269
stopLock sync.RWMutex

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,14 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
998998
wg.Wait()
999999
}
10001000

1001+
type fakeTimeBudget struct{}
1002+
1003+
func (f *fakeTimeBudget) takeAvailable() time.Duration {
1004+
return 2 * time.Second
1005+
}
1006+
1007+
func (f *fakeTimeBudget) returnUnused(_ time.Duration) {}
1008+
10011009
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
10021010
backingStorage := &dummyStorage{}
10031011
cacher, _, err := newTestCacher(backingStorage)
@@ -1010,7 +1018,19 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
10101018
cacher.ready.wait()
10111019

10121020
// Ensure there is some budget for slowing down processing.
1013-
cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond)
1021+
// When using the official `timeBudgetImpl` we were observing flakiness
1022+
// due under the following conditions:
1023+
// 1) the watch w1 is blocked, so we were consuming the whole budget once
1024+
// its buffer was filled in (10 items)
1025+
// 2) the budget is refreshed once per second, so it basically wasn't
1026+
// happening in the test at all
1027+
// 3) if the test was cpu-starved and we weren't able to consume events
1028+
// from w2 ResultCh it could have happened that its buffer was also
1029+
// filling in and given we no longer had timeBudget (consumed in (1))
1030+
// trying to put next item was simply breaking the watch
1031+
// Using fakeTimeBudget gives us always a budget to wait and have a test
1032+
// pick up something from ResultCh in the meantime.
1033+
cacher.dispatchTimeoutBudget = &fakeTimeBudget{}
10141034

10151035
makePod := func(i int) *examplev1.Pod {
10161036
return &examplev1.Pod{
@@ -1055,8 +1075,6 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
10551075
shouldContinue = false
10561076
break
10571077
}
1058-
// Ensure there is some budget for fast watcher after slower one is blocked.
1059-
cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond)
10601078
if event.Type == watch.Added {
10611079
eventsCount++
10621080
if eventsCount == totalPods {

staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,21 @@ const (
3939
// NOTE: It's not recommended to be used concurrently from multiple threads -
4040
// if first user takes the whole timeout, the second one will get 0 timeout
4141
// even though the first one may return something later.
42-
type timeBudget struct {
42+
type timeBudget interface {
43+
takeAvailable() time.Duration
44+
returnUnused(unused time.Duration)
45+
}
46+
47+
type timeBudgetImpl struct {
4348
sync.Mutex
4449
budget time.Duration
4550

4651
refresh time.Duration
4752
maxBudget time.Duration
4853
}
4954

50-
func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
51-
result := &timeBudget{
55+
func newTimeBudget(stopCh <-chan struct{}) timeBudget {
56+
result := &timeBudgetImpl{
5257
budget: time.Duration(0),
5358
refresh: refreshPerSecond,
5459
maxBudget: maxBudget,
@@ -57,7 +62,7 @@ func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
5762
return result
5863
}
5964

60-
func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
65+
func (t *timeBudgetImpl) periodicallyRefresh(stopCh <-chan struct{}) {
6166
ticker := time.NewTicker(time.Second)
6267
defer ticker.Stop()
6368
for {
@@ -74,15 +79,15 @@ func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
7479
}
7580
}
7681

77-
func (t *timeBudget) takeAvailable() time.Duration {
82+
func (t *timeBudgetImpl) takeAvailable() time.Duration {
7883
t.Lock()
7984
defer t.Unlock()
8085
result := t.budget
8186
t.budget = time.Duration(0)
8287
return result
8388
}
8489

85-
func (t *timeBudget) returnUnused(unused time.Duration) {
90+
func (t *timeBudgetImpl) returnUnused(unused time.Duration) {
8691
t.Lock()
8792
defer t.Unlock()
8893
if unused < 0 {

staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
)
2323

2424
func TestTimeBudget(t *testing.T) {
25-
budget := &timeBudget{
25+
budget := &timeBudgetImpl{
2626
budget: time.Duration(0),
2727
maxBudget: time.Duration(200),
2828
}

0 commit comments

Comments
 (0)