Skip to content

Commit 8c661e0

Browse files
authored
Merge pull request #3465 from gman0/cachedresource-repl-fixes
CachedResource replication reconciler fixes
2 parents e5b7984 + 95c7b72 commit 8c661e0

File tree

3 files changed

+58
-53
lines changed

3 files changed

+58
-53
lines changed

pkg/reconciler/cache/cachedresources/cachedresources_reconcile_replication.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
106106
r.shardName,
107107
r.dynamicCacheClient,
108108
r.kcpCacheClient,
109-
r.dynRESTMapper,
110109
gvr,
111110
replicated,
112111
callback,

pkg/reconciler/cache/cachedresources/replication/replication_controller.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
3838
"github.com/kcp-dev/kcp/pkg/indexers"
3939
"github.com/kcp-dev/kcp/pkg/logging"
40-
"github.com/kcp-dev/kcp/pkg/reconciler/dynamicrestmapper"
4140
cachev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/cache/v1alpha1"
4241
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
4342
)
@@ -56,7 +55,6 @@ func NewController(
5655
shardName string,
5756
dynamicCacheClient kcpdynamic.ClusterInterface,
5857
kcpCacheClient kcpclientset.ClusterInterface,
59-
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper,
6058
gvr schema.GroupVersionResource,
6159
replicated *ReplicatedGVR,
6260
callback func(),
@@ -73,17 +71,15 @@ func NewController(
7371
dynamicCacheClient: dynamicCacheClient,
7472
kcpCacheClient: kcpCacheClient,
7573
replicated: replicated,
76-
gvr: gvr,
7774
callback: callback,
78-
dynRESTMapper: dynRESTMapper,
7975
cleanupFuncs: make([]func(), 0),
8076
localLabelSelector: localLabelSelector,
8177
}
8278

8379
localHandler, err := c.replicated.Local.AddEventHandler(cache.ResourceEventHandlerFuncs{
84-
AddFunc: func(obj interface{}) { c.enqueueObject(obj, c.gvr) },
85-
UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, c.gvr) },
86-
DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, c.gvr) },
80+
AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
81+
UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, gvr) },
82+
DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
8783
})
8884
if err != nil {
8985
return nil, err
@@ -211,9 +207,6 @@ type Controller struct {
211207
kcpCacheClient kcpclientset.ClusterInterface
212208

213209
replicated *ReplicatedGVR
214-
gvr schema.GroupVersionResource
215-
216-
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
217210

218211
// callback is called when we want to trigger parent object reconciliation.
219212
// Cache state is being managed by child controller, so we need to trigger parent object reconciliation

pkg/reconciler/cache/cachedresources/replication/replication_reconcile.go

Lines changed: 55 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@ limitations under the License.
1717
package replication
1818

1919
import (
20+
"bytes"
2021
"context"
22+
"crypto/sha256"
2123
"encoding/json"
2224
"fmt"
2325
"strings"
2426
"time"
2527

28+
"github.com/martinlindhe/base36"
29+
2630
apierrors "k8s.io/apimachinery/pkg/api/errors"
2731
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2832
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -49,6 +53,18 @@ const (
4953
LabelKeyObjectOriginalNamespace = "cache.kcp.io/object-original-namespace"
5054
)
5155

56+
func GenCachedObjectName(gvr schema.GroupVersionResource, namespace, name string) string {
57+
buf := bytes.Buffer{}
58+
buf.WriteString(gvr.String())
59+
buf.WriteString(namespace)
60+
buf.WriteString(name)
61+
62+
hash := sha256.Sum256([]byte(name))
63+
base36hash := strings.ToLower(base36.EncodeBytes(hash[:]))
64+
65+
return base36hash
66+
}
67+
5268
func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
5369
if c.deleted {
5470
return nil
@@ -59,7 +75,7 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
5975
return fmt.Errorf("incorrect key: %v, expected group.version.resource::key", gvrKey)
6076
}
6177
gvrParts := strings.SplitN(keyParts[0], ".", 3)
62-
gvr := schema.GroupVersionResource{Version: gvrParts[0], Resource: gvrParts[1], Group: gvrParts[2]}
78+
gvrFromKey := schema.GroupVersionResource{Version: gvrParts[0], Resource: gvrParts[1], Group: gvrParts[2]}
6379

6480
// Key will present in the form of namespace/name in the current logical cluster.
6581
key := keyParts[1]
@@ -68,6 +84,7 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
6884
shardName: c.shardName,
6985
localLabelSelector: c.localLabelSelector,
7086
getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
87+
gvr := gvrFromKey
7188
key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name)
7289
obj, exists, err := c.replicated.Local.GetIndexer().GetByKey(key)
7390
if !exists {
@@ -92,7 +109,12 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
92109
u.SetAPIVersion(gvr.GroupVersion().String())
93110
return u, nil
94111
},
95-
getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string, gvr schema.GroupVersionResource) (*unstructured.Unstructured, error) {
112+
getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
113+
gvr := gvrFromKey
114+
if gvr.Group == "" {
115+
gvr.Group = "core"
116+
}
117+
96118
key := GVRAndShardAndLogicalClusterAndNamespaceKey(gvr, c.shardName, cluster, namespace, name)
97119
objs, err := c.replicated.Global.GetIndexer().ByIndex(ByGVRAndShardAndLogicalClusterAndNamespaceAndName, key)
98120
if err != nil {
@@ -121,26 +143,22 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
121143
return u, nil
122144
},
123145
createObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
124-
gvk := obj.GroupVersionKind()
125-
if gvk.Group == "" {
126-
gvk.Group = "core"
146+
gvr := gvrFromKey
147+
if gvr.Group == "" {
148+
gvr.Group = "core"
127149
}
128150

129151
objBytes, err := json.Marshal(obj)
130152
if err != nil {
131153
return nil, err
132154
}
133-
mapper, err := c.dynRESTMapper.ForCluster(cluster).RESTMapping(gvk.GroupKind(), gvk.Version)
134-
if err != nil {
135-
return nil, err
136-
}
137155
cacheObj := &cachev1alpha1.CachedObject{
138156
TypeMeta: metav1.TypeMeta{
139157
Kind: cache.CachedObjectKind,
140158
APIVersion: cachev1alpha1.SchemeGroupVersion.String(),
141159
},
142160
ObjectMeta: metav1.ObjectMeta{
143-
Name: gvr.Version + "." + mapper.Resource.Resource + "." + gvr.Group + "." + obj.GetName(), // TODO: handle namespace
161+
Name: GenCachedObjectName(gvr, obj.GetNamespace(), obj.GetName()),
144162
Labels: obj.GetLabels(),
145163
Annotations: obj.GetAnnotations(),
146164
CreationTimestamp: metav1.NewTime(time.Now()),
@@ -153,38 +171,34 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
153171
cacheObj.Labels = map[string]string{}
154172
}
155173
// Append schema label to the object.
156-
cacheObj.Labels[LabelKeyObjectSchema] = gvr.Version + "." + mapper.Resource.Resource + "." + gvr.Group
174+
cacheObj.Labels[LabelKeyObjectSchema] = gvr.Version + "." + gvr.Resource + "." + gvr.Group
157175
cacheObj.Labels[LabelKeyObjectGroup] = gvr.Group
158176
cacheObj.Labels[LabelKeyObjectVersion] = gvr.Version
159-
cacheObj.Labels[LabelKeyObjectResource] = mapper.Resource.Resource
177+
cacheObj.Labels[LabelKeyObjectResource] = gvr.Resource
160178
cacheObj.Labels[LabelKeyObjectOriginalName] = obj.GetName()
161179
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = obj.GetNamespace()
162180

163181
u, err := c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Create(ctx, cacheObj, metav1.CreateOptions{})
164182
return u, err
165183
},
166184
updateObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
167-
gvk := obj.GroupVersionKind()
168-
if gvk.Group == "" {
169-
gvk.Group = "core"
185+
gvr := gvrFromKey
186+
if gvr.Group == "" {
187+
gvr.Group = "core"
170188
}
171189

172190
objBytes, err := json.Marshal(obj)
173191
if err != nil {
174192
return nil, err
175193
}
176194

177-
mapper, err := c.dynRESTMapper.ForCluster(cluster).RESTMapping(gvk.GroupKind(), gvk.Version)
178-
if err != nil {
179-
return nil, err
180-
}
181195
cacheObj := &cachev1alpha1.CachedObject{
182196
TypeMeta: metav1.TypeMeta{
183197
Kind: cache.CachedObjectKind,
184198
APIVersion: cachev1alpha1.SchemeGroupVersion.String(),
185199
},
186200
ObjectMeta: metav1.ObjectMeta{
187-
Name: gvr.Version + "." + mapper.Resource.Resource + "." + gvr.Group + "." + obj.GetName(),
201+
Name: GenCachedObjectName(gvr, obj.GetNamespace(), obj.GetName()),
188202
Labels: obj.GetLabels(),
189203
Annotations: obj.GetAnnotations(),
190204
ResourceVersion: obj.GetResourceVersion(),
@@ -197,23 +211,32 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
197211
cacheObj.Labels = map[string]string{}
198212
}
199213
// Append schema label to the object.
200-
cacheObj.Labels[LabelKeyObjectSchema] = gvr.Version + "." + mapper.Resource.Resource + "." + gvr.Group
214+
cacheObj.Labels[LabelKeyObjectSchema] = gvr.Version + "." + gvr.Resource + "." + gvr.Group
201215
cacheObj.Labels[LabelKeyObjectGroup] = gvr.Group
202216
cacheObj.Labels[LabelKeyObjectVersion] = gvr.Version
203-
cacheObj.Labels[LabelKeyObjectResource] = mapper.Resource.Resource
217+
cacheObj.Labels[LabelKeyObjectResource] = gvr.Resource
204218
cacheObj.Labels[LabelKeyObjectOriginalName] = obj.GetName()
205219
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = obj.GetNamespace()
206220

207221
return c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Update(ctx, cacheObj, metav1.UpdateOptions{})
208222
},
209-
deleteObject: func(ctx context.Context, cluster logicalcluster.Name, ns, name string, gvr schema.GroupVersionResource) error {
223+
deleteObject: func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error {
210224
// deleting from cache - means we delete the wrapper object
211-
nameCache := gvr.Version + "." + gvr.Resource + "." + gvr.Group + "." + name // TODO: handle namespace
212-
return c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Delete(ctx, nameCache, metav1.DeleteOptions{})
225+
226+
gvr := gvrFromKey
227+
if gvr.Group == "" {
228+
gvr.Group = "core"
229+
}
230+
231+
cachedObjName := GenCachedObjectName(gvr, ns, name)
232+
if ns != "" {
233+
cachedObjName += "." + ns
234+
}
235+
return c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Delete(ctx, cachedObjName, metav1.DeleteOptions{})
213236
},
214237
}
215238
defer c.callback()
216-
return r.reconcile(ctx, gvr, key)
239+
return r.reconcile(ctx, key)
217240
}
218241

219242
type replicationReconciler struct {
@@ -222,19 +245,19 @@ type replicationReconciler struct {
222245
localLabelSelector labels.Selector
223246

224247
getLocalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
225-
getGlobalCopy func(cluster logicalcluster.Name, namespace, name string, gvr schema.GroupVersionResource) (*unstructured.Unstructured, error)
248+
getGlobalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
226249

227250
createObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
228251
updateObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
229-
deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string, gvr schema.GroupVersionResource) error
252+
deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error
230253
}
231254

232255
// reconcile makes sure that the object under the given key from the local shard is replicated to the cache server.
233256
// the replication function handles the following cases:
234257
// 1. creation of the object in the cache server when the cached object is not found by getGlobalCopy
235258
// 2. deletion of the object from the cache server when the original/local object was removed OR was not found by getLocalCopy
236259
// 3. modification of the cached object to match the original one when meta.annotations, meta.labels, spec or status are different
237-
func (r *replicationReconciler) reconcile(ctx context.Context, gvr schema.GroupVersionResource, key string) error {
260+
func (r *replicationReconciler) reconcile(ctx context.Context, key string) error {
238261
if r.deleted {
239262
return nil
240263
}
@@ -259,7 +282,7 @@ func (r *replicationReconciler) reconcile(ctx context.Context, gvr schema.GroupV
259282
return nil
260283
}
261284

262-
globalCopy, err := r.getGlobalCopy(clusterName, ns, name, gvr)
285+
globalCopy, err := r.getGlobalCopy(clusterName, ns, name)
263286
if err != nil && !apierrors.IsNotFound(err) {
264287
utilruntime.HandleError(err)
265288
return nil
@@ -274,7 +297,7 @@ func (r *replicationReconciler) reconcile(ctx context.Context, gvr schema.GroupV
274297

275298
// Object doesn't exist anymore, delete it from the global cache.
276299
logger.V(2).WithValues("cluster", clusterName, "namespace", ns, "name", name).Info("Deleting object from global cache")
277-
if err := r.deleteObject(ctx, clusterName, ns, name, gvr); err != nil && !apierrors.IsNotFound(err) {
300+
if err := r.deleteObject(ctx, clusterName, ns, name); err != nil && !apierrors.IsNotFound(err) {
278301
return err
279302
}
280303

@@ -295,17 +318,7 @@ func (r *replicationReconciler) reconcile(ctx context.Context, gvr schema.GroupV
295318

296319
logger.V(2).Info("Creating object in global cache")
297320
_, err := r.createObject(ctx, clusterName, localCopy)
298-
if err != nil {
299-
if apierrors.IsAlreadyExists(err) {
300-
// If the object already exists, try to update it instead
301-
logger.V(2).Info("Object already exists in global cache, updating instead")
302-
_, err = r.updateObject(ctx, clusterName, localCopy)
303-
}
304-
if err != nil {
305-
return err
306-
}
307-
}
308-
return nil
321+
return err
309322
}
310323

311324
// update global copy and compare

0 commit comments

Comments
 (0)