Skip to content

Commit c30b1eb

Browse files
authored
Merge pull request kubernetes#130589 from serathius/watchcache-opts
Pass storage.ListOptions to WaitUntilFreshAndList
2 parents 4696667 + e6cf9dd commit c30b1eb

File tree

4 files changed

+49
-40
lines changed

4 files changed

+49
-40
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -709,8 +709,8 @@ func computeListLimit(opts storage.ListOptions) int64 {
709709
return opts.Predicate.Limit
710710
}
711711

712-
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) {
713-
if !recursive {
712+
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, opts storage.ListOptions) (listResp, string, error) {
713+
if !opts.Recursive {
714714
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
715715
if err != nil {
716716
return listResp{}, "", err
@@ -720,7 +720,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred
720720
}
721721
return listResp{ResourceVersion: readResourceVersion}, "", nil
722722
}
723-
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
723+
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, opts)
724724
}
725725

726726
type listResp struct {
@@ -730,8 +730,6 @@ type listResp struct {
730730

731731
// GetList implements storage.Interface
732732
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
733-
recursive := opts.Recursive
734-
pred := opts.Predicate
735733
// For recursive lists, we need to make sure the key ended with "/" so that we only
736734
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
737735
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@@ -772,7 +770,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
772770
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
773771
}
774772

775-
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
773+
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, opts)
776774
if err != nil {
777775
return err
778776
}
@@ -790,7 +788,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
790788
if !ok {
791789
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
792790
}
793-
if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
791+
if opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
794792
selectedObjects = append(selectedObjects, elem.Object)
795793
lastSelectedObjectKey = elem.Key
796794
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3172,7 +3172,7 @@ func TestListIndexer(t *testing.T) {
31723172
for _, tt := range tests {
31733173
t.Run(tt.name, func(t *testing.T) {
31743174
pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields)
3175-
_, usedIndex, err := cacher.cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive)
3175+
_, usedIndex, err := cacher.cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, storage.ListOptions{Predicate: pred, Recursive: tt.recursive})
31763176
if err != nil {
31773177
t.Errorf("Unexpected error: %v", err)
31783178
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ func (s sortableStoreElements) Swap(i, j int) {
495495

496496
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
497497
// with their ResourceVersion and the name of the index, if any, that was used.
498-
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (resp listResp, index string, err error) {
498+
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) {
499499
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
500500
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
501501
w.waitingUntilFresh.Add()
@@ -513,7 +513,7 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
513513
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
514514
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
515515
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
516-
for _, matchValue := range matchValues {
516+
for _, matchValue := range opts.Predicate.MatcherIndex(ctx) {
517517
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
518518
result, err = filterPrefixAndOrder(key, result)
519519
return listResp{

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func TestEvents(t *testing.T) {
287287

288288
// Test for Added event.
289289
{
290-
_, err := store.getAllEventsSince(1, storage.ListOptions{})
290+
_, err := store.getAllEventsSince(1, storage.ListOptions{Predicate: storage.Everything})
291291
if err == nil {
292292
t.Errorf("expected error too old")
293293
}
@@ -296,7 +296,7 @@ func TestEvents(t *testing.T) {
296296
}
297297
}
298298
{
299-
result, err := store.getAllEventsSince(2, storage.ListOptions{})
299+
result, err := store.getAllEventsSince(2, storage.ListOptions{Predicate: storage.Everything})
300300
if err != nil {
301301
t.Errorf("unexpected error: %v", err)
302302
}
@@ -320,13 +320,13 @@ func TestEvents(t *testing.T) {
320320

321321
// Test with not full cache.
322322
{
323-
_, err := store.getAllEventsSince(1, storage.ListOptions{})
323+
_, err := store.getAllEventsSince(1, storage.ListOptions{Predicate: storage.Everything})
324324
if err == nil {
325325
t.Errorf("expected error too old")
326326
}
327327
}
328328
{
329-
result, err := store.getAllEventsSince(3, storage.ListOptions{})
329+
result, err := store.getAllEventsSince(3, storage.ListOptions{Predicate: storage.Everything})
330330
if err != nil {
331331
t.Errorf("unexpected error: %v", err)
332332
}
@@ -354,13 +354,13 @@ func TestEvents(t *testing.T) {
354354

355355
// Test with full cache - there should be elements from 5 to 9.
356356
{
357-
_, err := store.getAllEventsSince(3, storage.ListOptions{})
357+
_, err := store.getAllEventsSince(3, storage.ListOptions{Predicate: storage.Everything})
358358
if err == nil {
359359
t.Errorf("expected error too old")
360360
}
361361
}
362362
{
363-
result, err := store.getAllEventsSince(4, storage.ListOptions{})
363+
result, err := store.getAllEventsSince(4, storage.ListOptions{Predicate: storage.Everything})
364364
if err != nil {
365365
t.Errorf("unexpected error: %v", err)
366366
}
@@ -379,7 +379,7 @@ func TestEvents(t *testing.T) {
379379
store.Delete(makeTestPod("pod", uint64(10)))
380380

381381
{
382-
result, err := store.getAllEventsSince(9, storage.ListOptions{})
382+
result, err := store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything})
383383
if err != nil {
384384
t.Errorf("unexpected error: %v", err)
385385
}
@@ -410,13 +410,13 @@ func TestMarker(t *testing.T) {
410410
makeTestPod("pod2", 9),
411411
}, "9")
412412

413-
_, err := store.getAllEventsSince(8, storage.ListOptions{})
413+
_, err := store.getAllEventsSince(8, storage.ListOptions{Predicate: storage.Everything})
414414
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
415415
t.Errorf("unexpected error: %v", err)
416416
}
417417
// Getting events from 8 should return no events,
418418
// even though there is a marker there.
419-
result, err := store.getAllEventsSince(9, storage.ListOptions{})
419+
result, err := store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything})
420420
if err != nil {
421421
t.Fatalf("unexpected error: %v", err)
422422
}
@@ -427,7 +427,7 @@ func TestMarker(t *testing.T) {
427427
pod := makeTestPod("pods", 12)
428428
store.Add(pod)
429429
// Getting events from 8 should still work and return one event.
430-
result, err = store.getAllEventsSince(9, storage.ListOptions{})
430+
result, err = store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything})
431431
if err != nil {
432432
t.Fatalf("unexpected error: %v", err)
433433
}
@@ -466,7 +466,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
466466
}()
467467

468468
// list by empty MatchValues.
469-
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
469+
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.Everything})
470470
if err != nil {
471471
t.Fatalf("unexpected error: %v", err)
472472
}
@@ -481,11 +481,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
481481
}
482482

483483
// list by label index.
484-
matchValues := []storage.MatchValue{
485-
{IndexName: "l:label", Value: "value1"},
486-
{IndexName: "f:spec.nodeName", Value: "node2"},
487-
}
488-
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
484+
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{
485+
Label: labels.SelectorFromSet(map[string]string{
486+
"label": "value1",
487+
}),
488+
Field: fields.SelectorFromSet(map[string]string{
489+
"spec.nodeName": "node2",
490+
}),
491+
IndexLabels: []string{"label"},
492+
}})
489493
if err != nil {
490494
t.Fatalf("unexpected error: %v", err)
491495
}
@@ -500,11 +504,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
500504
}
501505

502506
// list with spec.nodeName index.
503-
matchValues = []storage.MatchValue{
504-
{IndexName: "l:not-exist-label", Value: "whatever"},
505-
{IndexName: "f:spec.nodeName", Value: "node2"},
506-
}
507-
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
507+
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{
508+
Label: labels.SelectorFromSet(map[string]string{
509+
"not-exist-label": "whatever",
510+
}),
511+
Field: fields.SelectorFromSet(map[string]string{
512+
"spec.nodeName": "node2",
513+
}),
514+
IndexFields: []string{"spec.nodeName"},
515+
}})
508516
if err != nil {
509517
t.Fatalf("unexpected error: %v", err)
510518
}
@@ -519,10 +527,13 @@ func TestWaitUntilFreshAndList(t *testing.T) {
519527
}
520528

521529
// list with index not exists.
522-
matchValues = []storage.MatchValue{
523-
{IndexName: "l:not-exist-label", Value: "whatever"},
524-
}
525-
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
530+
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{
531+
Label: labels.SelectorFromSet(map[string]string{
532+
"not-exist-label": "whatever",
533+
}),
534+
Field: fields.Everything(),
535+
IndexLabels: []string{"label"},
536+
}})
526537
if err != nil {
527538
t.Fatalf("unexpected error: %v", err)
528539
}
@@ -550,7 +561,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
550561
}()
551562

552563
// list from future revision. Requires watch cache to request bookmark to get it.
553-
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
564+
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", storage.ListOptions{Predicate: storage.Everything})
554565
if err != nil {
555566
t.Fatalf("unexpected error: %v", err)
556567
}
@@ -630,7 +641,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
630641
store.Add(makeTestPod("bar", 4))
631642
}()
632643

633-
_, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
644+
_, _, err := store.WaitUntilFreshAndList(ctx, 4, "", storage.ListOptions{Predicate: storage.Everything})
634645
if !errors.IsTimeout(err) {
635646
t.Errorf("expected timeout error but got: %v", err)
636647
}
@@ -659,7 +670,7 @@ func TestReflectorForWatchCache(t *testing.T) {
659670
defer store.Stop()
660671

661672
{
662-
resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
673+
resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", storage.ListOptions{Predicate: storage.Everything})
663674
if err != nil {
664675
t.Fatalf("unexpected error: %v", err)
665676
}
@@ -682,7 +693,7 @@ func TestReflectorForWatchCache(t *testing.T) {
682693
r.ListAndWatch(wait.NeverStop)
683694

684695
{
685-
resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
696+
resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", storage.ListOptions{Predicate: storage.Everything})
686697
if err != nil {
687698
t.Fatalf("unexpected error: %v", err)
688699
}
@@ -994,7 +1005,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
9941005
// Force cache resize.
9951006
addEvent("key4", 50, later.Add(time.Second))
9961007

997-
_, err := store.getAllEventsSince(15, storage.ListOptions{})
1008+
_, err := store.getAllEventsSince(15, storage.ListOptions{Predicate: storage.Everything})
9981009
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
9991010
t.Errorf("unexpected error: %v", err)
10001011
}

0 commit comments

Comments
 (0)