Skip to content

Commit c1b5e45

Browse files
committed
CachedResource replication reconciler relies on supplied GVR
The reconciler should rely on the GVR passed from the controller instead of having to reach out to REST mapper. On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent 0a9e613 commit c1b5e45

File tree

3 files changed

+36
-40
lines changed

3 files changed

+36
-40
lines changed

pkg/reconciler/cache/cachedresources/cachedresources_reconcile_replication.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
106106
r.shardName,
107107
r.dynamicCacheClient,
108108
r.kcpCacheClient,
109-
r.dynRESTMapper,
110109
gvr,
111110
replicated,
112111
callback,

pkg/reconciler/cache/cachedresources/replication/replication_controller.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
3838
"github.com/kcp-dev/kcp/pkg/indexers"
3939
"github.com/kcp-dev/kcp/pkg/logging"
40-
"github.com/kcp-dev/kcp/pkg/reconciler/dynamicrestmapper"
4140
cachev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/cache/v1alpha1"
4241
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
4342
)
@@ -56,7 +55,6 @@ func NewController(
5655
shardName string,
5756
dynamicCacheClient kcpdynamic.ClusterInterface,
5857
kcpCacheClient kcpclientset.ClusterInterface,
59-
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper,
6058
gvr schema.GroupVersionResource,
6159
replicated *ReplicatedGVR,
6260
callback func(),
@@ -73,17 +71,15 @@ func NewController(
7371
dynamicCacheClient: dynamicCacheClient,
7472
kcpCacheClient: kcpCacheClient,
7573
replicated: replicated,
76-
gvr: gvr,
7774
callback: callback,
78-
dynRESTMapper: dynRESTMapper,
7975
cleanupFuncs: make([]func(), 0),
8076
localLabelSelector: localLabelSelector,
8177
}
8278

8379
localHandler, err := c.replicated.Local.AddEventHandler(cache.ResourceEventHandlerFuncs{
84-
AddFunc: func(obj interface{}) { c.enqueueObject(obj, c.gvr) },
85-
UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, c.gvr) },
86-
DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, c.gvr) },
80+
AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
81+
UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, gvr) },
82+
DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
8783
})
8884
if err != nil {
8985
return nil, err
@@ -211,9 +207,6 @@ type Controller struct {
211207
kcpCacheClient kcpclientset.ClusterInterface
212208

213209
replicated *ReplicatedGVR
214-
gvr schema.GroupVersionResource
215-
216-
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
217210

218211
// callback is called when we want to trigger parent object reconciliation.
219212
// Cache state is being managed by child controller, so we need to trigger parent object reconciliation

pkg/reconciler/cache/cachedresources/replication/replication_reconcile.go

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
5959
return fmt.Errorf("incorrect key: %v, expected group.version.resource::key", gvrKey)
6060
}
6161
gvrParts := strings.SplitN(keyParts[0], ".", 3)
62-
gvr := schema.GroupVersionResource{Version: gvrParts[0], Resource: gvrParts[1], Group: gvrParts[2]}
62+
gvrFromKey := schema.GroupVersionResource{Version: gvrParts[0], Resource: gvrParts[1], Group: gvrParts[2]}
6363

6464
// Key will present in the form of namespace/name in the current logical cluster.
6565
key := keyParts[1]
@@ -68,6 +68,7 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
6868
shardName: c.shardName,
6969
localLabelSelector: c.localLabelSelector,
7070
getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
71+
gvr := gvrFromKey
7172
key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name)
7273
obj, exists, err := c.replicated.Local.GetIndexer().GetByKey(key)
7374
if !exists {
@@ -92,7 +93,12 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
9293
u.SetAPIVersion(gvr.GroupVersion().String())
9394
return u, nil
9495
},
95-
getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string, gvr schema.GroupVersionResource) (*unstructured.Unstructured, error) {
96+
getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
97+
gvr := gvrFromKey
98+
if gvr.Group == "" {
99+
gvr.Group = "core"
100+
}
101+
96102
key := GVRAndShardAndLogicalClusterAndNamespaceKey(gvr, c.shardName, cluster, namespace, name)
97103
objs, err := c.replicated.Global.GetIndexer().ByIndex(ByGVRAndShardAndLogicalClusterAndNamespaceAndName, key)
98104
if err != nil {
@@ -121,26 +127,22 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
121127
return u, nil
122128
},
123129
createObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
124-
gvk := obj.GroupVersionKind()
125-
if gvk.Group == "" {
126-
gvk.Group = "core"
130+
gvr := gvrFromKey
131+
if gvr.Group == "" {
132+
gvr.Group = "core"
127133
}
128134

129135
objBytes, err := json.Marshal(obj)
130136
if err != nil {
131137
return nil, err
132138
}
133-
mapper, err := c.dynRESTMapper.ForCluster(cluster).RESTMapping(gvk.GroupKind(), gvk.Version)
134-
if err != nil {
135-
return nil, err
136-
}
137139
cacheObj := &cachev1alpha1.CachedObject{
138140
TypeMeta: metav1.TypeMeta{
139141
Kind: cache.CachedObjectKind,
140142
APIVersion: cachev1alpha1.SchemeGroupVersion.String(),
141143
},
142144
ObjectMeta: metav1.ObjectMeta{
143-
Name: gvr.Version + "." + mapper.Resource.Resource + "." + gvr.Group + "." + obj.GetName(), // TODO: handle namespace
145+
Name: gvr.Version + "." + gvr.Resource + "." + gvr.Group + "." + obj.GetName(), // TODO: handle namespace
144146
Labels: obj.GetLabels(),
145147
Annotations: obj.GetAnnotations(),
146148
CreationTimestamp: metav1.NewTime(time.Now()),
@@ -153,38 +155,34 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
153155
cacheObj.Labels = map[string]string{}
154156
}
155157
// Append schema label to the object.
156-
cacheObj.Labels[LabelKeyObjectSchema] = gvr.Version + "." + mapper.Resource.Resource + "." + gvr.Group
158+
cacheObj.Labels[LabelKeyObjectSchema] = gvr.Version + "." + gvr.Resource + "." + gvr.Group
157159
cacheObj.Labels[LabelKeyObjectGroup] = gvr.Group
158160
cacheObj.Labels[LabelKeyObjectVersion] = gvr.Version
159-
cacheObj.Labels[LabelKeyObjectResource] = mapper.Resource.Resource
161+
cacheObj.Labels[LabelKeyObjectResource] = gvr.Resource
160162
cacheObj.Labels[LabelKeyObjectOriginalName] = obj.GetName()
161163
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = obj.GetNamespace()
162164

163165
u, err := c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Create(ctx, cacheObj, metav1.CreateOptions{})
164166
return u, err
165167
},
166168
updateObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
167-
gvk := obj.GroupVersionKind()
168-
if gvk.Group == "" {
169-
gvk.Group = "core"
169+
gvr := gvrFromKey
170+
if gvr.Group == "" {
171+
gvr.Group = "core"
170172
}
171173

172174
objBytes, err := json.Marshal(obj)
173175
if err != nil {
174176
return nil, err
175177
}
176178

177-
mapper, err := c.dynRESTMapper.ForCluster(cluster).RESTMapping(gvk.GroupKind(), gvk.Version)
178-
if err != nil {
179-
return nil, err
180-
}
181179
cacheObj := &cachev1alpha1.CachedObject{
182180
TypeMeta: metav1.TypeMeta{
183181
Kind: cache.CachedObjectKind,
184182
APIVersion: cachev1alpha1.SchemeGroupVersion.String(),
185183
},
186184
ObjectMeta: metav1.ObjectMeta{
187-
Name: gvr.Version + "." + mapper.Resource.Resource + "." + gvr.Group + "." + obj.GetName(),
185+
Name: gvr.Version + "." + gvr.Resource + "." + gvr.Group + "." + obj.GetName(),
188186
Labels: obj.GetLabels(),
189187
Annotations: obj.GetAnnotations(),
190188
ResourceVersion: obj.GetResourceVersion(),
@@ -197,23 +195,29 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
197195
cacheObj.Labels = map[string]string{}
198196
}
199197
// Append schema label to the object.
200-
cacheObj.Labels[LabelKeyObjectSchema] = gvr.Version + "." + mapper.Resource.Resource + "." + gvr.Group
198+
cacheObj.Labels[LabelKeyObjectSchema] = gvr.Version + "." + gvr.Resource + "." + gvr.Group
201199
cacheObj.Labels[LabelKeyObjectGroup] = gvr.Group
202200
cacheObj.Labels[LabelKeyObjectVersion] = gvr.Version
203-
cacheObj.Labels[LabelKeyObjectResource] = mapper.Resource.Resource
201+
cacheObj.Labels[LabelKeyObjectResource] = gvr.Resource
204202
cacheObj.Labels[LabelKeyObjectOriginalName] = obj.GetName()
205203
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = obj.GetNamespace()
206204

207205
return c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Update(ctx, cacheObj, metav1.UpdateOptions{})
208206
},
209-
deleteObject: func(ctx context.Context, cluster logicalcluster.Name, ns, name string, gvr schema.GroupVersionResource) error {
207+
deleteObject: func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error {
210208
// deleting from cache - means we delete the wrapper object
209+
210+
gvr := gvrFromKey
211+
if gvr.Group == "" {
212+
gvr.Group = "core"
213+
}
214+
211215
nameCache := gvr.Version + "." + gvr.Resource + "." + gvr.Group + "." + name // TODO: handle namespace
212216
return c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Delete(ctx, nameCache, metav1.DeleteOptions{})
213217
},
214218
}
215219
defer c.callback()
216-
return r.reconcile(ctx, gvr, key)
220+
return r.reconcile(ctx, key)
217221
}
218222

219223
type replicationReconciler struct {
@@ -222,19 +226,19 @@ type replicationReconciler struct {
222226
localLabelSelector labels.Selector
223227

224228
getLocalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
225-
getGlobalCopy func(cluster logicalcluster.Name, namespace, name string, gvr schema.GroupVersionResource) (*unstructured.Unstructured, error)
229+
getGlobalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
226230

227231
createObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
228232
updateObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
229-
deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string, gvr schema.GroupVersionResource) error
233+
deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error
230234
}
231235

232236
// reconcile makes sure that the object under the given key from the local shard is replicated to the cache server.
233237
// the replication function handles the following cases:
234238
// 1. creation of the object in the cache server when the cached object is not found by getGlobalCopy
235239
// 2. deletion of the object from the cache server when the original/local object was removed OR was not found by getLocalCopy
236240
// 3. modification of the cached object to match the original one when meta.annotations, meta.labels, spec or status are different
237-
func (r *replicationReconciler) reconcile(ctx context.Context, gvr schema.GroupVersionResource, key string) error {
241+
func (r *replicationReconciler) reconcile(ctx context.Context, key string) error {
238242
if r.deleted {
239243
return nil
240244
}
@@ -259,7 +263,7 @@ func (r *replicationReconciler) reconcile(ctx context.Context, gvr schema.GroupV
259263
return nil
260264
}
261265

262-
globalCopy, err := r.getGlobalCopy(clusterName, ns, name, gvr)
266+
globalCopy, err := r.getGlobalCopy(clusterName, ns, name)
263267
if err != nil && !apierrors.IsNotFound(err) {
264268
utilruntime.HandleError(err)
265269
return nil
@@ -274,7 +278,7 @@ func (r *replicationReconciler) reconcile(ctx context.Context, gvr schema.GroupV
274278

275279
// Object doesn't exist anymore, delete it from the global cache.
276280
logger.V(2).WithValues("cluster", clusterName, "namespace", ns, "name", name).Info("Deleting object from global cache")
277-
if err := r.deleteObject(ctx, clusterName, ns, name, gvr); err != nil && !apierrors.IsNotFound(err) {
281+
if err := r.deleteObject(ctx, clusterName, ns, name); err != nil && !apierrors.IsNotFound(err) {
278282
return err
279283
}
280284

0 commit comments

Comments
 (0)