Skip to content

Commit 1df459c

Browse files
authored
Merge pull request kubernetes#92174 from gongguan/event-gone
restore cacher event Gone tests
2 parents 1f629ca + f3cbfc3 commit 1df459c

File tree

6 files changed

+45
-17
lines changed

6 files changed

+45
-17
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ type Config struct {
9797
NewListFunc func() runtime.Object
9898

9999
Codec runtime.Codec
100+
101+
Clock clock.Clock
100102
}
101103

102104
type watchersMap map[int]*cacheWatcher
@@ -323,7 +325,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
323325
}
324326
}
325327

326-
clock := clock.RealClock{}
328+
if config.Clock == nil {
329+
config.Clock = clock.RealClock{}
330+
}
327331
objType := reflect.TypeOf(obj)
328332
cacher := &Cacher{
329333
ready: newReady(),
@@ -346,9 +350,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
346350
// and there are no guarantees on the order that they will stop.
347351
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
348352
stopCh: stopCh,
349-
clock: clock,
353+
clock: config.Clock,
350354
timer: time.NewTimer(time.Duration(0)),
351-
bookmarkWatchers: newTimeBucketWatchers(clock, defaultBookmarkFrequency),
355+
bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
352356
}
353357

354358
// Ensure that timer is stopped.
@@ -359,7 +363,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
359363
}
360364

361365
watchCache := newWatchCache(
362-
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType)
366+
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
363367
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
364368
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
365369

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, er
271271
NewFunc: func() runtime.Object { return &example.Pod{} },
272272
NewListFunc: func() runtime.Object { return &example.PodList{} },
273273
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
274+
Clock: clock.RealClock{},
274275
}
275276
cacher, err := NewCacherFromConfig(config)
276277
return cacher, testVersioner{}, err

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ func newWatchCache(
198198
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
199199
versioner storage.Versioner,
200200
indexers *cache.Indexers,
201+
clock clock.Clock,
201202
objectType reflect.Type) *watchCache {
202203
wc := &watchCache{
203204
capacity: defaultLowerBoundCapacity,
@@ -212,7 +213,7 @@ func newWatchCache(
212213
resourceVersion: 0,
213214
listResourceVersion: 0,
214215
eventHandler: eventHandler,
215-
clock: clock.RealClock{},
216+
clock: clock,
216217
versioner: versioner,
217218
objectType: objectType,
218219
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,14 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
8181
}
8282
versioner := etcd3.APIObjectVersioner{}
8383
mockHandler := func(*watchCacheEvent) {}
84-
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, reflect.TypeOf(&example.Pod{}))
84+
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{}))
8585
// To preserve behavior of tests that assume a given capacity,
8686
// resize it to th expected size.
8787
wc.capacity = capacity
8888
wc.cache = make([]*watchCacheEvent, capacity)
8989
wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity)
9090
wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity)
9191

92-
wc.clock = clock.NewFakeClock(time.Now())
9392
return wc
9493
}
9594

staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_test(
1515
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
1616
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
1717
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
18+
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
1819
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
1920
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
2021
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/apimachinery/pkg/labels"
3535
"k8s.io/apimachinery/pkg/runtime"
3636
"k8s.io/apimachinery/pkg/runtime/serializer"
37+
"k8s.io/apimachinery/pkg/util/clock"
3738
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3839
"k8s.io/apimachinery/pkg/util/sets"
3940
"k8s.io/apimachinery/pkg/util/wait"
@@ -56,6 +57,11 @@ var (
5657
codecs = serializer.NewCodecFactory(scheme)
5758
)
5859

60+
const (
61+
// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
62+
watchCacheDefaultCapacity = 100
63+
)
64+
5965
func init() {
6066
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
6167
utilruntime.Must(example.AddToScheme(scheme))
@@ -100,6 +106,10 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ
100106
}
101107

102108
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) {
109+
return newTestCacherWithClock(s, cap, clock.RealClock{})
110+
}
111+
112+
func newTestCacherWithClock(s storage.Interface, cap int, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
103113
prefix := "pods"
104114
v := etcd3.APIObjectVersioner{}
105115
config := cacherstorage.Config{
@@ -112,6 +122,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
112122
NewFunc: func() runtime.Object { return &example.Pod{} },
113123
NewListFunc: func() runtime.Object { return &example.PodList{} },
114124
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
125+
Clock: clock,
115126
}
116127
cacher, err := cacherstorage.NewCacherFromConfig(config)
117128
return cacher, v, err
@@ -394,7 +405,8 @@ func TestWatch(t *testing.T) {
394405
// Inject one list error to make sure we test the relist case.
395406
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
396407
defer server.Terminate(t)
397-
cacher, _, err := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
408+
fakeClock := clock.NewFakeClock(time.Now())
409+
cacher, _, err := newTestCacherWithClock(etcdStorage, watchCacheDefaultCapacity, fakeClock)
398410
if err != nil {
399411
t.Fatalf("Couldn't create cacher: %v", err)
400412
}
@@ -437,15 +449,6 @@ func TestWatch(t *testing.T) {
437449
verifyWatchEvent(t, watcher, watch.Added, podFoo)
438450
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
439451

440-
// Check whether we get too-old error via the watch channel
441-
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
442-
if err != nil {
443-
t.Fatalf("Expected no direct error, got %v", err)
444-
}
445-
defer tooOldWatcher.Stop()
446-
// Events happens in eventFreshDuration, cache expand without event "Gone".
447-
verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo)
448-
449452
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
450453
if err != nil {
451454
t.Fatalf("Unexpected error: %v", err)
@@ -466,6 +469,25 @@ func TestWatch(t *testing.T) {
466469
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
467470

468471
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
472+
473+
// Add watchCacheDefaultCapacity events to make current watch cache full.
474+
// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
475+
for i := 0; i < watchCacheDefaultCapacity; i++ {
476+
fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
477+
podFoo := makeTestPod(fmt.Sprintf("foo-%d", i))
478+
updatePod(t, etcdStorage, podFoo, nil)
479+
}
480+
481+
// Check whether we get too-old error via the watch channel
482+
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
483+
if err != nil {
484+
t.Fatalf("Expected no direct error, got %v", err)
485+
}
486+
defer tooOldWatcher.Stop()
487+
488+
// Ensure we get a "Gone" error.
489+
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
490+
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
469491
}
470492

471493
func TestWatcherTimeout(t *testing.T) {

0 commit comments

Comments
 (0)