Skip to content

Commit c9202c8

Browse files
committed
CachedResources: replicate full objects
DDSIF, the informer for retrieving local objects, yields only PartialMetadataObjects. This commit adds a live GET call to retrieve the full object for replication. On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent 07de7da commit c9202c8

File tree

6 files changed

+98
-70
lines changed

6 files changed

+98
-70
lines changed

pkg/reconciler/cache/cachedresources/cachedresources_controller.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ func NewController(
6868
kcpClusterClient kcpclientset.ClusterInterface,
6969
kcpCacheClient kcpclientset.ClusterInterface,
7070
dynamicClient kcpdynamic.ClusterInterface,
71-
cacheDynamicClient kcpdynamic.ClusterInterface,
7271

7372
kubeClusterClient kcpkubernetesclientset.ClusterInterface,
7473
namespaceInformer kcpcorev1informers.NamespaceClusterInformer,
@@ -93,8 +92,7 @@ func NewController(
9392
kcpClient: kcpClusterClient,
9493
kcpCacheClient: kcpCacheClient,
9594

96-
dynamicClient: dynamicClient,
97-
cacheDynamicClient: cacheDynamicClient,
95+
dynamicClient: dynamicClient,
9896

9997
dynRESTMapper: dynRESTMapper,
10098

@@ -165,8 +163,7 @@ type Controller struct {
165163
kcpClient kcpclientset.ClusterInterface
166164
kcpCacheClient kcpclientset.ClusterInterface
167165

168-
dynamicClient kcpdynamic.ClusterInterface
169-
cacheDynamicClient kcpdynamic.ClusterInterface
166+
dynamicClient kcpdynamic.ClusterInterface
170167

171168
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
172169

pkg/reconciler/cache/cachedresources/cachedresources_reconcile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (c *Controller) reconcile(ctx context.Context, cluster logicalcluster.Name,
8383
},
8484
&replication{
8585
shardName: c.shardName,
86-
dynamicCacheClient: c.dynamicClient,
86+
dynamicClusterClient: c.dynamicClient,
8787
kcpCacheClient: c.kcpCacheClient,
8888
dynRESTMapper: c.dynRESTMapper,
8989
cacheKcpInformers: c.cacheKcpInformers,

pkg/reconciler/cache/cachedresources/cachedresources_reconcile_replication.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
// Or deletes the replication controller if the published resource is being deleted.
4242
type replication struct {
4343
shardName string
44-
dynamicCacheClient kcpdynamic.ClusterInterface
44+
dynamicClusterClient kcpdynamic.ClusterInterface
4545
kcpCacheClient kcpclientset.ClusterInterface
4646
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
4747
cacheKcpInformers kcpinformers.SharedInformerFactory
@@ -103,7 +103,7 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
103103

104104
c, err := replicationcontroller.NewController(
105105
r.shardName,
106-
r.dynamicCacheClient,
106+
r.dynamicClusterClient,
107107
r.kcpCacheClient,
108108
cluster,
109109
gvr,

pkg/reconciler/cache/cachedresources/replication/replication_controller.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ const (
5454
// NewController returns a new replication controller.
5555
func NewController(
5656
shardName string,
57-
dynamicCacheClient kcpdynamic.ClusterInterface,
57+
dynamicClusterClient kcpdynamic.ClusterInterface,
5858
kcpCacheClient kcpclientset.ClusterInterface,
5959
cluster logicalcluster.Name,
6060
gvr schema.GroupVersionResource,
@@ -70,12 +70,12 @@ func NewController(
7070
Name: ControllerName,
7171
},
7272
),
73-
dynamicCacheClient: dynamicCacheClient,
74-
kcpCacheClient: kcpCacheClient,
75-
replicated: replicated,
76-
callback: callback,
77-
cleanupFuncs: make([]func(), 0),
78-
localLabelSelector: localLabelSelector,
73+
dynamicClusterClient: dynamicClusterClient,
74+
kcpCacheClient: kcpCacheClient,
75+
replicated: replicated,
76+
callback: callback,
77+
cleanupFuncs: make([]func(), 0),
78+
localLabelSelector: localLabelSelector,
7979
}
8080

8181
localHandler, err := c.replicated.Local.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -222,8 +222,8 @@ type Controller struct {
222222
shardName string
223223
queue workqueue.TypedRateLimitingInterface[string]
224224

225-
dynamicCacheClient kcpdynamic.ClusterInterface
226-
kcpCacheClient kcpclientset.ClusterInterface
225+
dynamicClusterClient kcpdynamic.ClusterInterface
226+
kcpCacheClient kcpclientset.ClusterInterface
227227

228228
replicated *ReplicatedGVR
229229

pkg/reconciler/cache/cachedresources/replication/replication_reconcile.go

Lines changed: 84 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
8585
r := &replicationReconciler{
8686
shardName: c.shardName,
8787
localLabelSelector: c.localLabelSelector,
88-
getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
88+
getLocalPartialObjectMetadata: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
8989
gvr := gvrFromKey
9090
key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name)
9191
obj, exists, err := c.replicated.Local.GetIndexer().GetByKey(key)
@@ -111,7 +111,7 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
111111
u.SetAPIVersion(gvr.GroupVersion().String())
112112
return u, nil
113113
},
114-
getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
114+
getCachedObject: func(ctx context.Context, cluster logicalcluster.Name, namespace, name string) (*cachev1alpha1.CachedObject, error) {
115115
gvr := gvrFromKey
116116
if gvr.Group == "" {
117117
gvr.Group = "core"
@@ -129,28 +129,53 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
129129
return nil, fmt.Errorf("found multiple objects for %v|%v/%v", cluster, namespace, name)
130130
}
131131

132-
obj := objs[0]
132+
return objs[0].(*cachev1alpha1.CachedObject), nil
133+
},
134+
getGlobalCopyFromCachedObject: func(cachedObj *cachev1alpha1.CachedObject) (*unstructured.Unstructured, error) {
135+
gvr := gvrFromKey
133136

134-
u, err := toUnstructured(obj)
137+
u, err := toUnstructured(&cachedObj.Spec.Raw)
135138
if err != nil {
136139
return nil, err
137140
}
138-
if _, ok := obj.(*unstructured.Unstructured); ok {
139-
u = u.DeepCopy()
140-
}
141-
141+
u = u.DeepCopy()
142142
u.SetKind(c.replicated.Kind)
143143
u.SetAPIVersion(gvr.GroupVersion().String())
144-
145144
return u, nil
146145
},
147-
createObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
146+
getLocalCopy: func(ctx context.Context, cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
147+
gvr := gvrFromKey
148+
149+
obj, err := c.dynamicClusterClient.Cluster(cluster.Path()).
150+
Resource(gvrFromKey).
151+
Namespace(namespace).
152+
Get(ctx, name, metav1.GetOptions{})
153+
if err != nil {
154+
return nil, err
155+
}
156+
157+
obj.SetKind(c.replicated.Kind)
158+
obj.SetAPIVersion(gvr.GroupVersion().String())
159+
160+
// Append system annotations to the object.
161+
annotations := obj.GetAnnotations()
162+
if annotations == nil {
163+
annotations = map[string]string{}
164+
}
165+
annotations[genericrequest.ShardAnnotationKey] = c.shardName
166+
annotations[AnnotationKeyOriginalResourceUID] = string(obj.GetUID())
167+
annotations[AnnotationKeyOriginalResourceVersion] = obj.GetResourceVersion()
168+
obj.SetAnnotations(annotations)
169+
170+
return obj, nil
171+
},
172+
createObject: func(ctx context.Context, cluster logicalcluster.Name, local *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
148173
gvr := gvrFromKey
149174
if gvr.Group == "" {
150175
gvr.Group = "core"
151176
}
152177

153-
objBytes, err := json.Marshal(obj)
178+
objBytes, err := json.Marshal(local)
154179
if err != nil {
155180
return nil, err
156181
}
@@ -160,9 +185,9 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
160185
APIVersion: cachev1alpha1.SchemeGroupVersion.String(),
161186
},
162187
ObjectMeta: metav1.ObjectMeta{
163-
Name: GenCachedObjectName(gvr, obj.GetNamespace(), obj.GetName()),
164-
Labels: obj.GetLabels(),
165-
Annotations: obj.GetAnnotations(),
188+
Name: GenCachedObjectName(gvr, local.GetNamespace(), local.GetName()),
189+
Labels: local.GetLabels(),
190+
Annotations: local.GetAnnotations(),
166191
CreationTimestamp: metav1.NewTime(time.Now()),
167192
},
168193
Spec: cachev1alpha1.CachedObjectSpec{
@@ -177,19 +202,19 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
177202
cacheObj.Labels[LabelKeyObjectGroup] = gvr.Group
178203
cacheObj.Labels[LabelKeyObjectVersion] = gvr.Version
179204
cacheObj.Labels[LabelKeyObjectResource] = gvr.Resource
180-
cacheObj.Labels[LabelKeyObjectOriginalName] = obj.GetName()
181-
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = obj.GetNamespace()
205+
cacheObj.Labels[LabelKeyObjectOriginalName] = local.GetName()
206+
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = local.GetNamespace()
182207

183208
u, err := c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Create(ctx, cacheObj, metav1.CreateOptions{})
184209
return u, err
185210
},
186-
updateObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
211+
updateCachedObjectWithLocalUnstructured: func(ctx context.Context, cluster logicalcluster.Name, origCachedObj *cachev1alpha1.CachedObject, local *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
187212
gvr := gvrFromKey
188213
if gvr.Group == "" {
189214
gvr.Group = "core"
190215
}
191216

192-
objBytes, err := json.Marshal(obj)
217+
objBytes, err := json.Marshal(local)
193218
if err != nil {
194219
return nil, err
195220
}
@@ -200,10 +225,10 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
200225
APIVersion: cachev1alpha1.SchemeGroupVersion.String(),
201226
},
202227
ObjectMeta: metav1.ObjectMeta{
203-
Name: GenCachedObjectName(gvr, obj.GetNamespace(), obj.GetName()),
204-
Labels: obj.GetLabels(),
205-
Annotations: obj.GetAnnotations(),
206-
ResourceVersion: obj.GetResourceVersion(),
228+
Name: GenCachedObjectName(gvr, local.GetNamespace(), local.GetName()),
229+
Labels: origCachedObj.GetLabels(),
230+
Annotations: origCachedObj.GetAnnotations(),
231+
ResourceVersion: origCachedObj.GetResourceVersion(),
207232
},
208233
Spec: cachev1alpha1.CachedObjectSpec{
209234
Raw: runtime.RawExtension{Raw: objBytes},
@@ -217,8 +242,8 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
217242
cacheObj.Labels[LabelKeyObjectGroup] = gvr.Group
218243
cacheObj.Labels[LabelKeyObjectVersion] = gvr.Version
219244
cacheObj.Labels[LabelKeyObjectResource] = gvr.Resource
220-
cacheObj.Labels[LabelKeyObjectOriginalName] = obj.GetName()
221-
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = obj.GetNamespace()
245+
cacheObj.Labels[LabelKeyObjectOriginalName] = local.GetName()
246+
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = local.GetNamespace()
222247

223248
return c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Update(ctx, cacheObj, metav1.UpdateOptions{})
224249
},
@@ -246,12 +271,14 @@ type replicationReconciler struct {
246271
deleted bool
247272
localLabelSelector labels.Selector
248273

249-
getLocalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
250-
getGlobalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
274+
getLocalPartialObjectMetadata func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
275+
getCachedObject func(ctx context.Context, cluster logicalcluster.Name, namespace, name string) (*cachev1alpha1.CachedObject, error)
276+
getGlobalCopyFromCachedObject func(cachedObj *cachev1alpha1.CachedObject) (*unstructured.Unstructured, error)
277+
getLocalCopy func(ctx context.Context, cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
251278

252-
createObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
253-
updateObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
254-
deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error
279+
createObject func(ctx context.Context, cluster logicalcluster.Name, local *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
280+
updateCachedObjectWithLocalUnstructured func(ctx context.Context, cluster logicalcluster.Name, cachedObj *cachev1alpha1.CachedObject, local *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
281+
deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error
255282
}
256283

257284
// reconcile makes sure that the object under the given key from the local shard is replicated to the cache server.
@@ -271,29 +298,28 @@ func (r *replicationReconciler) reconcile(ctx context.Context, key string) error
271298
return nil
272299
}
273300

274-
localCopy, err := r.getLocalCopy(clusterName, ns, name)
301+
localPartialObjMeta, err := r.getLocalPartialObjectMetadata(clusterName, ns, name)
275302
if err != nil && !apierrors.IsNotFound(err) {
276303
utilruntime.HandleError(err)
277304
return nil
278305
}
279306
localExists := !apierrors.IsNotFound(err)
280307

281308
// we only replicate objects that match the label selector.
282-
if localExists && r.localLabelSelector != nil && !r.localLabelSelector.Matches(labels.Set(localCopy.GetLabels())) {
309+
if localExists && r.localLabelSelector != nil && !r.localLabelSelector.Matches(labels.Set(localPartialObjMeta.GetLabels())) {
283310
logger.V(2).WithValues("cluster", clusterName, "namespace", ns, "name", name).Info("Object does not match label selector, skipping")
284311
return nil
285312
}
286313

287-
globalCopy, err := r.getGlobalCopy(clusterName, ns, name)
314+
cachedObj, err := r.getCachedObject(ctx, clusterName, ns, name)
288315
if err != nil && !apierrors.IsNotFound(err) {
289-
utilruntime.HandleError(err)
290-
return nil
316+
return err
291317
}
292-
globalExists := !apierrors.IsNotFound(err)
318+
cachedObjExists := !apierrors.IsNotFound(err)
293319

294320
// local is gone or being deleted. Delete in cache.
295-
if !localExists || !localCopy.GetDeletionTimestamp().IsZero() {
296-
if !globalExists {
321+
if !localExists || !localPartialObjMeta.GetDeletionTimestamp().IsZero() {
322+
if !cachedObjExists {
297323
return nil
298324
}
299325

@@ -306,24 +332,30 @@ func (r *replicationReconciler) reconcile(ctx context.Context, key string) error
306332
return nil
307333
}
308334

309-
// local exists, global doesn't. Create in cache.
310-
if !globalExists {
311-
originalRV := localCopy.GetResourceVersion()
312-
originalUID := localCopy.GetUID()
313-
314-
localCopy.SetResourceVersion("")
315-
annotations := localCopy.GetAnnotations()
316-
if annotations == nil {
317-
annotations = map[string]string{}
335+
var globalCopy *unstructured.Unstructured
336+
if cachedObjExists {
337+
globalCopy, err = r.getGlobalCopyFromCachedObject(cachedObj)
338+
if err != nil {
339+
return err
340+
}
341+
// Exit early if there were no changes on the resource.
342+
if localPartialObjMeta.GetResourceVersion() != "" && globalCopy.GetResourceVersion() == localPartialObjMeta.GetResourceVersion() {
343+
logger.V(4).Info("Object is up to date")
344+
return nil
318345
}
319-
annotations[genericrequest.ShardAnnotationKey] = r.shardName
320-
annotations[AnnotationKeyOriginalResourceVersion] = originalRV
321-
annotations[AnnotationKeyOriginalResourceUID] = string(originalUID)
346+
}
322347

323-
localCopy.SetAnnotations(annotations)
348+
// The local DDSIF informer yields only PartialObjectMetadata, and we need the full object for replication.
349+
localCopy, err := r.getLocalCopy(ctx, clusterName, ns, name)
350+
if err != nil {
351+
// Return any error we get. If it's NotFound, we want to requeue in that case too: the local DDSIF
352+
// informer probably hasn't caught up yet, and we may need to clean up the replicated CachedObject.
353+
return err
354+
}
324355

356+
if !cachedObjExists {
325357
logger.V(2).Info("Creating object in global cache")
326-
_, err := r.createObject(ctx, clusterName, localCopy)
358+
_, err = r.createObject(ctx, clusterName, localCopy)
327359
return err
328360
}
329361

@@ -342,6 +374,6 @@ func (r *replicationReconciler) reconcile(ctx context.Context, key string) error
342374
}
343375

344376
logger.V(2).WithValues("kind", globalCopy.GetKind(), "namespace", globalCopy.GetNamespace(), "name", globalCopy.GetName()).Info("Updating object in global cache")
345-
_, err = r.updateObject(ctx, clusterName, globalCopy) // no need for patch because there is only this actor
377+
_, err = r.updateCachedObjectWithLocalUnstructured(ctx, clusterName, cachedObj, localCopy) // no need for patch because there is only this actor
346378
return err
347379
}

pkg/server/controllers.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1742,7 +1742,6 @@ func (s *Server) installCacheController(ctx context.Context, config *rest.Config
17421742
kcpClusterClient,
17431743
s.KcpCacheClusterClient,
17441744
dynamicClient,
1745-
s.CacheDynamicClient,
17461745
s.KubeClusterClient,
17471746
s.KubeSharedInformerFactory.Core().V1().Namespaces(),
17481747
s.KubeSharedInformerFactory.Core().V1().Secrets(),

0 commit comments

Comments
 (0)