Skip to content

Commit aa35eff

Browse files
authored
Merge pull request kubernetes#130423 from serathius/watchcache-continue
Serve LISTs with exact RV and continuations from cache
2 parents 473533a + f82c9e5 commit aa35eff

File tree

8 files changed

+299
-35
lines changed

8 files changed

+299
-35
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,15 +1328,52 @@ func newErrWatcher(err error) *errWatcher {
13281328
}
13291329

13301330
func (c *Cacher) ShouldDelegateExactRV(resourceVersion string, recursive bool) (delegator.Result, error) {
1331-
return delegator.CacheWithoutSnapshots{}.ShouldDelegateExactRV(resourceVersion, recursive)
1331+
// Not Recursive is not supported unitl exact RV is implemented for WaitUntilFreshAndGet.
1332+
if !recursive || c.watchCache.snapshots == nil {
1333+
return delegator.Result{ShouldDelegate: true}, nil
1334+
}
1335+
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
1336+
if err != nil {
1337+
return delegator.Result{}, err
1338+
}
1339+
return c.shouldDelegateExactRV(listRV)
13321340
}
13331341

13341342
func (c *Cacher) ShouldDelegateContinue(continueToken string, recursive bool) (delegator.Result, error) {
1335-
return delegator.CacheWithoutSnapshots{}.ShouldDelegateContinue(continueToken, recursive)
1343+
// Not Recursive is not supported unitl exact RV is implemented for WaitUntilFreshAndGet.
1344+
if !recursive || c.watchCache.snapshots == nil {
1345+
return delegator.Result{ShouldDelegate: true}, nil
1346+
}
1347+
_, continueRV, err := storage.DecodeContinue(continueToken, c.resourcePrefix)
1348+
if err != nil {
1349+
return delegator.Result{}, err
1350+
}
1351+
if continueRV > 0 {
1352+
return c.shouldDelegateExactRV(uint64(continueRV))
1353+
} else {
1354+
// Continue with negative RV is a consistent read.
1355+
return c.ShouldDelegateConsistentRead()
1356+
}
1357+
}
1358+
1359+
func (c *Cacher) shouldDelegateExactRV(rv uint64) (delegator.Result, error) {
1360+
// Exact requests on future revision require support for consistent read, but are not a consistent read by themselves.
1361+
if c.watchCache.notFresh(rv) {
1362+
return delegator.Result{
1363+
ShouldDelegate: !delegator.ConsistentReadSupported(),
1364+
}, nil
1365+
}
1366+
_, canServe := c.watchCache.snapshots.GetLessOrEqual(rv)
1367+
return delegator.Result{
1368+
ShouldDelegate: !canServe,
1369+
}, nil
13361370
}
13371371

13381372
func (c *Cacher) ShouldDelegateConsistentRead() (delegator.Result, error) {
1339-
return delegator.CacheWithoutSnapshots{}.ShouldDelegateConsistentRead()
1373+
return delegator.Result{
1374+
ConsistentRead: true,
1375+
ShouldDelegate: !delegator.ConsistentReadSupported(),
1376+
}, nil
13401377
}
13411378

13421379
// Implements watch.Interface.

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -179,31 +179,46 @@ func TestListPaging(t *testing.T) {
179179
func TestList(t *testing.T) {
180180
for _, consistentRead := range []bool{true, false} {
181181
t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) {
182-
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
183-
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
184-
t.Cleanup(terminate)
185-
storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true)
182+
for _, listFromCacheSnapshot := range []bool{true, false} {
183+
t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) {
184+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot)
185+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
186+
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
187+
t.Cleanup(terminate)
188+
storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true)
189+
})
190+
}
186191
})
187192
}
188193
}
189194
func TestConsistentList(t *testing.T) {
190195
for _, consistentRead := range []bool{true, false} {
191196
t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) {
192-
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
193-
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
194-
t.Cleanup(terminate)
195-
storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead)
197+
for _, listFromCacheSnapshot := range []bool{true, false} {
198+
t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) {
199+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot)
200+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
201+
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
202+
t.Cleanup(terminate)
203+
storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead, listFromCacheSnapshot)
204+
})
205+
}
196206
})
197207
}
198208
}
199209

200210
func TestGetListNonRecursive(t *testing.T) {
201211
for _, consistentRead := range []bool{true, false} {
202212
t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) {
203-
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
204-
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
205-
t.Cleanup(terminate)
206-
storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher)
213+
for _, listFromCacheSnapshot := range []bool{true, false} {
214+
t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) {
215+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot)
216+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
217+
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
218+
t.Cleanup(terminate)
219+
storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher)
220+
})
221+
}
207222
})
208223
}
209224
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 158 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ func TestShouldDelegateList(t *testing.T) {
344344
}
345345
}
346346

347-
runTestCases := func(t *testing.T, testcases map[opts]bool, overrides ...map[opts]bool) {
347+
runTestCases := func(t *testing.T, snapshotAvailable bool, testcases map[opts]bool, overrides ...map[opts]bool) {
348348
for opt, expectBypass := range testCases {
349349
for _, override := range overrides {
350350
if bypass, ok := override[opt]; ok {
@@ -362,6 +362,9 @@ func TestShouldDelegateList(t *testing.T) {
362362
t.Fatalf("Couldn't create cacher: %v", err)
363363
}
364364
defer cacher.Stop()
365+
if snapshotAvailable {
366+
cacher.watchCache.snapshots.Add(uint64(mustAtoi(oldRV)), fakeOrderedLister{})
367+
}
365368
result, err := shouldDelegateList(toStorageOpts(opt), cacher)
366369
if err != nil {
367370
t.Fatal(err)
@@ -371,27 +374,88 @@ func TestShouldDelegateList(t *testing.T) {
371374
}
372375
}
373376
}
374-
consistentListFromCacheOverrides := map[opts]bool{}
375-
for _, recursive := range []bool{true, false} {
376-
consistentListFromCacheOverrides[opts{Recursive: recursive}] = false
377-
consistentListFromCacheOverrides[opts{Limit: 100, Recursive: recursive}] = false
378-
}
377+
378+
// Exacts and continue on current cache RV.
379+
listFromSnapshotEnabledOverrides := map[opts]bool{}
380+
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, Limit: 100}] = false
381+
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false
382+
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, Limit: 100, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false
383+
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnCacheRV}] = false
384+
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnCacheRV}] = false
385+
listFromSnapshotEnabledOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnCacheRV}] = false
386+
listFromSnapshotEnabledOverrides[opts{Recursive: true, Continue: continueOnCacheRV}] = false
387+
388+
// Exacts and continue RV with a snapshot.
389+
snapshotAvailableOverrides := map[opts]bool{}
390+
snapshotAvailableOverrides[opts{Recursive: true, Continue: continueOnOldRV}] = false
391+
snapshotAvailableOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnOldRV}] = false
392+
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnOldRV}] = false
393+
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnOldRV}] = false
394+
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, Limit: 100}] = false
395+
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false
396+
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = false
379397

380398
t.Run("ConsistentListFromCache=false", func(t *testing.T) {
381399
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
382-
runTestCases(t, testCases)
400+
t.Run("ListFromCacheSnapshot=false", func(t *testing.T) {
401+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false)
402+
runTestCases(t, false, testCases)
403+
})
404+
405+
t.Run("ListFromCacheSnapshot=true", func(t *testing.T) {
406+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true)
407+
t.Run("SnapshotAvailable=false", func(t *testing.T) {
408+
runTestCases(t, false, testCases, listFromSnapshotEnabledOverrides)
409+
})
410+
t.Run("SnapshotAvailable=true", func(t *testing.T) {
411+
runTestCases(t, true, testCases, listFromSnapshotEnabledOverrides, snapshotAvailableOverrides)
412+
})
413+
})
383414
})
384415
t.Run("ConsistentListFromCache=true", func(t *testing.T) {
385416
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
386-
387417
// TODO(p0lyn0mial): the following tests assume that etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
388418
// evaluates to true. Otherwise the cache will be bypassed and the test will fail.
389419
//
390420
// If you were to run only TestGetListCacheBypass you would see that the test fail.
391421
// However in CI all test are run and there must be a test(s) that properly
392422
// initialize the storage layer so that the mentioned method evaluates to true
393423
forceRequestWatchProgressSupport(t)
394-
runTestCases(t, testCases, consistentListFromCacheOverrides)
424+
425+
consistentListFromCacheOverrides := map[opts]bool{}
426+
for _, recursive := range []bool{true, false} {
427+
consistentListFromCacheOverrides[opts{Recursive: recursive}] = false
428+
consistentListFromCacheOverrides[opts{Limit: 100, Recursive: recursive}] = false
429+
}
430+
431+
t.Run("ListFromCacheSnapshot=false", func(t *testing.T) {
432+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false)
433+
runTestCases(t, false, testCases, consistentListFromCacheOverrides)
434+
})
435+
t.Run("ListFromCacheSnapshot=true", func(t *testing.T) {
436+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true)
437+
438+
consistentReadWithSnapshotOverrides := map[opts]bool{}
439+
// Continues with negative RV are same as consistent read.
440+
consistentReadWithSnapshotOverrides[opts{Recursive: true, Limit: 0, Continue: continueOnNegativeRV}] = false
441+
consistentReadWithSnapshotOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnNegativeRV}] = false
442+
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 0, Continue: continueOnNegativeRV}] = false
443+
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnNegativeRV}] = false
444+
// Exact on RV not yet observed by cache
445+
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, Limit: 100}] = false
446+
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false
447+
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = false
448+
consistentReadWithSnapshotOverrides[opts{Recursive: true, Continue: continueOnEtcdRV}] = false
449+
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnEtcdRV}] = false
450+
consistentReadWithSnapshotOverrides[opts{Recursive: true, Continue: continueOnEtcdRV, Limit: 100}] = false
451+
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnEtcdRV, Limit: 100}] = false
452+
t.Run("SnapshotAvailable=false", func(t *testing.T) {
453+
runTestCases(t, false, testCases, listFromSnapshotEnabledOverrides, consistentListFromCacheOverrides, consistentReadWithSnapshotOverrides)
454+
})
455+
t.Run("SnapshotAvailable=true", func(t *testing.T) {
456+
runTestCases(t, true, testCases, listFromSnapshotEnabledOverrides, consistentListFromCacheOverrides, consistentReadWithSnapshotOverrides, snapshotAvailableOverrides)
457+
})
458+
})
395459
})
396460
}
397461

@@ -568,6 +632,91 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
568632
}
569633
}
570634

635+
func TestMatchExactResourceVersionFallback(t *testing.T) {
636+
tcs := []struct {
637+
name string
638+
snapshotsAvailable []bool
639+
640+
expectStoreRequests int
641+
expectSnapshotRequests int
642+
}{
643+
{
644+
name: "Disabled",
645+
snapshotsAvailable: []bool{false, false},
646+
expectStoreRequests: 2,
647+
expectSnapshotRequests: 1,
648+
},
649+
{
650+
name: "Enabled",
651+
snapshotsAvailable: []bool{true, true},
652+
expectStoreRequests: 1,
653+
expectSnapshotRequests: 2,
654+
},
655+
{
656+
name: "Fallback",
657+
snapshotsAvailable: []bool{true, false},
658+
expectSnapshotRequests: 2,
659+
expectStoreRequests: 2,
660+
},
661+
}
662+
for _, tc := range tcs {
663+
t.Run(tc.name, func(t *testing.T) {
664+
backingStorage := &dummyStorage{}
665+
expectStoreRequests := 0
666+
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
667+
expectStoreRequests++
668+
podList := listObj.(*example.PodList)
669+
switch opts.ResourceVersionMatch {
670+
case "":
671+
podList.ResourceVersion = "42"
672+
case metav1.ResourceVersionMatchExact:
673+
podList.ResourceVersion = opts.ResourceVersion
674+
}
675+
return nil
676+
}
677+
cacher, _, err := newTestCacherWithoutSyncing(backingStorage, clock.RealClock{})
678+
if err != nil {
679+
t.Fatalf("Couldn't create cacher: %v", err)
680+
}
681+
defer cacher.Stop()
682+
snapshotRequestCount := 0
683+
cacher.watchCache.RWMutex.Lock()
684+
cacher.watchCache.snapshots = &fakeSnapshotter{
685+
getLessOrEqual: func(rv uint64) (orderedLister, bool) {
686+
snapshotAvailable := tc.snapshotsAvailable[snapshotRequestCount]
687+
snapshotRequestCount++
688+
if snapshotAvailable {
689+
return fakeOrderedLister{}, true
690+
} else {
691+
return nil, false
692+
}
693+
},
694+
}
695+
cacher.watchCache.RWMutex.Unlock()
696+
if err := cacher.ready.wait(context.Background()); err != nil {
697+
t.Fatalf("unexpected error waiting for the cache to be ready")
698+
}
699+
delegator := NewCacheDelegator(cacher, backingStorage)
700+
701+
result := &example.PodList{}
702+
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "20", ResourceVersionMatch: metav1.ResourceVersionMatchExact, Recursive: true}, result)
703+
if err != nil {
704+
t.Fatalf("Unexpected error: %v", err)
705+
}
706+
if result.ResourceVersion != "20" {
707+
t.Fatalf("Unexpected List response RV, got: %q, want: %d", result.ResourceVersion, 20)
708+
}
709+
if expectStoreRequests != tc.expectStoreRequests {
710+
t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", expectStoreRequests, tc.expectStoreRequests)
711+
}
712+
if snapshotRequestCount != tc.expectSnapshotRequests {
713+
t.Fatalf("Unexpected number of requests to snapshots, got: %d, want: %d", snapshotRequestCount, tc.expectSnapshotRequests)
714+
}
715+
716+
})
717+
}
718+
}
719+
571720
func TestGetListNonRecursiveCacheBypass(t *testing.T) {
572721
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
573722
backingStorage := &dummyStorage{}

staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
218218
success := "true"
219219
fallback := "false"
220220
if err != nil {
221+
if errors.IsResourceExpired(err) {
222+
return c.storage.GetList(ctx, key, opts, listObj)
223+
}
221224
if result.ConsistentRead {
222225
if storage.IsTooLargeResourceVersion(err) {
223226
fallback = "true"

0 commit comments

Comments
 (0)