Skip to content

Commit 0e78cf1

Browse files
authored
Merge pull request kubernetes#129440 from serathius/watchcache-extract-list-response
Extract list response struct to manage all the response fields
2 parents 09b8a26 + 78a6402 commit 0e78cf1

File tree

4 files changed

+76
-69
lines changed

4 files changed

+76
-69
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -825,20 +825,25 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
825825
return noLabelSelector && noFieldSelector && hasLimit
826826
}
827827

828-
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
828+
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) {
829829
if !recursive {
830830
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
831831
if err != nil {
832-
return nil, 0, "", err
832+
return listResp{}, "", err
833833
}
834834
if exists {
835-
return []interface{}{obj}, readResourceVersion, "", nil
835+
return listResp{Items: []interface{}{obj}, ResourceVersion: readResourceVersion}, "", nil
836836
}
837-
return nil, readResourceVersion, "", nil
837+
return listResp{ResourceVersion: readResourceVersion}, "", nil
838838
}
839839
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
840840
}
841841

842+
type listResp struct {
843+
Items []interface{}
844+
ResourceVersion uint64
845+
}
846+
842847
// GetList implements storage.Interface
843848
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
844849
recursive := opts.Recursive
@@ -914,7 +919,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
914919
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
915920
}
916921

917-
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
922+
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
918923
success := "true"
919924
fallback := "false"
920925
if err != nil {
@@ -933,7 +938,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
933938
if consistentRead {
934939
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
935940
}
936-
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
941+
span.AddEvent("Listed items from cache", attribute.Int("count", len(resp.Items)))
937942
// store pointer of eligible objects,
938943
// Why not directly put object in the items of listObj?
939944
// the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
@@ -942,7 +947,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
942947
var lastSelectedObjectKey string
943948
var hasMoreListItems bool
944949
limit := computeListLimit(opts)
945-
for i, obj := range objs {
950+
for i, obj := range resp.Items {
946951
elem, ok := obj.(*storeElement)
947952
if !ok {
948953
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
@@ -952,7 +957,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
952957
lastSelectedObjectKey = elem.Key
953958
}
954959
if limit > 0 && int64(len(selectedObjects)) >= limit {
955-
hasMoreListItems = i < len(objs)-1
960+
hasMoreListItems = i < len(resp.Items)-1
956961
break
957962
}
958963
}
@@ -969,16 +974,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
969974
}
970975
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
971976
if c.versioner != nil {
972-
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts)
977+
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(resp.ResourceVersion), int64(len(resp.Items)), hasMoreListItems, opts)
973978
if err != nil {
974979
return err
975980
}
976981

977-
if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil {
982+
if err = c.versioner.UpdateList(listObj, resp.ResourceVersion, continueValue, remainingItemCount); err != nil {
978983
return err
979984
}
980985
}
981-
metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), listVal.Len())
986+
metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(resp.Items), listVal.Len())
982987
return nil
983988
}
984989

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3079,7 +3079,7 @@ func TestListIndexer(t *testing.T) {
30793079
for _, tt := range tests {
30803080
t.Run(tt.name, func(t *testing.T) {
30813081
pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields)
3082-
_, _, usedIndex, err := cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive)
3082+
_, usedIndex, err := cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive)
30833083
if err != nil {
30843084
t.Errorf("Unexpected error: %v", err)
30853085
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ func (s sortableStoreElements) Swap(i, j int) {
451451

452452
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
453453
// with their ResourceVersion and the name of the index, if any, that was used.
454-
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
454+
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (resp listResp, index string, err error) {
455455
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
456456
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
457457
w.waitingUntilFresh.Add()
@@ -463,32 +463,34 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
463463

464464
defer w.RUnlock()
465465
if err != nil {
466-
return result, rv, index, err
467-
}
468-
var prefixFilteredAndOrdered bool
469-
result, rv, index, prefixFilteredAndOrdered, err = func() ([]interface{}, uint64, string, bool, error) {
470-
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
471-
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
472-
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
473-
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
474-
for _, matchValue := range matchValues {
475-
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
476-
return result, w.resourceVersion, matchValue.IndexName, false, nil
477-
}
478-
}
479-
if store, ok := w.store.(orderedLister); ok {
480-
result, _ := store.ListPrefix(key, "", 0)
481-
return result, w.resourceVersion, "", true, nil
482-
}
483-
return w.store.List(), w.resourceVersion, "", false, nil
484-
}()
485-
if !prefixFilteredAndOrdered {
486-
result, err = filterPrefixAndOrder(key, result)
487-
if err != nil {
488-
return nil, 0, "", err
466+
return listResp{}, "", err
467+
}
468+
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
469+
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
470+
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
471+
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
472+
for _, matchValue := range matchValues {
473+
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
474+
result, err = filterPrefixAndOrder(key, result)
475+
return listResp{
476+
Items: result,
477+
ResourceVersion: w.resourceVersion,
478+
}, matchValue.IndexName, err
489479
}
490480
}
491-
return result, w.resourceVersion, index, nil
481+
if store, ok := w.store.(orderedLister); ok {
482+
result, _ := store.ListPrefix(key, "", 0)
483+
return listResp{
484+
Items: result,
485+
ResourceVersion: w.resourceVersion,
486+
}, "", nil
487+
}
488+
result := w.store.List()
489+
result, err = filterPrefixAndOrder(key, result)
490+
return listResp{
491+
Items: result,
492+
ResourceVersion: w.resourceVersion,
493+
}, "", err
492494
}
493495

494496
func filterPrefixAndOrder(prefix string, items []interface{}) ([]interface{}, error) {

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -462,15 +462,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
462462
}()
463463

464464
// list by empty MatchValues.
465-
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
465+
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
466466
if err != nil {
467467
t.Fatalf("unexpected error: %v", err)
468468
}
469-
if resourceVersion != 5 {
470-
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
469+
if resp.ResourceVersion != 5 {
470+
t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion)
471471
}
472-
if len(list) != 3 {
473-
t.Errorf("unexpected list returned: %#v", list)
472+
if len(resp.Items) != 3 {
473+
t.Errorf("unexpected list returned: %#v", resp)
474474
}
475475
if indexUsed != "" {
476476
t.Errorf("Used index %q but expected none to be used", indexUsed)
@@ -481,15 +481,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
481481
{IndexName: "l:label", Value: "value1"},
482482
{IndexName: "f:spec.nodeName", Value: "node2"},
483483
}
484-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
484+
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
485485
if err != nil {
486486
t.Fatalf("unexpected error: %v", err)
487487
}
488-
if resourceVersion != 5 {
489-
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
488+
if resp.ResourceVersion != 5 {
489+
t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion)
490490
}
491-
if len(list) != 2 {
492-
t.Errorf("unexpected list returned: %#v", list)
491+
if len(resp.Items) != 2 {
492+
t.Errorf("unexpected list returned: %#v", resp)
493493
}
494494
if indexUsed != "l:label" {
495495
t.Errorf("Used index %q but expected %q", indexUsed, "l:label")
@@ -500,15 +500,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
500500
{IndexName: "l:not-exist-label", Value: "whatever"},
501501
{IndexName: "f:spec.nodeName", Value: "node2"},
502502
}
503-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
503+
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
504504
if err != nil {
505505
t.Fatalf("unexpected error: %v", err)
506506
}
507-
if resourceVersion != 5 {
508-
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
507+
if resp.ResourceVersion != 5 {
508+
t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion)
509509
}
510-
if len(list) != 1 {
511-
t.Errorf("unexpected list returned: %#v", list)
510+
if len(resp.Items) != 1 {
511+
t.Errorf("unexpected list returned: %#v", resp)
512512
}
513513
if indexUsed != "f:spec.nodeName" {
514514
t.Errorf("Used index %q but expected %q", indexUsed, "f:spec.nodeName")
@@ -518,15 +518,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
518518
matchValues = []storage.MatchValue{
519519
{IndexName: "l:not-exist-label", Value: "whatever"},
520520
}
521-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
521+
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
522522
if err != nil {
523523
t.Fatalf("unexpected error: %v", err)
524524
}
525-
if resourceVersion != 5 {
526-
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
525+
if resp.ResourceVersion != 5 {
526+
t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion)
527527
}
528-
if len(list) != 3 {
529-
t.Errorf("unexpected list returned: %#v", list)
528+
if len(resp.Items) != 3 {
529+
t.Errorf("unexpected list returned: %#v", resp)
530530
}
531531
if indexUsed != "" {
532532
t.Errorf("Used index %q but expected none to be used", indexUsed)
@@ -546,15 +546,15 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
546546
}()
547547

548548
// list from future revision. Requires watch cache to request bookmark to get it.
549-
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
549+
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
550550
if err != nil {
551551
t.Fatalf("unexpected error: %v", err)
552552
}
553-
if resourceVersion != 3 {
554-
t.Errorf("unexpected resourceVersion: %v, expected: 6", resourceVersion)
553+
if resp.ResourceVersion != 3 {
554+
t.Errorf("unexpected resourceVersion: %v, expected: 6", resp.ResourceVersion)
555555
}
556-
if len(list) != 1 {
557-
t.Errorf("unexpected list returned: %#v", list)
556+
if len(resp.Items) != 1 {
557+
t.Errorf("unexpected list returned: %#v", resp)
558558
}
559559
if indexUsed != "" {
560560
t.Errorf("Used index %q but expected none to be used", indexUsed)
@@ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
626626
store.Add(makeTestPod("bar", 4))
627627
}()
628628

629-
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
629+
_, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
630630
if !errors.IsTimeout(err) {
631631
t.Errorf("expected timeout error but got: %v", err)
632632
}
@@ -655,12 +655,12 @@ func TestReflectorForWatchCache(t *testing.T) {
655655
defer store.Stop()
656656

657657
{
658-
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
658+
resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
659659
if err != nil {
660660
t.Fatalf("unexpected error: %v", err)
661661
}
662-
if version != 0 {
663-
t.Errorf("unexpected resource version: %d", version)
662+
if resp.ResourceVersion != 0 {
663+
t.Errorf("unexpected resource version: %d", resp.ResourceVersion)
664664
}
665665
}
666666

@@ -678,12 +678,12 @@ func TestReflectorForWatchCache(t *testing.T) {
678678
r.ListAndWatch(wait.NeverStop)
679679

680680
{
681-
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
681+
resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
682682
if err != nil {
683683
t.Fatalf("unexpected error: %v", err)
684684
}
685-
if version != 10 {
686-
t.Errorf("unexpected resource version: %d", version)
685+
if resp.ResourceVersion != 10 {
686+
t.Errorf("unexpected resource version: %d", resp.ResourceVersion)
687687
}
688688
}
689689
}

0 commit comments

Comments
 (0)