Skip to content

Commit 64132a2

Browse files
authored
Merge pull request #73 from p0lyn0mial/kcp-1.23-watch-cache
fix the watch cache
2 parents a8b2000 + bf61e8d commit 64132a2

File tree

12 files changed

+243
-39
lines changed

12 files changed

+243
-39
lines changed

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/kcp_rest_options_getter.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@ import (
44
"fmt"
55
"strings"
66

7+
"github.com/kcp-dev/logicalcluster"
8+
79
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
810
"k8s.io/apimachinery/pkg/runtime/schema"
11+
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
912
"k8s.io/apiserver/pkg/registry/generic"
13+
"k8s.io/apiserver/pkg/storage/storagebackend"
1014
)
1115

1216
type apiBindingAwareCRDRESTOptionsGetter struct {
@@ -20,6 +24,9 @@ func (t apiBindingAwareCRDRESTOptionsGetter) GetRESTOptions(resource schema.Grou
2024
return ret, err
2125
}
2226

27+
// assign some KCP metadata that are used by the reflector from the watch cache
28+
ret.StorageConfig.KcpExtraStorageMetadata = &storagebackend.KcpStorageMetadata{IsCRD: true}
29+
2330
// Priority 1: wildcard partial metadata requests. These have been assigned a fake UID that ends with
2431
// .wildcard.partial-metadata. If this is present, we don't want to modify the ResourcePrefix, which means that
2532
// a wildcard partial metadata list/watch request will return every CR from every CRD for that group-resource, which
@@ -30,9 +37,11 @@ func (t apiBindingAwareCRDRESTOptionsGetter) GetRESTOptions(resource schema.Grou
3037
// - /registry/mygroup.io/widgets/identity1234/...
3138
// - /registry/mygroup.io/widgets/identity4567/...
3239
if strings.HasSuffix(string(t.crd.UID), ".wildcard.partial-metadata") {
40+
ret.StorageConfig.KcpExtraStorageMetadata.Cluster = genericapirequest.Cluster{Name: logicalcluster.Wildcard, PartialMetadataRequest: true}
3341
return ret, nil
3442
}
3543

44+
ret.StorageConfig.KcpExtraStorageMetadata.Cluster = genericapirequest.Cluster{Name: logicalcluster.Wildcard}
3645
// Normal CRDs (not coming from an APIBinding) are stored in e.g. /registry/mygroup.io/widgets/customresources/...
3746
if _, bound := t.crd.Annotations["apis.kcp.dev/bound-crd"]; !bound {
3847
ret.ResourcePrefix += "/customresources"

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package registry
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"sync"
2223

@@ -38,7 +39,7 @@ func StorageWithCacher() generic.StorageDecorator {
3839
return func(
3940
storageConfig *storagebackend.ConfigForResource,
4041
resourcePrefix string,
41-
keyFunc func(obj runtime.Object) (string, error),
42+
keyFunc func(ctx context.Context, obj runtime.Object) (string, error),
4243
newFunc func() runtime.Object,
4344
newListFunc func() runtime.Object,
4445
getAttrsFunc storage.AttrFunc,
@@ -54,16 +55,17 @@ func StorageWithCacher() generic.StorageDecorator {
5455
}
5556

5657
cacherConfig := cacherstorage.Config{
57-
Storage: s,
58-
Versioner: etcd3.APIObjectVersioner{},
59-
ResourcePrefix: resourcePrefix,
60-
KeyFunc: keyFunc,
61-
NewFunc: newFunc,
62-
NewListFunc: newListFunc,
63-
GetAttrsFunc: getAttrsFunc,
64-
IndexerFuncs: triggerFuncs,
65-
Indexers: indexers,
66-
Codec: storageConfig.Codec,
58+
Storage: s,
59+
Versioner: etcd3.APIObjectVersioner{},
60+
ResourcePrefix: resourcePrefix,
61+
KeyFunc: keyFunc,
62+
NewFunc: newFunc,
63+
NewListFunc: newListFunc,
64+
GetAttrsFunc: getAttrsFunc,
65+
IndexerFuncs: triggerFuncs,
66+
Indexers: indexers,
67+
Codec: storageConfig.Codec,
68+
KcpExtraStorageMetadata: storageConfig.KcpExtraStorageMetadata,
6769
}
6870
cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
6971
if err != nil {

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,17 +1415,17 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
14151415

14161416
// We adapt the store's keyFunc so that we can use it with the StorageDecorator
14171417
// without making any assumptions about where objects are stored in etcd
1418-
keyFunc := func(obj runtime.Object) (string, error) {
1418+
keyFunc := func(ctx context.Context, obj runtime.Object) (string, error) {
14191419
accessor, err := meta.Accessor(obj)
14201420
if err != nil {
14211421
return "", err
14221422
}
14231423

14241424
if isNamespaced {
1425-
return e.KeyFunc(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName())
1425+
return e.KeyFunc(genericapirequest.WithNamespace(ctx, accessor.GetNamespace()), accessor.GetName())
14261426
}
14271427

1428-
return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName())
1428+
return e.KeyFunc(ctx, accessor.GetName())
14291429
}
14301430

14311431
if e.DeleteCollectionWorkers == 0 {

staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package generic
1818

1919
import (
20+
"context"
21+
2022
"k8s.io/apimachinery/pkg/runtime"
2123
"k8s.io/apiserver/pkg/storage"
2224
"k8s.io/apiserver/pkg/storage/storagebackend"
@@ -29,7 +31,7 @@ import (
2931
type StorageDecorator func(
3032
config *storagebackend.ConfigForResource,
3133
resourcePrefix string,
32-
keyFunc func(obj runtime.Object) (string, error),
34+
keyFunc func(ctx context.Context, obj runtime.Object) (string, error),
3335
newFunc func() runtime.Object,
3436
newListFunc func() runtime.Object,
3537
getAttrsFunc storage.AttrFunc,
@@ -41,7 +43,7 @@ type StorageDecorator func(
4143
func UndecoratedStorage(
4244
config *storagebackend.ConfigForResource,
4345
resourcePrefix string,
44-
keyFunc func(obj runtime.Object) (string, error),
46+
keyFunc func(ctx context.Context, obj runtime.Object) (string, error),
4547
newFunc func() runtime.Object,
4648
newListFunc func() runtime.Object,
4749
getAttrsFunc storage.AttrFunc,

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"sync"
2525
"time"
2626

27+
"github.com/kcp-dev/logicalcluster"
28+
2729
"k8s.io/apimachinery/pkg/api/errors"
2830
"k8s.io/apimachinery/pkg/api/meta"
2931
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,9 +36,12 @@ import (
3436
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3537
"k8s.io/apimachinery/pkg/util/wait"
3638
"k8s.io/apimachinery/pkg/watch"
39+
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
3740
"k8s.io/apiserver/pkg/features"
41+
kcpapi "k8s.io/apiserver/pkg/kcp"
3842
"k8s.io/apiserver/pkg/storage"
3943
"k8s.io/apiserver/pkg/storage/cacher/metrics"
44+
"k8s.io/apiserver/pkg/storage/storagebackend"
4045
utilfeature "k8s.io/apiserver/pkg/util/feature"
4146
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
4247
"k8s.io/client-go/tools/cache"
@@ -73,7 +78,7 @@ type Config struct {
7378
ResourcePrefix string
7479

7580
// KeyFunc is used to get a key in the underlying storage for a given object.
76-
KeyFunc func(runtime.Object) (string, error)
81+
KeyFunc func(context.Context, runtime.Object) (string, error)
7782

7883
// GetAttrsFunc is used to get object labels, fields
7984
GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, err error)
@@ -96,6 +101,9 @@ type Config struct {
96101
Codec runtime.Codec
97102

98103
Clock clock.Clock
104+
105+
// KcpExtraStorageMetadata holds metadata used by the watchCache's reflector to instruct the storage layer how to assign/extract the cluster name
106+
KcpExtraStorageMetadata *storagebackend.KcpStorageMetadata
99107
}
100108

101109
type watchersMap map[int]*cacheWatcher
@@ -365,9 +373,15 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
365373
<-cacher.timer.C
366374
}
367375

376+
// empty storage metadata usually indicate build-in resources
377+
// for those we require only a WildCard cluster to be present in the ctx
378+
if config.KcpExtraStorageMetadata == nil {
379+
config.KcpExtraStorageMetadata = &storagebackend.KcpStorageMetadata{Cluster: genericapirequest.Cluster{Name: logicalcluster.Wildcard}}
380+
}
381+
368382
watchCache := newWatchCache(
369383
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
370-
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
384+
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, config.KcpExtraStorageMetadata)
371385
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
372386

373387
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
@@ -1101,17 +1115,19 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
11011115

11021116
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
11031117
type cacherListerWatcher struct {
1104-
storage storage.Interface
1105-
resourcePrefix string
1106-
newListFunc func() runtime.Object
1118+
storage storage.Interface
1119+
resourcePrefix string
1120+
newListFunc func() runtime.Object
1121+
kcpExtraStorageMetadata *storagebackend.KcpStorageMetadata
11071122
}
11081123

11091124
// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
1110-
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
1125+
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object, kcpExtraStorageMetadata *storagebackend.KcpStorageMetadata) cache.ListerWatcher {
11111126
return &cacherListerWatcher{
1112-
storage: storage,
1113-
resourcePrefix: resourcePrefix,
1114-
newListFunc: newListFunc,
1127+
storage: storage,
1128+
resourcePrefix: resourcePrefix,
1129+
newListFunc: newListFunc,
1130+
kcpExtraStorageMetadata: kcpExtraStorageMetadata,
11151131
}
11161132
}
11171133

@@ -1125,7 +1141,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
11251141
Continue: options.Continue,
11261142
}
11271143

1128-
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersionMatch: options.ResourceVersionMatch, Predicate: pred}, list); err != nil {
1144+
if err := lw.storage.List(createKCPClusterAwareContext(lw.kcpExtraStorageMetadata), lw.resourcePrefix, storage.ListOptions{ResourceVersionMatch: options.ResourceVersionMatch, Predicate: pred}, list); err != nil {
11291145
return nil, err
11301146
}
11311147
return list, nil
@@ -1140,7 +1156,15 @@ func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interfac
11401156
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
11411157
opts.ProgressNotify = true
11421158
}
1143-
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts)
1159+
return lw.storage.WatchList(createKCPClusterAwareContext(lw.kcpExtraStorageMetadata), lw.resourcePrefix, opts)
1160+
}
1161+
1162+
func createKCPClusterAwareContext(meta *storagebackend.KcpStorageMetadata) context.Context {
1163+
ctx := context.Background()
1164+
if meta.IsCRD {
1165+
ctx = kcpapi.WithCustomResourceIndicator(ctx)
1166+
}
1167+
return genericapirequest.WithCluster(ctx, meta.Cluster)
11441168
}
11451169

11461170
// errWatcher implements watch.Interface to return a single error

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -265,12 +265,14 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
265265
Storage: s,
266266
Versioner: testVersioner{},
267267
ResourcePrefix: prefix,
268-
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
269-
GetAttrsFunc: storage.DefaultNamespaceScopedAttr,
270-
NewFunc: func() runtime.Object { return &example.Pod{} },
271-
NewListFunc: func() runtime.Object { return &example.PodList{} },
272-
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
273-
Clock: clock.RealClock{},
268+
KeyFunc: func(ctx context.Context, obj runtime.Object) (string, error) {
269+
return storage.NamespaceKeyFunc(prefix, obj)
270+
},
271+
GetAttrsFunc: storage.DefaultNamespaceScopedAttr,
272+
NewFunc: func() runtime.Object { return &example.Pod{} },
273+
NewListFunc: func() runtime.Object { return &example.PodList{} },
274+
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
275+
Clock: clock.RealClock{},
274276
}
275277
cacher, err := NewCacherFromConfig(config)
276278
return cacher, testVersioner{}, err

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package cacher
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"reflect"
2223
"sort"
@@ -148,7 +149,7 @@ type watchCache struct {
148149
lowerBoundCapacity int
149150

150151
// keyFunc is used to get a key in the underlying storage for a given object.
151-
keyFunc func(runtime.Object) (string, error)
152+
keyFunc func(context.Context, runtime.Object) (string, error)
152153

153154
// getAttrsFunc is used to get labels and fields of an object.
154155
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
@@ -192,7 +193,7 @@ type watchCache struct {
192193
}
193194

194195
func newWatchCache(
195-
keyFunc func(runtime.Object) (string, error),
196+
keyFunc func(context.Context, runtime.Object) (string, error),
196197
eventHandler func(*watchCacheEvent),
197198
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
198199
versioner storage.Versioner,
@@ -273,10 +274,11 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
273274
// processEvent is safe as long as there is at most one call to it in flight
274275
// at any point in time.
275276
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
276-
key, err := w.keyFunc(event.Object)
277+
key, err := w.keyFunc(createClusterAwareContext(event.Object), event.Object)
277278
if err != nil {
278279
return fmt.Errorf("couldn't compute key: %v", err)
279280
}
281+
280282
elem := &storeElement{Key: key, Object: event.Object}
281283
elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object)
282284
if err != nil {
@@ -502,7 +504,7 @@ func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
502504
if !ok {
503505
return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
504506
}
505-
key, err := w.keyFunc(object)
507+
key, err := w.keyFunc(createClusterAwareContext(object), object)
506508
if err != nil {
507509
return nil, false, fmt.Errorf("couldn't compute key: %v", err)
508510
}
@@ -528,7 +530,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
528530
if !ok {
529531
return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
530532
}
531-
key, err := w.keyFunc(object)
533+
key, err := w.keyFunc(createClusterAwareContext(object), object)
532534
if err != nil {
533535
return fmt.Errorf("couldn't compute key: %v", err)
534536
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package cacher
2+
3+
import (
4+
"context"
5+
6+
"github.com/kcp-dev/logicalcluster"
7+
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/runtime"
10+
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
11+
"k8s.io/klog/v2"
12+
)
13+
14+
// clusterNameGetter as of today it satisfies unstructured.Unstructured since it implements metav1.Object directly
15+
type clusterNameGetter interface {
16+
GetClusterName() string
17+
}
18+
19+
// createClusterAwareContext extracts the clusterName from the given object and puts it into a context
20+
// the context is used by the key function to compute the key under which the object will be stored
21+
//
22+
// background:
23+
//
24+
// resources in the db are stored without the clusterName, since the reflector used by the cache uses the logicalcluster.Wildcard
25+
// the clusterName will be assigned to object by the storage layer upon retrieval.
26+
// we need take it into consideration and change the key to contain the clusterName
27+
// because this is how clients are going to be retrieving data from the cache.
28+
func createClusterAwareContext(object runtime.Object) context.Context {
29+
var clusterName string
30+
31+
switch t := object.(type) {
32+
case metav1.ObjectMetaAccessor:
33+
clusterName = t.GetObjectMeta().GetClusterName()
34+
case clusterNameGetter:
35+
clusterName = t.GetClusterName()
36+
default:
37+
klog.Warningf("unknown object, could not get a clusterName and a namespace from: %T", object)
38+
return context.Background()
39+
}
40+
41+
return genericapirequest.WithCluster(context.Background(), genericapirequest.Cluster{Name: logicalcluster.New(clusterName)})
42+
}

0 commit comments

Comments
 (0)