57
57
codecs = serializer .NewCodecFactory (scheme )
58
58
)
59
59
60
+ const (
61
+ // watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
62
+ watchCacheDefaultCapacity = 100
63
+ )
64
+
60
65
func init () {
61
66
metav1 .AddToGroupVersion (scheme , metav1 .SchemeGroupVersion )
62
67
utilruntime .Must (example .AddToScheme (scheme ))
@@ -101,6 +106,10 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ
101
106
}
102
107
103
108
func newTestCacher (s storage.Interface , cap int ) (* cacherstorage.Cacher , storage.Versioner , error ) {
109
+ return newTestCacherWithClock (s , cap , clock.RealClock {})
110
+ }
111
+
112
+ func newTestCacherWithClock (s storage.Interface , cap int , clock clock.Clock ) (* cacherstorage.Cacher , storage.Versioner , error ) {
104
113
prefix := "pods"
105
114
v := etcd3.APIObjectVersioner {}
106
115
config := cacherstorage.Config {
@@ -113,7 +122,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
113
122
NewFunc : func () runtime.Object { return & example.Pod {} },
114
123
NewListFunc : func () runtime.Object { return & example.PodList {} },
115
124
Codec : codecs .LegacyCodec (examplev1 .SchemeGroupVersion ),
116
- Clock : clock. RealClock {} ,
125
+ Clock : clock ,
117
126
}
118
127
cacher , err := cacherstorage .NewCacherFromConfig (config )
119
128
return cacher , v , err
@@ -396,7 +405,8 @@ func TestWatch(t *testing.T) {
396
405
// Inject one list error to make sure we test the relist case.
397
406
etcdStorage = & injectListError {errors : 1 , Interface : etcdStorage }
398
407
defer server .Terminate (t )
399
- cacher , _ , err := newTestCacher (etcdStorage , 3 ) // small capacity to trigger "too old version" error
408
+ fakeClock := clock .NewFakeClock (time .Now ())
409
+ cacher , _ , err := newTestCacherWithClock (etcdStorage , watchCacheDefaultCapacity , fakeClock )
400
410
if err != nil {
401
411
t .Fatalf ("Couldn't create cacher: %v" , err )
402
412
}
@@ -439,15 +449,6 @@ func TestWatch(t *testing.T) {
439
449
verifyWatchEvent (t , watcher , watch .Added , podFoo )
440
450
verifyWatchEvent (t , watcher , watch .Modified , podFooPrime )
441
451
442
- // Check whether we get too-old error via the watch channel
443
- tooOldWatcher , err := cacher .Watch (context .TODO (), "pods/ns/foo" , storage.ListOptions {ResourceVersion : "1" , Predicate : storage .Everything })
444
- if err != nil {
445
- t .Fatalf ("Expected no direct error, got %v" , err )
446
- }
447
- defer tooOldWatcher .Stop ()
448
- // Events happens in eventFreshDuration, cache expand without event "Gone".
449
- verifyWatchEvent (t , tooOldWatcher , watch .Added , podFoo )
450
-
451
452
initialWatcher , err := cacher .Watch (context .TODO (), "pods/ns/foo" , storage.ListOptions {ResourceVersion : fooCreated .ResourceVersion , Predicate : storage .Everything })
452
453
if err != nil {
453
454
t .Fatalf ("Unexpected error: %v" , err )
@@ -468,6 +469,25 @@ func TestWatch(t *testing.T) {
468
469
_ = updatePod (t , etcdStorage , podFooBis , fooUpdated )
469
470
470
471
verifyWatchEvent (t , nowWatcher , watch .Modified , podFooBis )
472
+
473
+ // Add watchCacheDefaultCapacity events to make current watch cache full.
474
+ // Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
475
+ for i := 0 ; i < watchCacheDefaultCapacity ; i ++ {
476
+ fakeClock .SetTime (time .Now ().Add (time .Duration (i ) * time .Minute ))
477
+ podFoo := makeTestPod (fmt .Sprintf ("foo-%d" , i ))
478
+ updatePod (t , etcdStorage , podFoo , nil )
479
+ }
480
+
481
+ // Check whether we get too-old error via the watch channel
482
+ tooOldWatcher , err := cacher .Watch (context .TODO (), "pods/ns/foo" , storage.ListOptions {ResourceVersion : "1" , Predicate : storage .Everything })
483
+ if err != nil {
484
+ t .Fatalf ("Expected no direct error, got %v" , err )
485
+ }
486
+ defer tooOldWatcher .Stop ()
487
+
488
+ // Ensure we get a "Gone" error.
489
+ expectedResourceExpiredError := errors .NewResourceExpired ("" ).ErrStatus
490
+ verifyWatchEvent (t , tooOldWatcher , watch .Error , & expectedResourceExpiredError )
471
491
}
472
492
473
493
func TestWatcherTimeout (t * testing.T ) {
0 commit comments