Skip to content

Commit 010409a

Browse files
authored
Merge pull request kubernetes#125730 from p0lyn0mial/upstream-bring-back-consistent-read-from-cache-supports-pagination
apiserver/storage/cacher: consistent read from cache supports limit
2 parents ef1d28a + 2f9660d commit 010409a

File tree

5 files changed

+131
-26
lines changed

5 files changed

+131
-26
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
615615
// to compute watcher.forget function (which has to happen under lock).
616616
watcher := newCacheWatcher(
617617
chanSize,
618-
filterWithAttrsFunction(key, pred),
618+
filterWithAttrsAndPrefixFunction(key, pred),
619619
emptyFunc,
620620
c.versioner,
621621
deadline,
@@ -768,12 +768,26 @@ func shouldDelegateList(opts storage.ListOptions) bool {
768768
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
769769
// Watch cache doesn't support continuations, so serve them from etcd.
770770
hasContinuation := len(pred.Continue) > 0
771-
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
772-
hasLimit := pred.Limit > 0 && resourceVersion != "0"
773771
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
774-
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
772+
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
773+
isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
774+
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
775775

776-
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
776+
return consistentReadFromStorage || hasContinuation || unsupportedMatch
777+
}
778+
779+
// computeListLimit determines whether the cacher should
780+
// apply a limit to an incoming LIST request and returns its value.
781+
//
782+
// note that this function doesn't check RVM nor the Continuation token.
783+
// these parameters are validated by the shouldDelegateList function.
784+
//
785+
// as of today, the limit is ignored for requests that set RV == 0
786+
func computeListLimit(opts storage.ListOptions) int64 {
787+
if opts.Predicate.Limit <= 0 || opts.ResourceVersion == "0" {
788+
return 0
789+
}
790+
return opts.Predicate.Limit
777791
}
778792

779793
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
@@ -795,7 +809,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred
795809
}
796810
return nil, readResourceVersion, "", nil
797811
}
798-
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex(ctx))
812+
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
799813
}
800814

801815
// GetList implements storage.Interface
@@ -871,7 +885,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
871885
if listVal.Kind() != reflect.Slice {
872886
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
873887
}
874-
filter := filterWithAttrsFunction(preparedKey, pred)
875888

876889
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
877890
if err != nil {
@@ -883,13 +896,21 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
883896
// the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
884897
// so we try to delay this action as much as possible
885898
var selectedObjects []runtime.Object
886-
for _, obj := range objs {
899+
var lastSelectedObjectKey string
900+
var hasMoreListItems bool
901+
limit := computeListLimit(opts)
902+
for i, obj := range objs {
887903
elem, ok := obj.(*storeElement)
888904
if !ok {
889905
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
890906
}
891-
if filter(elem.Key, elem.Labels, elem.Fields) {
907+
if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
892908
selectedObjects = append(selectedObjects, elem.Object)
909+
lastSelectedObjectKey = elem.Key
910+
}
911+
if limit > 0 && int64(len(selectedObjects)) >= limit {
912+
hasMoreListItems = i < len(objs)-1
913+
break
893914
}
894915
}
895916
if len(selectedObjects) == 0 {
@@ -905,7 +926,12 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
905926
}
906927
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
907928
if c.versioner != nil {
908-
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
929+
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts)
930+
if err != nil {
931+
return err
932+
}
933+
934+
if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil {
909935
return err
910936
}
911937
}
@@ -1293,7 +1319,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
12931319
}
12941320
}
12951321

1296-
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
1322+
func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
12971323
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
12981324
if !hasPathPrefix(objKey, key) {
12991325
return false

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ func TestGetListCacheBypass(t *testing.T) {
201201
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
202202
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
203203

204-
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
205204
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
206205
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
207206

@@ -214,6 +213,7 @@ func TestGetListCacheBypass(t *testing.T) {
214213
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
215214
testCases := append(commonTestCases,
216215
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
216+
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
217217
)
218218
for _, tc := range testCases {
219219
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
@@ -233,6 +233,7 @@ func TestGetListCacheBypass(t *testing.T) {
233233

234234
testCases := append(commonTestCases,
235235
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false},
236+
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
236237
)
237238
for _, tc := range testCases {
238239
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
@@ -2591,6 +2592,63 @@ func TestWatchStreamSeparation(t *testing.T) {
25912592
}
25922593
}
25932594

2595+
func TestComputeListLimit(t *testing.T) {
2596+
scenarios := []struct {
2597+
name string
2598+
opts storage.ListOptions
2599+
expectedLimit int64
2600+
}{
2601+
{
2602+
name: "limit is zero",
2603+
opts: storage.ListOptions{
2604+
Predicate: storage.SelectionPredicate{
2605+
Limit: 0,
2606+
},
2607+
},
2608+
expectedLimit: 0,
2609+
},
2610+
{
2611+
name: "limit is positive, RV is unset",
2612+
opts: storage.ListOptions{
2613+
Predicate: storage.SelectionPredicate{
2614+
Limit: 1,
2615+
},
2616+
ResourceVersion: "",
2617+
},
2618+
expectedLimit: 1,
2619+
},
2620+
{
2621+
name: "limit is positive, RV = 100",
2622+
opts: storage.ListOptions{
2623+
Predicate: storage.SelectionPredicate{
2624+
Limit: 1,
2625+
},
2626+
ResourceVersion: "100",
2627+
},
2628+
expectedLimit: 1,
2629+
},
2630+
{
2631+
name: "legacy case: limit is positive, RV = 0",
2632+
opts: storage.ListOptions{
2633+
Predicate: storage.SelectionPredicate{
2634+
Limit: 1,
2635+
},
2636+
ResourceVersion: "0",
2637+
},
2638+
expectedLimit: 0,
2639+
},
2640+
}
2641+
2642+
for _, scenario := range scenarios {
2643+
t.Run(scenario.name, func(t *testing.T) {
2644+
actualLimit := computeListLimit(scenario.opts)
2645+
if actualLimit != scenario.expectedLimit {
2646+
t.Errorf("computeListLimit returned = %v, expected %v", actualLimit, scenario.expectedLimit)
2647+
}
2648+
})
2649+
}
2650+
}
2651+
25942652
func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
25952653
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
25962654
opts.Predicate.AllowWatchBookmarks = true

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,29 @@ func (s sortableStoreElements) Swap(i, j int) {
501501

502502
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
503503
// with their ResourceVersion and the name of the index, if any, that was used.
504-
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
504+
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
505+
items, rv, index, err := w.waitUntilFreshAndListItems(ctx, resourceVersion, key, matchValues)
506+
if err != nil {
507+
return nil, 0, "", err
508+
}
509+
510+
var result []interface{}
511+
for _, item := range items {
512+
elem, ok := item.(*storeElement)
513+
if !ok {
514+
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
515+
}
516+
if !hasPathPrefix(elem.Key, key) {
517+
continue
518+
}
519+
result = append(result, item)
520+
}
521+
522+
sort.Sort(sortableStoreElements(result))
523+
return result, rv, index, nil
524+
}
525+
526+
func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
505527
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
506528
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
507529
w.waitingUntilFresh.Add()
@@ -511,7 +533,6 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
511533
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
512534
}
513535

514-
defer func() { sort.Sort(sortableStoreElements(result)) }()
515536
defer w.RUnlock()
516537
if err != nil {
517538
return result, rv, index, err

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
462462
}()
463463

464464
// list by empty MatchValues.
465-
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, nil)
465+
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
466466
if err != nil {
467467
t.Fatalf("unexpected error: %v", err)
468468
}
@@ -481,7 +481,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
481481
{IndexName: "l:label", Value: "value1"},
482482
{IndexName: "f:spec.nodeName", Value: "node2"},
483483
}
484-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
484+
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
485485
if err != nil {
486486
t.Fatalf("unexpected error: %v", err)
487487
}
@@ -500,7 +500,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
500500
{IndexName: "l:not-exist-label", Value: "whatever"},
501501
{IndexName: "f:spec.nodeName", Value: "node2"},
502502
}
503-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
503+
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
504504
if err != nil {
505505
t.Fatalf("unexpected error: %v", err)
506506
}
@@ -518,7 +518,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
518518
matchValues = []storage.MatchValue{
519519
{IndexName: "l:not-exist-label", Value: "whatever"},
520520
}
521-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
521+
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
522522
if err != nil {
523523
t.Fatalf("unexpected error: %v", err)
524524
}
@@ -546,7 +546,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
546546
}()
547547

548548
// list from future revision. Requires watch cache to request bookmark to get it.
549-
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil)
549+
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
550550
if err != nil {
551551
t.Fatalf("unexpected error: %v", err)
552552
}
@@ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
626626
store.Add(makeTestPod("bar", 4))
627627
}()
628628

629-
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil)
629+
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
630630
if !errors.IsTimeout(err) {
631631
t.Errorf("expected timeout error but got: %v", err)
632632
}
@@ -655,7 +655,7 @@ func TestReflectorForWatchCache(t *testing.T) {
655655
defer store.Stop()
656656

657657
{
658-
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
658+
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
659659
if err != nil {
660660
t.Fatalf("unexpected error: %v", err)
661661
}
@@ -678,7 +678,7 @@ func TestReflectorForWatchCache(t *testing.T) {
678678
r.ListAndWatch(wait.NeverStop)
679679

680680
{
681-
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, nil)
681+
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
682682
if err != nil {
683683
t.Fatalf("unexpected error: %v", err)
684684
}

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
173173
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
174174
// Watch cache doesn't support continuations, so serve them from etcd.
175175
hasContinuation := len(opts.Continue) > 0
176-
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
177-
hasLimit := opts.Limit > 0 && resourceVersion != "0"
178176
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
179-
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
177+
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
178+
isLegacyExactMatch := opts.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
179+
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
180180

181-
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
181+
return consistentReadFromStorage || hasContinuation || unsupportedMatch
182182
}

0 commit comments

Comments
 (0)