Skip to content

Commit e422e9a

Browse files
authored
Merge pull request kubernetes#91595 from jpbetz/get-list-storage-options
Introduce GetOptions and ListOptions to storage interface
2 parents 6dbb92d + 4c99949 commit e422e9a

File tree

15 files changed

+344
-194
lines changed

15 files changed

+344
-194
lines changed

pkg/master/reconcilers/lease.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@ var _ Leases = &storageLeases{}
6363
// ListLeases retrieves a list of the current master IPs from storage
6464
func (s *storageLeases) ListLeases() ([]string, error) {
6565
ipInfoList := &corev1.EndpointsList{}
66-
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
66+
storageOpts := storage.ListOptions{
67+
ResourceVersion: "0",
68+
Predicate: storage.Everything,
69+
}
70+
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, storageOpts, ipInfoList); err != nil {
6771
return nil, err
6872
}
6973

pkg/registry/core/service/allocator/storage/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_test(
1414
"//pkg/apis/core:go_default_library",
1515
"//pkg/registry/core/service/allocator:go_default_library",
1616
"//pkg/registry/registrytest:go_default_library",
17+
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
1718
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
1819
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
1920
],

pkg/registry/core/service/allocator/storage/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (e *Etcd) tryUpdate(fn func() error) error {
190190
// etcd. If the key does not exist, the object will have an empty ResourceVersion.
191191
func (e *Etcd) Get() (*api.RangeAllocation, error) {
192192
existing := &api.RangeAllocation{}
193-
if err := e.storage.Get(context.TODO(), e.baseKey, "", existing, true); err != nil {
193+
if err := e.storage.Get(context.TODO(), e.baseKey, storage.GetOptions{IgnoreNotFound: true}, existing); err != nil {
194194
return nil, storeerr.InterpretGetError(err, e.resource, "")
195195
}
196196
return existing, nil

pkg/registry/core/service/allocator/storage/storage_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"strings"
2222
"testing"
2323

24+
apiserverstorage "k8s.io/apiserver/pkg/storage"
2425
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
2526
"k8s.io/apiserver/pkg/storage/storagebackend"
2627
api "k8s.io/kubernetes/pkg/apis/core"
@@ -80,7 +81,7 @@ func TestStore(t *testing.T) {
8081
other := allocator.NewAllocationMap(100, "rangeSpecValue")
8182

8283
allocation := &api.RangeAllocation{}
83-
if err := storage.storage.Get(context.TODO(), key(), "", allocation, false); err != nil {
84+
if err := storage.storage.Get(context.TODO(), key(), apiserverstorage.GetOptions{}, allocation); err != nil {
8485
t.Fatal(err)
8586
}
8687
if allocation.Range != "rangeSpecValue" {

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (s *DryRunnableStorage) Versioner() storage.Versioner {
3535

3636
func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error {
3737
if dryRun {
38-
if err := s.Storage.Get(ctx, key, "", out, false); err == nil {
38+
if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err == nil {
3939
return storage.NewKeyExistsError(key, 0)
4040
}
4141
return s.copyInto(obj, out)
@@ -45,7 +45,7 @@ func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out ru
4545

4646
func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, dryRun bool) error {
4747
if dryRun {
48-
if err := s.Storage.Get(ctx, key, "", out, false); err != nil {
48+
if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err != nil {
4949
return err
5050
}
5151
if err := preconditions.Check(key, out); err != nil {
@@ -56,31 +56,31 @@ func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime
5656
return s.Storage.Delete(ctx, key, out, preconditions, deleteValidation)
5757
}
5858

59-
func (s *DryRunnableStorage) Watch(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) {
60-
return s.Storage.Watch(ctx, key, resourceVersion, p)
59+
func (s *DryRunnableStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
60+
return s.Storage.Watch(ctx, key, opts)
6161
}
6262

63-
func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) {
64-
return s.Storage.WatchList(ctx, key, resourceVersion, p)
63+
func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
64+
return s.Storage.WatchList(ctx, key, opts)
6565
}
6666

67-
func (s *DryRunnableStorage) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
68-
return s.Storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
67+
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
68+
return s.Storage.Get(ctx, key, opts, objPtr)
6969
}
7070

71-
func (s *DryRunnableStorage) GetToList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error {
72-
return s.Storage.GetToList(ctx, key, resourceVersion, p, listObj)
71+
func (s *DryRunnableStorage) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
72+
return s.Storage.GetToList(ctx, key, opts, listObj)
7373
}
7474

75-
func (s *DryRunnableStorage) List(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error {
76-
return s.Storage.List(ctx, key, resourceVersion, p, listObj)
75+
func (s *DryRunnableStorage) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
76+
return s.Storage.List(ctx, key, opts, listObj)
7777
}
7878

7979
func (s *DryRunnableStorage) GuaranteedUpdate(
8080
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
8181
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, dryRun bool, suggestion ...runtime.Object) error {
8282
if dryRun {
83-
err := s.Storage.Get(ctx, key, "", ptrToType, ignoreNotFound)
83+
err := s.Storage.Get(ctx, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, ptrToType)
8484
if err != nil {
8585
return err
8686
}

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestDryRunCreateDoesntCreate(t *testing.T) {
6969
t.Fatalf("Failed to create new dry-run object: %v", err)
7070
}
7171

72-
err = s.Get(context.Background(), "key", "", out, false)
72+
err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
7373
if e, ok := err.(*storage.StorageError); !ok || e.Code != storage.ErrCodeKeyNotFound {
7474
t.Errorf("Expected key to be not found, error: %v", err)
7575
}
@@ -185,7 +185,7 @@ func TestDryRunUpdateDoesntUpdate(t *testing.T) {
185185
t.Fatalf("Failed to dry-run update: %v", err)
186186
}
187187
out := UnstructuredOrDie(`{}`)
188-
err = s.Get(context.Background(), "key", "", out, false)
188+
err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
189189
if !reflect.DeepEqual(created, out) {
190190
t.Fatalf("Returned object %q different from expected %q", created, out)
191191
}
@@ -239,7 +239,7 @@ func TestDryRunDeleteDoesntDelete(t *testing.T) {
239239
t.Fatalf("Failed to dry-run delete the object: %v", err)
240240
}
241241

242-
err = s.Get(context.Background(), "key", "", out, false)
242+
err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
243243
if err != nil {
244244
t.Fatalf("Failed to retrieve dry-run deleted object: %v", err)
245245
}

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -322,15 +322,16 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate,
322322
p.Continue = options.Continue
323323
list := e.NewListFunc()
324324
qualifiedResource := e.qualifiedResourceFromContext(ctx)
325+
storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: p}
325326
if name, ok := p.MatchesSingle(); ok {
326327
if key, err := e.KeyFunc(ctx, name); err == nil {
327-
err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
328+
err := e.Storage.GetToList(ctx, key, storageOpts, list)
328329
return list, storeerr.InterpretListError(err, qualifiedResource)
329330
}
330331
// if we cannot extract a key based on the current context, the optimization is skipped
331332
}
332333

333-
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
334+
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), storageOpts, list)
334335
return list, storeerr.InterpretListError(err, qualifiedResource)
335336
}
336337

@@ -367,7 +368,7 @@ func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation
367368
if !apierrors.IsAlreadyExists(err) {
368369
return nil, err
369370
}
370-
if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
371+
if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
371372
return nil, err
372373
}
373374
accessor, errGetAcc := meta.Accessor(out)
@@ -608,7 +609,7 @@ func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions
608609
if err != nil {
609610
return nil, err
610611
}
611-
if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
612+
if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
612613
return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
613614
}
614615
if e.Decorator != nil {
@@ -892,7 +893,7 @@ func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.V
892893
}
893894
obj := e.NewFunc()
894895
qualifiedResource := e.qualifiedResourceFromContext(ctx)
895-
if err = e.Storage.Get(ctx, key, "", obj, false); err != nil {
896+
if err = e.Storage.Get(ctx, key, storage.GetOptions{}, obj); err != nil {
896897
return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
897898
}
898899

@@ -1126,9 +1127,10 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
11261127

11271128
// WatchPredicate starts a watch for the items that matches.
11281129
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
1130+
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p}
11291131
if name, ok := p.MatchesSingle(); ok {
11301132
if key, err := e.KeyFunc(ctx, name); err == nil {
1131-
w, err := e.Storage.Watch(ctx, key, resourceVersion, p)
1133+
w, err := e.Storage.Watch(ctx, key, storageOpts)
11321134
if err != nil {
11331135
return nil, err
11341136
}
@@ -1141,7 +1143,7 @@ func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate
11411143
// optimization is skipped
11421144
}
11431145

1144-
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)
1146+
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), storageOpts)
11451147
if err != nil {
11461148
return nil, err
11471149
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -431,8 +431,9 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
431431
}
432432

433433
// Watch implements storage.Interface.
434-
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
435-
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
434+
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
435+
pred := opts.Predicate
436+
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
436437
if err != nil {
437438
return nil, err
438439
}
@@ -515,30 +516,30 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
515516
}
516517

517518
// WatchList implements storage.Interface.
518-
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
519-
return c.Watch(ctx, key, resourceVersion, pred)
519+
func (c *Cacher) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
520+
return c.Watch(ctx, key, opts)
520521
}
521522

522523
// Get implements storage.Interface.
523-
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
524-
if resourceVersion == "" {
524+
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
525+
if opts.ResourceVersion == "" {
525526
// If resourceVersion is not specified, serve it from underlying
526527
// storage (for backward compatibility).
527-
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
528+
return c.storage.Get(ctx, key, opts, objPtr)
528529
}
529530

530531
// If resourceVersion is specified, serve it from cache.
531532
// It's guaranteed that the returned value is at least that
532533
// fresh as the given resourceVersion.
533-
getRV, err := c.versioner.ParseResourceVersion(resourceVersion)
534+
getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
534535
if err != nil {
535536
return err
536537
}
537538

538539
if getRV == 0 && !c.ready.check() {
539540
// If Cacher is not yet initialized and we don't require any specific
540541
// minimal resource version, simply forward the request to storage.
541-
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
542+
return c.storage.Get(ctx, key, opts, objPtr)
542543
}
543544

544545
// Do not create a trace - it's not for free and there are tons
@@ -563,15 +564,17 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
563564
objVal.Set(reflect.ValueOf(elem.Object).Elem())
564565
} else {
565566
objVal.Set(reflect.Zero(objVal.Type()))
566-
if !ignoreNotFound {
567+
if !opts.IgnoreNotFound {
567568
return storage.NewKeyNotFoundError(key, int64(readResourceVersion))
568569
}
569570
}
570571
return nil
571572
}
572573

573574
// GetToList implements storage.Interface.
574-
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
575+
func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
576+
resourceVersion := opts.ResourceVersion
577+
pred := opts.Predicate
575578
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
576579
hasContinuation := pagingEnabled && len(pred.Continue) > 0
577580
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
@@ -582,7 +585,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
582585
// Limits are only sent to storage when resourceVersion is non-zero
583586
// since the watch cache isn't able to perform continuations, and
584587
// limits are ignored when resource version is zero
585-
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
588+
return c.storage.GetToList(ctx, key, opts, listObj)
586589
}
587590

588591
// If resourceVersion is specified, serve it from cache.
@@ -596,7 +599,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
596599
if listRV == 0 && !c.ready.check() {
597600
// If Cacher is not yet initialized and we don't require any specific
598601
// minimal resource version, simply forward the request to storage.
599-
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
602+
return c.storage.GetToList(ctx, key, opts, listObj)
600603
}
601604

602605
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
@@ -643,7 +646,9 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
643646
}
644647

645648
// List implements storage.Interface.
646-
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
649+
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
650+
resourceVersion := opts.ResourceVersion
651+
pred := opts.Predicate
647652
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
648653
hasContinuation := pagingEnabled && len(pred.Continue) > 0
649654
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
@@ -654,7 +659,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
654659
// Limits are only sent to storage when resourceVersion is non-zero
655660
// since the watch cache isn't able to perform continuations, and
656661
// limits are ignored when resource version is zero.
657-
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
662+
return c.storage.List(ctx, key, opts, listObj)
658663
}
659664

660665
// If resourceVersion is specified, serve it from cache.
@@ -668,7 +673,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
668673
if listRV == 0 && !c.ready.check() {
669674
// If Cacher is not yet initialized and we don't require any specific
670675
// minimal resource version, simply forward the request to storage.
671-
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
676+
return c.storage.List(ctx, key, opts, listObj)
672677
}
673678

674679
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
@@ -1083,15 +1088,15 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
10831088
Continue: options.Continue,
10841089
}
10851090

1086-
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil {
1091+
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{Predicate: pred}, list); err != nil {
10871092
return nil, err
10881093
}
10891094
return list, nil
10901095
}
10911096

10921097
// Implements cache.ListerWatcher interface.
10931098
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
1094-
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, storage.Everything)
1099+
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
10951100
}
10961101

10971102
// errWatcher implements watch.Interface to return a single error

0 commit comments

Comments
 (0)