Skip to content

Commit 2f9660d

Browse files
committed
apiserver/storage/watchcache: WaitUntilFreshAndList supports path prefix
1 parent c259fe2 commit 2f9660d

File tree

3 files changed

+35
-15
lines changed

3 files changed

+35
-15
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
615615
// to compute watcher.forget function (which has to happen under lock).
616616
watcher := newCacheWatcher(
617617
chanSize,
618-
filterWithAttrsFunction(key, pred),
618+
filterWithAttrsAndPrefixFunction(key, pred),
619619
emptyFunc,
620620
c.versioner,
621621
deadline,
@@ -809,7 +809,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred
809809
}
810810
return nil, readResourceVersion, "", nil
811811
}
812-
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex(ctx))
812+
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
813813
}
814814

815815
// GetList implements storage.Interface
@@ -885,7 +885,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
885885
if listVal.Kind() != reflect.Slice {
886886
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
887887
}
888-
filter := filterWithAttrsFunction(preparedKey, pred)
889888

890889
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
891890
if err != nil {
@@ -905,7 +904,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
905904
if !ok {
906905
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
907906
}
908-
if filter(elem.Key, elem.Labels, elem.Fields) {
907+
if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
909908
selectedObjects = append(selectedObjects, elem.Object)
910909
lastSelectedObjectKey = elem.Key
911910
}
@@ -1320,7 +1319,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
13201319
}
13211320
}
13221321

1323-
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
1322+
func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
13241323
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
13251324
if !hasPathPrefix(objKey, key) {
13261325
return false

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,29 @@ func (s sortableStoreElements) Swap(i, j int) {
501501

502502
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
503503
// with their ResourceVersion and the name of the index, if any, that was used.
504-
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
504+
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
505+
items, rv, index, err := w.waitUntilFreshAndListItems(ctx, resourceVersion, key, matchValues)
506+
if err != nil {
507+
return nil, 0, "", err
508+
}
509+
510+
var result []interface{}
511+
for _, item := range items {
512+
elem, ok := item.(*storeElement)
513+
if !ok {
514+
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
515+
}
516+
if !hasPathPrefix(elem.Key, key) {
517+
continue
518+
}
519+
result = append(result, item)
520+
}
521+
522+
sort.Sort(sortableStoreElements(result))
523+
return result, rv, index, nil
524+
}
525+
526+
func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
505527
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
506528
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
507529
w.waitingUntilFresh.Add()
@@ -511,7 +533,6 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
511533
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
512534
}
513535

514-
defer func() { sort.Sort(sortableStoreElements(result)) }()
515536
defer w.RUnlock()
516537
if err != nil {
517538
return result, rv, index, err

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
462462
}()
463463

464464
// list by empty MatchValues.
465-
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, nil)
465+
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
466466
if err != nil {
467467
t.Fatalf("unexpected error: %v", err)
468468
}
@@ -481,7 +481,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
481481
{IndexName: "l:label", Value: "value1"},
482482
{IndexName: "f:spec.nodeName", Value: "node2"},
483483
}
484-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
484+
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
485485
if err != nil {
486486
t.Fatalf("unexpected error: %v", err)
487487
}
@@ -500,7 +500,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
500500
{IndexName: "l:not-exist-label", Value: "whatever"},
501501
{IndexName: "f:spec.nodeName", Value: "node2"},
502502
}
503-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
503+
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
504504
if err != nil {
505505
t.Fatalf("unexpected error: %v", err)
506506
}
@@ -518,7 +518,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
518518
matchValues = []storage.MatchValue{
519519
{IndexName: "l:not-exist-label", Value: "whatever"},
520520
}
521-
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
521+
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
522522
if err != nil {
523523
t.Fatalf("unexpected error: %v", err)
524524
}
@@ -546,7 +546,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
546546
}()
547547

548548
// list from future revision. Requires watch cache to request bookmark to get it.
549-
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil)
549+
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
550550
if err != nil {
551551
t.Fatalf("unexpected error: %v", err)
552552
}
@@ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
626626
store.Add(makeTestPod("bar", 4))
627627
}()
628628

629-
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil)
629+
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
630630
if !errors.IsTimeout(err) {
631631
t.Errorf("expected timeout error but got: %v", err)
632632
}
@@ -655,7 +655,7 @@ func TestReflectorForWatchCache(t *testing.T) {
655655
defer store.Stop()
656656

657657
{
658-
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
658+
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
659659
if err != nil {
660660
t.Fatalf("unexpected error: %v", err)
661661
}
@@ -678,7 +678,7 @@ func TestReflectorForWatchCache(t *testing.T) {
678678
r.ListAndWatch(wait.NeverStop)
679679

680680
{
681-
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, nil)
681+
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
682682
if err != nil {
683683
t.Fatalf("unexpected error: %v", err)
684684
}

0 commit comments

Comments
 (0)