Skip to content

Commit 4a4fc9d

Browse files
committed
Extract and unify cache bypass logic by creating a CacheProxy struct
1 parent 75531cc commit 4a4fc9d

File tree

7 files changed

+281
-219
lines changed

7 files changed

+281
-219
lines changed

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func StorageWithCacher() generic.StorageDecorator {
7979
})
8080
}
8181

82-
return cacher, destroyFunc, nil
82+
return cacherstorage.NewCacheProxy(cacher, s), destroyFunc, nil
8383
}
8484
}
8585

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2459,7 +2459,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
24592459
}
24602460
}
24612461
d := destroyFunc
2462-
s = cacher
2462+
s = cacherstorage.NewCacheProxy(cacher, s)
24632463
destroyFunc = func() {
24642464
cacher.Stop()
24652465
d()

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 5 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -339,10 +339,6 @@ type Cacher struct {
339339
expiredBookmarkWatchers []*cacheWatcher
340340
}
341341

342-
func (c *Cacher) RequestWatchProgress(ctx context.Context) error {
343-
return c.storage.RequestWatchProgress(ctx)
344-
}
345-
346342
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
347343
// its internal cache and updating its cache in the background based on the
348344
// given configuration.
@@ -495,56 +491,13 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
495491
}
496492
}
497493

498-
// Versioner implements storage.Interface.
499-
func (c *Cacher) Versioner() storage.Versioner {
500-
return c.storage.Versioner()
501-
}
502-
503-
// Create implements storage.Interface.
504-
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
505-
return c.storage.Create(ctx, key, obj, out, ttl)
506-
}
507-
508-
// Delete implements storage.Interface.
509-
func (c *Cacher) Delete(
510-
ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
511-
validateDeletion storage.ValidateObjectFunc, _ runtime.Object, opts storage.DeleteOptions) error {
512-
// Ignore the suggestion and try to pass down the current version of the object
513-
// read from cache.
514-
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
515-
klog.Errorf("GetByKey returned error: %v", err)
516-
} else if exists {
517-
// DeepCopy the object since we modify resource version when serializing the
518-
// current object.
519-
currObj := elem.(*storeElement).Object.DeepCopyObject()
520-
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj, opts)
521-
}
522-
// If we couldn't get the object, fallback to no-suggestion.
523-
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts)
524-
}
525-
526494
type namespacedName struct {
527495
namespace string
528496
name string
529497
}
530498

531-
// Watch implements storage.Interface.
532499
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
533500
pred := opts.Predicate
534-
// if the watch-list feature wasn't set and the resourceVersion is unset
535-
// ensure that the rv from which the watch is being served, is the latest
536-
// one. "latest" is ensured by serving the watch from
537-
// the underlying storage.
538-
//
539-
// it should never happen due to our validation but let's just be super-safe here
540-
// and disable sendingInitialEvents when the feature wasn't enabled
541-
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
542-
opts.SendInitialEvents = nil
543-
}
544-
// TODO: we should eventually get rid of this legacy case
545-
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
546-
return c.storage.Watch(ctx, key, opts)
547-
}
548501
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
549502
if err != nil {
550503
return nil, err
@@ -709,58 +662,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
709662
return watcher, nil
710663
}
711664

712-
// Get implements storage.Interface.
713665
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
714-
ctx, span := tracing.Start(ctx, "cacher.Get",
715-
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
716-
attribute.String("key", key),
717-
attribute.String("resource-version", opts.ResourceVersion))
718-
defer span.End(500 * time.Millisecond)
719-
if opts.ResourceVersion == "" {
720-
// If resourceVersion is not specified, serve it from underlying
721-
// storage (for backward compatibility).
722-
span.AddEvent("About to Get from underlying storage")
723-
return c.storage.Get(ctx, key, opts, objPtr)
724-
}
725-
726-
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
727-
if !c.ready.check() {
728-
// If Cache is not initialized, delegate Get requests to storage
729-
// as described in https://kep.k8s.io/4568
730-
span.AddEvent("About to Get from underlying storage - cache not initialized")
731-
return c.storage.Get(ctx, key, opts, objPtr)
732-
}
733-
}
734-
735-
// If resourceVersion is specified, serve it from cache.
736-
// It's guaranteed that the returned value is at least that
737-
// fresh as the given resourceVersion.
738666
getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
739667
if err != nil {
740668
return err
741669
}
742670

743-
// Do not create a trace - it's not for free and there are tons
744-
// of Get requests. We can add it if it will be really needed.
745-
746-
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
747-
if getRV == 0 && !c.ready.check() {
748-
// If Cacher is not yet initialized and we don't require any specific
749-
// minimal resource version, simply forward the request to storage.
750-
span.AddEvent("About to Get from underlying storage - cache not initialized and no resourceVersion set")
751-
return c.storage.Get(ctx, key, opts, objPtr)
752-
}
753-
if err := c.ready.wait(ctx); err != nil {
754-
return errors.NewServiceUnavailable(err.Error())
755-
}
756-
}
757-
758671
objVal, err := conversion.EnforcePtr(objPtr)
759672
if err != nil {
760673
return err
761674
}
762675

763-
span.AddEvent("About to fetch object from cache")
764676
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, getRV, key)
765677
if err != nil {
766678
return err
@@ -845,32 +757,9 @@ type listResp struct {
845757
}
846758

847759
// GetList implements storage.Interface
848-
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
760+
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
849761
recursive := opts.Recursive
850-
resourceVersion := opts.ResourceVersion
851762
pred := opts.Predicate
852-
if shouldDelegateList(opts) {
853-
return c.storage.GetList(ctx, key, opts, listObj)
854-
}
855-
856-
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
857-
if err != nil {
858-
return err
859-
}
860-
861-
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
862-
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
863-
// If Cacher is not initialized, delegate List requests to storage
864-
// as described in https://kep.k8s.io/4568
865-
return c.storage.GetList(ctx, key, opts, listObj)
866-
}
867-
} else {
868-
if listRV == 0 && !c.ready.check() {
869-
// If Cacher is not yet initialized and we don't require any specific
870-
// minimal resource version, simply forward the request to storage.
871-
return c.storage.GetList(ctx, key, opts, listObj)
872-
}
873-
}
874763
// For recursive lists, we need to make sure the key ended with "/" so that we only
875764
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
876765
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@@ -879,14 +768,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
879768
if opts.Recursive && !strings.HasSuffix(key, "/") {
880769
preparedKey += "/"
881770
}
882-
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
883-
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
884-
if consistentRead {
885-
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
886-
if err != nil {
887-
return err
888-
}
889-
}
890771

891772
ctx, span := tracing.Start(ctx, "cacher.GetList",
892773
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
@@ -920,24 +801,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
920801
}
921802

922803
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
923-
success := "true"
924-
fallback := "false"
925804
if err != nil {
926-
if consistentRead {
927-
if storage.IsTooLargeResourceVersion(err) {
928-
fallback = "true"
929-
err = c.storage.GetList(ctx, key, opts, listObj)
930-
}
931-
if err != nil {
932-
success = "false"
933-
}
934-
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
935-
}
936805
return err
937806
}
938-
if consistentRead {
939-
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
940-
}
941807
span.AddEvent("Listed items from cache", attribute.Int("count", len(resp.Items)))
942808
// store pointer of eligible objects,
943809
// Why not directly put object in the items of listObj?
@@ -987,37 +853,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
987853
return nil
988854
}
989855

990-
// GuaranteedUpdate implements storage.Interface.
991-
func (c *Cacher) GuaranteedUpdate(
992-
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
993-
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ runtime.Object) error {
994-
// Ignore the suggestion and try to pass down the current version of the object
995-
// read from cache.
996-
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
997-
klog.Errorf("GetByKey returned error: %v", err)
998-
} else if exists {
999-
// DeepCopy the object since we modify resource version when serializing the
1000-
// current object.
1001-
currObj := elem.(*storeElement).Object.DeepCopyObject()
1002-
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, currObj)
1003-
}
1004-
// If we couldn't get the object, fallback to no-suggestion.
1005-
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
1006-
}
1007-
1008-
// Count implements storage.Interface.
1009-
func (c *Cacher) Count(pathPrefix string) (int64, error) {
1010-
return c.storage.Count(pathPrefix)
1011-
}
1012-
1013-
// ReadinessCheck implements storage.Interface.
1014-
func (c *Cacher) ReadinessCheck() error {
1015-
if !c.ready.check() {
1016-
return storage.ErrStorageNotReady
1017-
}
1018-
return nil
1019-
}
1020-
1021856
// baseObjectThreadUnsafe omits locking for cachingObject.
1022857
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
1023858
if co, ok := object.(*cachingObject); ok {
@@ -1494,6 +1329,10 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
14941329
}
14951330
}
14961331

1332+
func (c *Cacher) Ready() bool {
1333+
return c.ready.check()
1334+
}
1335+
14971336
// errWatcher implements watch.Interface to return a single error
14981337
type errWatcher struct {
14991338
result chan watch.Event

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -455,12 +455,12 @@ func withNodeNameAndNamespaceIndex(options *setupOptions) {
455455
}
456456
}
457457

458-
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
458+
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheProxy, tearDownFunc) {
459459
ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
460460
return ctx, cacher, tearDown
461461
}
462462

463-
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *Cacher, *etcd3testing.EtcdTestServer, tearDownFunc) {
463+
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheProxy, *etcd3testing.EtcdTestServer, tearDownFunc) {
464464
setupOpts := setupOptions{}
465465
opts = append([]setupOption{withDefaults}, opts...)
466466
for _, opt := range opts {
@@ -514,31 +514,31 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
514514
}
515515
}
516516

517-
return ctx, cacher, server, terminate
517+
return ctx, NewCacheProxy(cacher, wrappedStorage), server, terminate
518518
}
519519

520520
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
521521
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
522522

523523
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
524-
if err := cacher.ready.wait(context.TODO()); err != nil {
524+
if err := cacher.cacher.ready.wait(context.TODO()); err != nil {
525525
t.Fatalf("unexpected error waiting for the cache to be ready")
526526
}
527527
}
528-
return &createWrapper{Cacher: cacher}, tearDown
528+
return &createWrapper{CacheProxy: cacher}, tearDown
529529
}
530530

531531
type createWrapper struct {
532-
*Cacher
532+
*CacheProxy
533533
}
534534

535535
func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
536-
if err := c.Cacher.Create(ctx, key, obj, out, ttl); err != nil {
536+
if err := c.CacheProxy.Create(ctx, key, obj, out, ttl); err != nil {
537537
return err
538538
}
539539
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
540-
currentObj := c.Cacher.newFunc()
541-
err := c.Cacher.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
540+
currentObj := c.CacheProxy.cacher.newFunc()
541+
err := c.CacheProxy.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
542542
if err != nil {
543543
if storage.IsNotFound(err) {
544544
return false, nil

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,45 +78,45 @@ func computePodKey(obj *example.Pod) string {
7878
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
7979
}
8080

81-
func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compaction {
81+
func compactStorage(c *CacheProxy, client *clientv3.Client) storagetesting.Compaction {
8282
return func(ctx context.Context, t *testing.T, resourceVersion string) {
8383
versioner := storage.APIObjectVersioner{}
8484
rv, err := versioner.ParseResourceVersion(resourceVersion)
8585
if err != nil {
8686
t.Fatal(err)
8787
}
8888

89-
err = c.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
89+
err = c.cacher.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
9090
if err != nil {
9191
t.Fatalf("WatchCache didn't caught up to RV: %v", rv)
9292
}
93-
c.watchCache.RUnlock()
93+
c.cacher.watchCache.RUnlock()
9494

95-
c.watchCache.Lock()
96-
defer c.watchCache.Unlock()
97-
c.Lock()
98-
defer c.Unlock()
95+
c.cacher.watchCache.Lock()
96+
defer c.cacher.watchCache.Unlock()
97+
c.cacher.Lock()
98+
defer c.cacher.Unlock()
9999

100-
if c.watchCache.resourceVersion < rv {
100+
if c.cacher.watchCache.resourceVersion < rv {
101101
t.Fatalf("Can't compact into a future version: %v", resourceVersion)
102102
}
103103

104-
if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 {
104+
if len(c.cacher.watchers.allWatchers) > 0 || len(c.cacher.watchers.valueWatchers) > 0 {
105105
// We could consider terminating those watchers, but given
106106
// watchcache doesn't really support compaction and we don't
107107
// exercise it in tests, we just throw an error here.
108108
t.Error("Open watchers are not supported during compaction")
109109
}
110110

111-
for c.watchCache.startIndex < c.watchCache.endIndex {
112-
index := c.watchCache.startIndex % c.watchCache.capacity
113-
if c.watchCache.cache[index].ResourceVersion > rv {
111+
for c.cacher.watchCache.startIndex < c.cacher.watchCache.endIndex {
112+
index := c.cacher.watchCache.startIndex % c.cacher.watchCache.capacity
113+
if c.cacher.watchCache.cache[index].ResourceVersion > rv {
114114
break
115115
}
116116

117-
c.watchCache.startIndex++
117+
c.cacher.watchCache.startIndex++
118118
}
119-
c.watchCache.listResourceVersion = rv
119+
c.cacher.watchCache.listResourceVersion = rv
120120

121121
if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil {
122122
t.Fatalf("Could not update compact_rev_key: %v", err)

0 commit comments

Comments
 (0)