Skip to content

Commit cace64a

Browse files
authored
Merge pull request kubernetes#129443 from serathius/watchcache-proxy
Watchcache proxy
2 parents 20e1944 + 4a4fc9d commit cace64a

File tree

7 files changed

+281
-219
lines changed

7 files changed

+281
-219
lines changed

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func StorageWithCacher() generic.StorageDecorator {
7979
})
8080
}
8181

82-
return cacher, destroyFunc, nil
82+
return cacherstorage.NewCacheProxy(cacher, s), destroyFunc, nil
8383
}
8484
}
8585

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2459,7 +2459,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
24592459
}
24602460
}
24612461
d := destroyFunc
2462-
s = cacher
2462+
s = cacherstorage.NewCacheProxy(cacher, s)
24632463
destroyFunc = func() {
24642464
cacher.Stop()
24652465
d()

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 5 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -339,10 +339,6 @@ type Cacher struct {
339339
expiredBookmarkWatchers []*cacheWatcher
340340
}
341341

342-
func (c *Cacher) RequestWatchProgress(ctx context.Context) error {
343-
return c.storage.RequestWatchProgress(ctx)
344-
}
345-
346342
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
347343
// its internal cache and updating its cache in the background based on the
348344
// given configuration.
@@ -495,56 +491,13 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
495491
}
496492
}
497493

498-
// Versioner implements storage.Interface.
499-
func (c *Cacher) Versioner() storage.Versioner {
500-
return c.storage.Versioner()
501-
}
502-
503-
// Create implements storage.Interface.
504-
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
505-
return c.storage.Create(ctx, key, obj, out, ttl)
506-
}
507-
508-
// Delete implements storage.Interface.
509-
func (c *Cacher) Delete(
510-
ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
511-
validateDeletion storage.ValidateObjectFunc, _ runtime.Object, opts storage.DeleteOptions) error {
512-
// Ignore the suggestion and try to pass down the current version of the object
513-
// read from cache.
514-
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
515-
klog.Errorf("GetByKey returned error: %v", err)
516-
} else if exists {
517-
// DeepCopy the object since we modify resource version when serializing the
518-
// current object.
519-
currObj := elem.(*storeElement).Object.DeepCopyObject()
520-
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj, opts)
521-
}
522-
// If we couldn't get the object, fallback to no-suggestion.
523-
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts)
524-
}
525-
526494
type namespacedName struct {
527495
namespace string
528496
name string
529497
}
530498

531-
// Watch implements storage.Interface.
532499
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
533500
pred := opts.Predicate
534-
// if the watch-list feature wasn't set and the resourceVersion is unset
535-
// ensure that the rv from which the watch is being served, is the latest
536-
// one. "latest" is ensured by serving the watch from
537-
// the underlying storage.
538-
//
539-
// it should never happen due to our validation but let's just be super-safe here
540-
// and disable sendingInitialEvents when the feature wasn't enabled
541-
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
542-
opts.SendInitialEvents = nil
543-
}
544-
// TODO: we should eventually get rid of this legacy case
545-
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
546-
return c.storage.Watch(ctx, key, opts)
547-
}
548501
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
549502
if err != nil {
550503
return nil, err
@@ -709,58 +662,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
709662
return watcher, nil
710663
}
711664

712-
// Get implements storage.Interface.
713665
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
714-
ctx, span := tracing.Start(ctx, "cacher.Get",
715-
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
716-
attribute.String("key", key),
717-
attribute.String("resource-version", opts.ResourceVersion))
718-
defer span.End(500 * time.Millisecond)
719-
if opts.ResourceVersion == "" {
720-
// If resourceVersion is not specified, serve it from underlying
721-
// storage (for backward compatibility).
722-
span.AddEvent("About to Get from underlying storage")
723-
return c.storage.Get(ctx, key, opts, objPtr)
724-
}
725-
726-
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
727-
if !c.ready.check() {
728-
// If Cache is not initialized, delegate Get requests to storage
729-
// as described in https://kep.k8s.io/4568
730-
span.AddEvent("About to Get from underlying storage - cache not initialized")
731-
return c.storage.Get(ctx, key, opts, objPtr)
732-
}
733-
}
734-
735-
// If resourceVersion is specified, serve it from cache.
736-
// It's guaranteed that the returned value is at least that
737-
// fresh as the given resourceVersion.
738666
getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
739667
if err != nil {
740668
return err
741669
}
742670

743-
// Do not create a trace - it's not for free and there are tons
744-
// of Get requests. We can add it if it will be really needed.
745-
746-
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
747-
if getRV == 0 && !c.ready.check() {
748-
// If Cacher is not yet initialized and we don't require any specific
749-
// minimal resource version, simply forward the request to storage.
750-
span.AddEvent("About to Get from underlying storage - cache not initialized and no resourceVersion set")
751-
return c.storage.Get(ctx, key, opts, objPtr)
752-
}
753-
if err := c.ready.wait(ctx); err != nil {
754-
return errors.NewServiceUnavailable(err.Error())
755-
}
756-
}
757-
758671
objVal, err := conversion.EnforcePtr(objPtr)
759672
if err != nil {
760673
return err
761674
}
762675

763-
span.AddEvent("About to fetch object from cache")
764676
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, getRV, key)
765677
if err != nil {
766678
return err
@@ -853,32 +765,9 @@ type listResp struct {
853765
}
854766

855767
// GetList implements storage.Interface
856-
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
768+
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
857769
recursive := opts.Recursive
858-
resourceVersion := opts.ResourceVersion
859770
pred := opts.Predicate
860-
if shouldDelegateList(opts) {
861-
return c.storage.GetList(ctx, key, opts, listObj)
862-
}
863-
864-
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
865-
if err != nil {
866-
return err
867-
}
868-
869-
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
870-
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
871-
// If Cacher is not initialized, delegate List requests to storage
872-
// as described in https://kep.k8s.io/4568
873-
return c.storage.GetList(ctx, key, opts, listObj)
874-
}
875-
} else {
876-
if listRV == 0 && !c.ready.check() {
877-
// If Cacher is not yet initialized and we don't require any specific
878-
// minimal resource version, simply forward the request to storage.
879-
return c.storage.GetList(ctx, key, opts, listObj)
880-
}
881-
}
882771
// For recursive lists, we need to make sure the key ended with "/" so that we only
883772
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
884773
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@@ -887,14 +776,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
887776
if opts.Recursive && !strings.HasSuffix(key, "/") {
888777
preparedKey += "/"
889778
}
890-
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
891-
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
892-
if consistentRead {
893-
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
894-
if err != nil {
895-
return err
896-
}
897-
}
898779

899780
ctx, span := tracing.Start(ctx, "cacher.GetList",
900781
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
@@ -928,24 +809,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
928809
}
929810

930811
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
931-
success := "true"
932-
fallback := "false"
933812
if err != nil {
934-
if consistentRead {
935-
if storage.IsTooLargeResourceVersion(err) {
936-
fallback = "true"
937-
err = c.storage.GetList(ctx, key, opts, listObj)
938-
}
939-
if err != nil {
940-
success = "false"
941-
}
942-
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
943-
}
944813
return err
945814
}
946-
if consistentRead {
947-
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
948-
}
949815
span.AddEvent("Listed items from cache", attribute.Int("count", len(resp.Items)))
950816
// store pointer of eligible objects,
951817
// Why not directly put object in the items of listObj?
@@ -995,37 +861,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
995861
return nil
996862
}
997863

998-
// GuaranteedUpdate implements storage.Interface.
999-
func (c *Cacher) GuaranteedUpdate(
1000-
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
1001-
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ runtime.Object) error {
1002-
// Ignore the suggestion and try to pass down the current version of the object
1003-
// read from cache.
1004-
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
1005-
klog.Errorf("GetByKey returned error: %v", err)
1006-
} else if exists {
1007-
// DeepCopy the object since we modify resource version when serializing the
1008-
// current object.
1009-
currObj := elem.(*storeElement).Object.DeepCopyObject()
1010-
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, currObj)
1011-
}
1012-
// If we couldn't get the object, fallback to no-suggestion.
1013-
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
1014-
}
1015-
1016-
// Count implements storage.Interface.
1017-
func (c *Cacher) Count(pathPrefix string) (int64, error) {
1018-
return c.storage.Count(pathPrefix)
1019-
}
1020-
1021-
// ReadinessCheck implements storage.Interface.
1022-
func (c *Cacher) ReadinessCheck() error {
1023-
if !c.ready.check() {
1024-
return storage.ErrStorageNotReady
1025-
}
1026-
return nil
1027-
}
1028-
1029864
// baseObjectThreadUnsafe omits locking for cachingObject.
1030865
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
1031866
if co, ok := object.(*cachingObject); ok {
@@ -1502,6 +1337,10 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
15021337
}
15031338
}
15041339

1340+
func (c *Cacher) Ready() bool {
1341+
return c.ready.check()
1342+
}
1343+
15051344
// errWatcher implements watch.Interface to return a single error
15061345
type errWatcher struct {
15071346
result chan watch.Event

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -455,12 +455,12 @@ func withNodeNameAndNamespaceIndex(options *setupOptions) {
455455
}
456456
}
457457

458-
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
458+
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheProxy, tearDownFunc) {
459459
ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
460460
return ctx, cacher, tearDown
461461
}
462462

463-
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *Cacher, *etcd3testing.EtcdTestServer, tearDownFunc) {
463+
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheProxy, *etcd3testing.EtcdTestServer, tearDownFunc) {
464464
setupOpts := setupOptions{}
465465
opts = append([]setupOption{withDefaults}, opts...)
466466
for _, opt := range opts {
@@ -514,31 +514,31 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
514514
}
515515
}
516516

517-
return ctx, cacher, server, terminate
517+
return ctx, NewCacheProxy(cacher, wrappedStorage), server, terminate
518518
}
519519

520520
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
521521
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
522522

523523
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
524-
if err := cacher.ready.wait(context.TODO()); err != nil {
524+
if err := cacher.cacher.ready.wait(context.TODO()); err != nil {
525525
t.Fatalf("unexpected error waiting for the cache to be ready")
526526
}
527527
}
528-
return &createWrapper{Cacher: cacher}, tearDown
528+
return &createWrapper{CacheProxy: cacher}, tearDown
529529
}
530530

531531
type createWrapper struct {
532-
*Cacher
532+
*CacheProxy
533533
}
534534

535535
func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
536-
if err := c.Cacher.Create(ctx, key, obj, out, ttl); err != nil {
536+
if err := c.CacheProxy.Create(ctx, key, obj, out, ttl); err != nil {
537537
return err
538538
}
539539
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
540-
currentObj := c.Cacher.newFunc()
541-
err := c.Cacher.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
540+
currentObj := c.CacheProxy.cacher.newFunc()
541+
err := c.CacheProxy.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
542542
if err != nil {
543543
if storage.IsNotFound(err) {
544544
return false, nil

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,45 +78,45 @@ func computePodKey(obj *example.Pod) string {
7878
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
7979
}
8080

81-
func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compaction {
81+
func compactStorage(c *CacheProxy, client *clientv3.Client) storagetesting.Compaction {
8282
return func(ctx context.Context, t *testing.T, resourceVersion string) {
8383
versioner := storage.APIObjectVersioner{}
8484
rv, err := versioner.ParseResourceVersion(resourceVersion)
8585
if err != nil {
8686
t.Fatal(err)
8787
}
8888

89-
err = c.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
89+
err = c.cacher.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
9090
if err != nil {
9191
t.Fatalf("WatchCache didn't caught up to RV: %v", rv)
9292
}
93-
c.watchCache.RUnlock()
93+
c.cacher.watchCache.RUnlock()
9494

95-
c.watchCache.Lock()
96-
defer c.watchCache.Unlock()
97-
c.Lock()
98-
defer c.Unlock()
95+
c.cacher.watchCache.Lock()
96+
defer c.cacher.watchCache.Unlock()
97+
c.cacher.Lock()
98+
defer c.cacher.Unlock()
9999

100-
if c.watchCache.resourceVersion < rv {
100+
if c.cacher.watchCache.resourceVersion < rv {
101101
t.Fatalf("Can't compact into a future version: %v", resourceVersion)
102102
}
103103

104-
if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 {
104+
if len(c.cacher.watchers.allWatchers) > 0 || len(c.cacher.watchers.valueWatchers) > 0 {
105105
// We could consider terminating those watchers, but given
106106
// watchcache doesn't really support compaction and we don't
107107
// exercise it in tests, we just throw an error here.
108108
t.Error("Open watchers are not supported during compaction")
109109
}
110110

111-
for c.watchCache.startIndex < c.watchCache.endIndex {
112-
index := c.watchCache.startIndex % c.watchCache.capacity
113-
if c.watchCache.cache[index].ResourceVersion > rv {
111+
for c.cacher.watchCache.startIndex < c.cacher.watchCache.endIndex {
112+
index := c.cacher.watchCache.startIndex % c.cacher.watchCache.capacity
113+
if c.cacher.watchCache.cache[index].ResourceVersion > rv {
114114
break
115115
}
116116

117-
c.watchCache.startIndex++
117+
c.cacher.watchCache.startIndex++
118118
}
119-
c.watchCache.listResourceVersion = rv
119+
c.cacher.watchCache.listResourceVersion = rv
120120

121121
if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil {
122122
t.Fatalf("Could not update compact_rev_key: %v", err)

0 commit comments

Comments
 (0)