Skip to content

Commit 87536f3

Browse files
committed
apiserver/storage/cacher: cache supports pagination
1 parent bc3b8f6 commit 87536f3

File tree

2 files changed

+92
-7
lines changed

2 files changed

+92
-7
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -762,12 +762,26 @@ func shouldDelegateList(opts storage.ListOptions) bool {
762762
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
763763
// Watch cache doesn't support continuations, so serve them from etcd.
764764
hasContinuation := len(pred.Continue) > 0
765-
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
766-
hasLimit := pred.Limit > 0 && resourceVersion != "0"
767765
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
768-
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
766+
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
767+
isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
768+
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
769769

770-
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
770+
return consistentReadFromStorage || hasContinuation || unsupportedMatch
771+
}
772+
773+
// computeListLimit determines whether the cacher should
774+
// apply a limit to an incoming LIST request and returns its value.
775+
//
776+
// note that this function doesn't check RVM nor the Continuation token.
777+
// these parameters are validated by the shouldDelegateList function.
778+
//
779+
// as of today, the limit is ignored for requests that set RV == 0
780+
func computeListLimit(opts storage.ListOptions) int64 {
781+
if opts.Predicate.Limit <= 0 || opts.ResourceVersion == "0" {
782+
return 0
783+
}
784+
return opts.Predicate.Limit
771785
}
772786

773787
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
@@ -877,13 +891,21 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
877891
// the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
878892
// so we try to delay this action as much as possible
879893
var selectedObjects []runtime.Object
880-
for _, obj := range objs {
894+
var lastSelectedObjectKey string
895+
var hasMoreListItems bool
896+
limit := computeListLimit(opts)
897+
for i, obj := range objs {
881898
elem, ok := obj.(*storeElement)
882899
if !ok {
883900
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
884901
}
885902
if filter(elem.Key, elem.Labels, elem.Fields) {
886903
selectedObjects = append(selectedObjects, elem.Object)
904+
lastSelectedObjectKey = elem.Key
905+
}
906+
if limit > 0 && int64(len(selectedObjects)) >= limit {
907+
hasMoreListItems = i < len(objs)-1
908+
break
887909
}
888910
}
889911
if len(selectedObjects) == 0 {
@@ -899,7 +921,12 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
899921
}
900922
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
901923
if c.versioner != nil {
902-
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
924+
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts)
925+
if err != nil {
926+
return err
927+
}
928+
929+
if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil {
903930
return err
904931
}
905932
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ func TestGetListCacheBypass(t *testing.T) {
201201
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
202202
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
203203

204-
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
205204
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
206205
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
207206

@@ -214,6 +213,7 @@ func TestGetListCacheBypass(t *testing.T) {
214213
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
215214
testCases := append(commonTestCases,
216215
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
216+
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
217217
)
218218
for _, tc := range testCases {
219219
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
@@ -233,6 +233,7 @@ func TestGetListCacheBypass(t *testing.T) {
233233

234234
testCases := append(commonTestCases,
235235
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false},
236+
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
236237
)
237238
for _, tc := range testCases {
238239
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
@@ -2566,6 +2567,63 @@ func TestWatchStreamSeparation(t *testing.T) {
25662567
}
25672568
}
25682569

2570+
func TestComputeListLimit(t *testing.T) {
2571+
scenarios := []struct {
2572+
name string
2573+
opts storage.ListOptions
2574+
expectedLimit int64
2575+
}{
2576+
{
2577+
name: "limit is zero",
2578+
opts: storage.ListOptions{
2579+
Predicate: storage.SelectionPredicate{
2580+
Limit: 0,
2581+
},
2582+
},
2583+
expectedLimit: 0,
2584+
},
2585+
{
2586+
name: "limit is positive, RV is unset",
2587+
opts: storage.ListOptions{
2588+
Predicate: storage.SelectionPredicate{
2589+
Limit: 1,
2590+
},
2591+
ResourceVersion: "",
2592+
},
2593+
expectedLimit: 1,
2594+
},
2595+
{
2596+
name: "limit is positive, RV = 100",
2597+
opts: storage.ListOptions{
2598+
Predicate: storage.SelectionPredicate{
2599+
Limit: 1,
2600+
},
2601+
ResourceVersion: "100",
2602+
},
2603+
expectedLimit: 1,
2604+
},
2605+
{
2606+
name: "legacy case: limit is positive, RV = 0",
2607+
opts: storage.ListOptions{
2608+
Predicate: storage.SelectionPredicate{
2609+
Limit: 1,
2610+
},
2611+
ResourceVersion: "0",
2612+
},
2613+
expectedLimit: 0,
2614+
},
2615+
}
2616+
2617+
for _, scenario := range scenarios {
2618+
t.Run(scenario.name, func(t *testing.T) {
2619+
actualLimit := computeListLimit(scenario.opts)
2620+
if actualLimit != scenario.expectedLimit {
2621+
t.Errorf("computeListLimit returned = %v, expected %v", actualLimit, scenario.expectedLimit)
2622+
}
2623+
})
2624+
}
2625+
}
2626+
25692627
func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
25702628
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
25712629
opts.Predicate.AllowWatchBookmarks = true

0 commit comments

Comments
 (0)