Skip to content

Commit 4c311c9

Browse files
authored
Merge pull request kubernetes#130475 from serathius/watchcache-consistency
Implement consistency checking
2 parents e9a3d99 + e4d73c5 commit 4c311c9

File tree

9 files changed

+546
-10
lines changed

9 files changed

+546
-10
lines changed

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,17 @@ func StorageWithCacher() generic.StorageDecorator {
7171
if err != nil {
7272
return nil, func() {}, err
7373
}
74+
delegator := cacherstorage.NewCacheDelegator(cacher, s)
7475
var once sync.Once
7576
destroyFunc := func() {
7677
once.Do(func() {
78+
delegator.Stop()
7779
cacher.Stop()
7880
d()
7981
})
8082
}
8183

82-
return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil
84+
return delegator, destroyFunc, nil
8385
}
8486
}
8587

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2459,8 +2459,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
24592459
}
24602460
}
24612461
d := destroyFunc
2462-
s = cacherstorage.NewCacheDelegator(cacher, s)
2462+
delegator := cacherstorage.NewCacheDelegator(cacher, s)
2463+
s = delegator
24632464
destroyFunc = func() {
2465+
delegator.Stop()
24642466
cacher.Stop()
24652467
d()
24662468
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ type listResp struct {
729729
}
730730

731731
// GetList implements storage.Interface
732-
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
732+
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
733733
// For recursive lists, we need to make sure the key ended with "/" so that we only
734734
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
735735
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@@ -738,6 +738,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
738738
if opts.Recursive && !strings.HasSuffix(key, "/") {
739739
preparedKey += "/"
740740
}
741+
listRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
742+
if err != nil {
743+
return err
744+
}
741745

742746
ctx, span := tracing.Start(ctx, "cacher.GetList",
743747
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -484,10 +484,6 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
484484
t.Fatalf("Failed to initialize cacher: %v", err)
485485
}
486486
ctx := context.Background()
487-
terminate := func() {
488-
cacher.Stop()
489-
server.Terminate(t)
490-
}
491487

492488
// Since some tests depend on the fact that GetList shouldn't fail,
493489
// we wait until the error from the underlying storage is consumed.
@@ -503,8 +499,14 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
503499
t.Fatal(err)
504500
}
505501
}
502+
delegator := NewCacheDelegator(cacher, wrappedStorage)
503+
terminate := func() {
504+
delegator.Stop()
505+
cacher.Stop()
506+
server.Terminate(t)
507+
}
506508

507-
return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate
509+
return ctx, delegator, server, terminate
508510
}
509511

510512
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,15 @@ func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, e
207207
return 100, nil
208208
}
209209

210+
type dummyCacher struct {
211+
dummyStorage
212+
ready bool
213+
}
214+
215+
func (d *dummyCacher) Ready() bool {
216+
return d.ready
217+
}
218+
210219
func TestGetListCacheBypass(t *testing.T) {
211220
type opts struct {
212221
ResourceVersion string
@@ -326,6 +335,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
326335
}
327336
defer cacher.Stop()
328337
delegator := NewCacheDelegator(cacher, backingStorage)
338+
defer delegator.Stop()
329339
result := &example.PodList{}
330340
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
331341
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -450,6 +460,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
450460
}
451461
defer cacher.Stop()
452462
delegator := NewCacheDelegator(cacher, backingStorage)
463+
defer delegator.Stop()
453464
if err := cacher.ready.wait(context.Background()); err != nil {
454465
t.Fatalf("unexpected error waiting for the cache to be ready")
455466
}
@@ -533,6 +544,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
533544
}
534545
defer cacher.Stop()
535546
delegator := NewCacheDelegator(cacher, backingStorage)
547+
defer delegator.Stop()
536548

537549
pred := storage.SelectionPredicate{
538550
Limit: 500,
@@ -572,6 +584,7 @@ func TestGetCacheBypass(t *testing.T) {
572584
}
573585
defer cacher.Stop()
574586
delegator := NewCacheDelegator(cacher, backingStorage)
587+
defer delegator.Stop()
575588

576589
result := &example.Pod{}
577590

@@ -608,6 +621,7 @@ func TestWatchCacheBypass(t *testing.T) {
608621
}
609622
defer cacher.Stop()
610623
delegator := NewCacheDelegator(cacher, backingStorage)
624+
defer delegator.Stop()
611625

612626
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
613627
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -645,6 +659,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
645659
}
646660
defer cacher.Stop()
647661
delegator := NewCacheDelegator(cacher, backingStorage)
662+
defer delegator.Stop()
648663

649664
opts := storage.ListOptions{
650665
ResourceVersion: "0",
@@ -890,6 +905,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
890905
t.Fatalf("Couldn't create cacher: %v", err)
891906
}
892907
delegator := NewCacheDelegator(cacher, backingStorage)
908+
defer delegator.Stop()
893909

894910
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
895911
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -2326,6 +2342,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
23262342
}
23272343
defer cacher.Stop()
23282344
delegator := NewCacheDelegator(cacher, store)
2345+
defer delegator.Stop()
23292346

23302347
// prepare result and pred
23312348
parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
@@ -3207,6 +3224,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
32073224
}
32083225
result := &example.PodList{}
32093226
delegator := NewCacheDelegator(cacher, backingStorage)
3227+
defer delegator.Stop()
32103228
err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
32113229

32123230
if !apierrors.IsTooManyRequests(err) {

0 commit comments

Comments
 (0)