Skip to content

Commit 6ff0354

Browse files
authored
Merge pull request kubernetes#130399 from serathius/cache-delegator
Rename CacheProxy to CacheDelegator
2 parents 39c640f + 4c635ec commit 6ff0354

File tree

8 files changed

+92
-91
lines changed

8 files changed

+92
-91
lines changed

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func StorageWithCacher() generic.StorageDecorator {
7979
})
8080
}
8181

82-
return cacherstorage.NewCacheProxy(cacher, s), destroyFunc, nil
82+
return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil
8383
}
8484
}
8585

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2459,7 +2459,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
24592459
}
24602460
}
24612461
d := destroyFunc
2462-
s = cacherstorage.NewCacheProxy(cacher, s)
2462+
s = cacherstorage.NewCacheDelegator(cacher, s)
24632463
destroyFunc = func() {
24642464
cacher.Stop()
24652465
d()

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -694,36 +694,6 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
694694
return nil
695695
}
696696

697-
// NOTICE: Keep in sync with shouldListFromStorage function in
698-
//
699-
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
700-
func shouldDelegateList(opts storage.ListOptions) bool {
701-
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
702-
switch opts.ResourceVersionMatch {
703-
case metav1.ResourceVersionMatchExact:
704-
return true
705-
case metav1.ResourceVersionMatchNotOlderThan:
706-
case "":
707-
// Legacy exact match
708-
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
709-
return true
710-
}
711-
default:
712-
return true
713-
}
714-
// Continue
715-
if len(opts.Predicate.Continue) > 0 {
716-
return true
717-
}
718-
// Consistent Read
719-
if opts.ResourceVersion == "" {
720-
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
721-
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
722-
return !consistentListFromCacheEnabled || !requestWatchProgressSupported
723-
}
724-
return false
725-
}
726-
727697
// computeListLimit determines whether the cacher should
728698
// apply a limit to an incoming LIST request and returns its value.
729699
//
@@ -738,14 +708,6 @@ func computeListLimit(opts storage.ListOptions) int64 {
738708
return opts.Predicate.Limit
739709
}
740710

741-
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
742-
pred := opts.Predicate
743-
noLabelSelector := pred.Label == nil || pred.Label.Empty()
744-
noFieldSelector := pred.Field == nil || pred.Field.Empty()
745-
hasLimit := pred.Limit > 0
746-
return noLabelSelector && noFieldSelector && hasLimit
747-
}
748-
749711
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) {
750712
if !recursive {
751713
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -455,12 +455,12 @@ func withNodeNameAndNamespaceIndex(options *setupOptions) {
455455
}
456456
}
457457

458-
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheProxy, tearDownFunc) {
458+
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheDelegator, tearDownFunc) {
459459
ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
460460
return ctx, cacher, tearDown
461461
}
462462

463-
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheProxy, *etcd3testing.EtcdTestServer, tearDownFunc) {
463+
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheDelegator, *etcd3testing.EtcdTestServer, tearDownFunc) {
464464
setupOpts := setupOptions{}
465465
opts = append([]setupOption{withDefaults}, opts...)
466466
for _, opt := range opts {
@@ -514,7 +514,7 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
514514
}
515515
}
516516

517-
return ctx, NewCacheProxy(cacher, wrappedStorage), server, terminate
517+
return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate
518518
}
519519

520520
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
@@ -525,20 +525,20 @@ func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (stora
525525
t.Fatalf("unexpected error waiting for the cache to be ready")
526526
}
527527
}
528-
return &createWrapper{CacheProxy: cacher}, tearDown
528+
return &createWrapper{CacheDelegator: cacher}, tearDown
529529
}
530530

531531
type createWrapper struct {
532-
*CacheProxy
532+
*CacheDelegator
533533
}
534534

535535
func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
536-
if err := c.CacheProxy.Create(ctx, key, obj, out, ttl); err != nil {
536+
if err := c.CacheDelegator.Create(ctx, key, obj, out, ttl); err != nil {
537537
return err
538538
}
539539
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
540-
currentObj := c.CacheProxy.cacher.newFunc()
541-
err := c.CacheProxy.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
540+
currentObj := c.CacheDelegator.cacher.newFunc()
541+
err := c.CacheDelegator.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
542542
if err != nil {
543543
if storage.IsNotFound(err) {
544544
return false, nil

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string {
7878
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
7979
}
8080

81-
func compactStorage(c *CacheProxy, client *clientv3.Client) storagetesting.Compaction {
81+
func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction {
8282
return func(ctx context.Context, t *testing.T, resourceVersion string) {
8383
versioner := storage.APIObjectVersioner{}
8484
rv, err := versioner.ParseResourceVersion(resourceVersion)

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
317317
t.Fatalf("Couldn't create cacher: %v", err)
318318
}
319319
defer cacher.Stop()
320-
proxy := NewCacheProxy(cacher, backingStorage)
320+
delegator := NewCacheDelegator(cacher, backingStorage)
321321
result := &example.PodList{}
322322
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
323323
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -342,7 +342,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
342342
return nil
343343
}
344344
}
345-
err = proxy.GetList(context.TODO(), "pods/ns", options, result)
345+
err = delegator.GetList(context.TODO(), "pods/ns", options, result)
346346
gotBypass := errors.Is(err, errDummy)
347347
if err != nil && !gotBypass {
348348
t.Fatalf("Unexpected error for List request with options: %v, err: %v", options, err)
@@ -441,7 +441,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
441441
t.Fatalf("Couldn't create cacher: %v", err)
442442
}
443443
defer cacher.Stop()
444-
proxy := NewCacheProxy(cacher, backingStorage)
444+
delegator := NewCacheDelegator(cacher, backingStorage)
445445
if err := cacher.ready.wait(context.Background()); err != nil {
446446
t.Fatalf("unexpected error waiting for the cache to be ready")
447447
}
@@ -484,7 +484,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
484484
}
485485

486486
start := cacher.clock.Now()
487-
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
487+
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
488488
clockStepCancelFn()
489489
duration := cacher.clock.Since(start)
490490
if (err != nil) != tc.expectError {
@@ -516,7 +516,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
516516
t.Fatalf("Couldn't create cacher: %v", err)
517517
}
518518
defer cacher.Stop()
519-
proxy := NewCacheProxy(cacher, backingStorage)
519+
delegator := NewCacheDelegator(cacher, backingStorage)
520520

521521
pred := storage.SelectionPredicate{
522522
Limit: 500,
@@ -531,15 +531,15 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
531531

532532
// Inject error to underlying layer and check if cacher is not bypassed.
533533
backingStorage.injectError(errDummy)
534-
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
534+
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{
535535
ResourceVersion: "0",
536536
Predicate: pred,
537537
}, result)
538538
if err != nil {
539539
t.Errorf("GetList with Limit and RV=0 should be served from cache: %v", err)
540540
}
541541

542-
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
542+
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{
543543
ResourceVersion: "",
544544
Predicate: pred,
545545
}, result)
@@ -555,7 +555,7 @@ func TestGetCacheBypass(t *testing.T) {
555555
t.Fatalf("Couldn't create cacher: %v", err)
556556
}
557557
defer cacher.Stop()
558-
proxy := NewCacheProxy(cacher, backingStorage)
558+
delegator := NewCacheDelegator(cacher, backingStorage)
559559

560560
result := &example.Pod{}
561561

@@ -567,15 +567,15 @@ func TestGetCacheBypass(t *testing.T) {
567567

568568
// Inject error to underlying layer and check if cacher is not bypassed.
569569
backingStorage.injectError(errDummy)
570-
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
570+
err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
571571
IgnoreNotFound: true,
572572
ResourceVersion: "0",
573573
}, result)
574574
if err != nil {
575575
t.Errorf("Get with RV=0 should be served from cache: %v", err)
576576
}
577577

578-
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
578+
err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
579579
IgnoreNotFound: true,
580580
ResourceVersion: "",
581581
}, result)
@@ -591,23 +591,23 @@ func TestWatchCacheBypass(t *testing.T) {
591591
t.Fatalf("Couldn't create cacher: %v", err)
592592
}
593593
defer cacher.Stop()
594-
proxy := NewCacheProxy(cacher, backingStorage)
594+
delegator := NewCacheDelegator(cacher, backingStorage)
595595

596596
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
597597
if err := cacher.ready.wait(context.Background()); err != nil {
598598
t.Fatalf("unexpected error waiting for the cache to be ready")
599599
}
600600
}
601601

602-
_, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{
602+
_, err = delegator.Watch(context.TODO(), "pod/ns", storage.ListOptions{
603603
ResourceVersion: "0",
604604
Predicate: storage.Everything,
605605
})
606606
if err != nil {
607607
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
608608
}
609609

610-
_, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{
610+
_, err = delegator.Watch(context.TODO(), "pod/ns", storage.ListOptions{
611611
ResourceVersion: "",
612612
Predicate: storage.Everything,
613613
})
@@ -628,7 +628,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
628628
t.Fatalf("Couldn't create cacher: %v", err)
629629
}
630630
defer cacher.Stop()
631-
proxy := NewCacheProxy(cacher, backingStorage)
631+
delegator := NewCacheDelegator(cacher, backingStorage)
632632

633633
opts := storage.ListOptions{
634634
ResourceVersion: "0",
@@ -640,15 +640,15 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
640640
defer listCancel()
641641

642642
result := &example.PodList{}
643-
err = proxy.GetList(listCtx, "/pods/ns", opts, result)
643+
err = delegator.GetList(listCtx, "/pods/ns", opts, result)
644644
if err != nil && apierrors.IsTooManyRequests(err) {
645645
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for List")
646646
}
647647

648648
watchCtx, watchCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
649649
defer watchCancel()
650650

651-
_, err = proxy.Watch(watchCtx, "/pods/ns", opts)
651+
_, err = delegator.Watch(watchCtx, "/pods/ns", opts)
652652
if err != nil && apierrors.IsTooManyRequests(err) {
653653
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for Watch")
654654
}
@@ -873,15 +873,15 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
873873
if err != nil {
874874
t.Fatalf("Couldn't create cacher: %v", err)
875875
}
876-
proxy := NewCacheProxy(cacher, backingStorage)
876+
delegator := NewCacheDelegator(cacher, backingStorage)
877877

878878
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
879879
if err := cacher.ready.wait(context.Background()); err != nil {
880880
t.Fatalf("unexpected error waiting for the cache to be ready")
881881
}
882882
}
883883

884-
w, err := proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
884+
w, err := delegator.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
885885
if err != nil {
886886
t.Fatalf("Failed to create watch: %v", err)
887887
}
@@ -901,13 +901,13 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
901901

902902
cacher.Stop()
903903

904-
_, err = proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
904+
_, err = delegator.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
905905
if err == nil {
906906
t.Fatalf("Success to create Watch: %v", err)
907907
}
908908

909909
result := &example.Pod{}
910-
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
910+
err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
911911
IgnoreNotFound: true,
912912
ResourceVersion: "1",
913913
}, result)
@@ -922,7 +922,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
922922
}
923923

924924
listResult := &example.PodList{}
925-
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
925+
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{
926926
ResourceVersion: "1",
927927
Recursive: true,
928928
Predicate: storage.SelectionPredicate{
@@ -2291,7 +2291,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
22912291
b.Fatalf("new cacher: %v", err)
22922292
}
22932293
defer cacher.Stop()
2294-
proxy := NewCacheProxy(cacher, store)
2294+
delegator := NewCacheDelegator(cacher, store)
22952295

22962296
// prepare result and pred
22972297
parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
@@ -2307,7 +2307,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
23072307
b.ResetTimer()
23082308
for i := 0; i < b.N; i++ {
23092309
result := &example.PodList{}
2310-
err = proxy.GetList(context.TODO(), "pods", storage.ListOptions{
2310+
err = delegator.GetList(context.TODO(), "pods", storage.ListOptions{
23112311
Predicate: pred,
23122312
Recursive: true,
23132313
ResourceVersion: "12345",
@@ -3179,8 +3179,8 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
31793179
Predicate: storage.Everything,
31803180
}
31813181
result := &example.PodList{}
3182-
proxy := NewCacheProxy(cacher, backingStorage)
3183-
err = proxy.GetList(context.TODO(), "/pods/ns", opts, result)
3182+
delegator := NewCacheDelegator(cacher, backingStorage)
3183+
err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
31843184

31853185
if !apierrors.IsTooManyRequests(err) {
31863186
t.Fatalf("Unexpected GetList error: %v", err)

0 commit comments

Comments
 (0)