Skip to content

Commit 5d726f2

Browse files
committed
CachedResources: replicate full objects
DDSIF, the informer for retrieving local objects, yields only PartialMetadataObjects. This commit adds a live GET call to retrieve the full object for replication. On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent 68ba203 commit 5d726f2

File tree

6 files changed

+98
-70
lines changed

6 files changed

+98
-70
lines changed

pkg/reconciler/cache/cachedresources/cachedresources_controller.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ func NewController(
6868
kcpClusterClient kcpclientset.ClusterInterface,
6969
kcpCacheClient kcpclientset.ClusterInterface,
7070
dynamicClient kcpdynamic.ClusterInterface,
71-
cacheDynamicClient kcpdynamic.ClusterInterface,
7271

7372
kubeClusterClient kcpkubernetesclientset.ClusterInterface,
7473
namespaceInformer kcpcorev1informers.NamespaceClusterInformer,
@@ -93,8 +92,7 @@ func NewController(
9392
kcpClient: kcpClusterClient,
9493
kcpCacheClient: kcpCacheClient,
9594

96-
dynamicClient: dynamicClient,
97-
cacheDynamicClient: cacheDynamicClient,
95+
dynamicClient: dynamicClient,
9896

9997
dynRESTMapper: dynRESTMapper,
10098

@@ -165,8 +163,7 @@ type Controller struct {
165163
kcpClient kcpclientset.ClusterInterface
166164
kcpCacheClient kcpclientset.ClusterInterface
167165

168-
dynamicClient kcpdynamic.ClusterInterface
169-
cacheDynamicClient kcpdynamic.ClusterInterface
166+
dynamicClient kcpdynamic.ClusterInterface
170167

171168
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
172169

pkg/reconciler/cache/cachedresources/cachedresources_reconcile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (c *Controller) reconcile(ctx context.Context, cluster logicalcluster.Name,
8383
},
8484
&replication{
8585
shardName: c.shardName,
86-
dynamicCacheClient: c.dynamicClient,
86+
dynamicClusterClient: c.dynamicClient,
8787
kcpCacheClient: c.kcpCacheClient,
8888
dynRESTMapper: c.dynRESTMapper,
8989
cacheKcpInformers: c.cacheKcpInformers,

pkg/reconciler/cache/cachedresources/cachedresources_reconcile_replication.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
// Or deletes the replication controller if the published resource is being deleted.
4242
type replication struct {
4343
shardName string
44-
dynamicCacheClient kcpdynamic.ClusterInterface
44+
dynamicClusterClient kcpdynamic.ClusterInterface
4545
kcpCacheClient kcpclientset.ClusterInterface
4646
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
4747
cacheKcpInformers kcpinformers.SharedInformerFactory
@@ -103,7 +103,7 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
103103

104104
c, err := replicationcontroller.NewController(
105105
r.shardName,
106-
r.dynamicCacheClient,
106+
r.dynamicClusterClient,
107107
r.kcpCacheClient,
108108
cluster,
109109
gvr,

pkg/reconciler/cache/cachedresources/replication/replication_controller.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ const (
5454
// NewController returns a new replication controller.
5555
func NewController(
5656
shardName string,
57-
dynamicCacheClient kcpdynamic.ClusterInterface,
57+
dynamicClusterClient kcpdynamic.ClusterInterface,
5858
kcpCacheClient kcpclientset.ClusterInterface,
5959
cluster logicalcluster.Name,
6060
gvr schema.GroupVersionResource,
@@ -70,12 +70,12 @@ func NewController(
7070
Name: ControllerName,
7171
},
7272
),
73-
dynamicCacheClient: dynamicCacheClient,
74-
kcpCacheClient: kcpCacheClient,
75-
replicated: replicated,
76-
callback: callback,
77-
cleanupFuncs: make([]func(), 0),
78-
localLabelSelector: localLabelSelector,
73+
dynamicClusterClient: dynamicClusterClient,
74+
kcpCacheClient: kcpCacheClient,
75+
replicated: replicated,
76+
callback: callback,
77+
cleanupFuncs: make([]func(), 0),
78+
localLabelSelector: localLabelSelector,
7979
}
8080

8181
localHandler, err := c.replicated.Local.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -222,8 +222,8 @@ type Controller struct {
222222
shardName string
223223
queue workqueue.TypedRateLimitingInterface[string]
224224

225-
dynamicCacheClient kcpdynamic.ClusterInterface
226-
kcpCacheClient kcpclientset.ClusterInterface
225+
dynamicClusterClient kcpdynamic.ClusterInterface
226+
kcpCacheClient kcpclientset.ClusterInterface
227227

228228
replicated *ReplicatedGVR
229229

pkg/reconciler/cache/cachedresources/replication/replication_reconcile.go

Lines changed: 84 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
8484
r := &replicationReconciler{
8585
shardName: c.shardName,
8686
localLabelSelector: c.localLabelSelector,
87-
getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
87+
getLocalPartialObjectMetadata: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
8888
gvr := gvrFromKey
8989
key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name)
9090
obj, exists, err := c.replicated.Local.GetIndexer().GetByKey(key)
@@ -110,7 +110,7 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
110110
u.SetAPIVersion(gvr.GroupVersion().String())
111111
return u, nil
112112
},
113-
getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
113+
getCachedObject: func(ctx context.Context, cluster logicalcluster.Name, namespace, name string) (*cachev1alpha1.CachedObject, error) {
114114
gvr := gvrFromKey
115115
if gvr.Group == "" {
116116
gvr.Group = "core"
@@ -128,28 +128,53 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
128128
return nil, fmt.Errorf("found multiple objects for %v|%v/%v", cluster, namespace, name)
129129
}
130130

131-
obj := objs[0]
131+
return objs[0].(*cachev1alpha1.CachedObject), nil
132+
},
133+
getGlobalCopyFromCachedObject: func(cachedObj *cachev1alpha1.CachedObject) (*unstructured.Unstructured, error) {
134+
gvr := gvrFromKey
132135

133-
u, err := toUnstructured(obj)
136+
u, err := toUnstructured(&cachedObj.Spec.Raw)
134137
if err != nil {
135138
return nil, err
136139
}
137-
if _, ok := obj.(*unstructured.Unstructured); ok {
138-
u = u.DeepCopy()
139-
}
140-
140+
u = u.DeepCopy()
141141
u.SetKind(c.replicated.Kind)
142142
u.SetAPIVersion(gvr.GroupVersion().String())
143-
144143
return u, nil
145144
},
146-
createObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
145+
getLocalCopy: func(ctx context.Context, cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
146+
gvr := gvrFromKey
147+
148+
obj, err := c.dynamicClusterClient.Cluster(cluster.Path()).
149+
Resource(gvrFromKey).
150+
Namespace(namespace).
151+
Get(ctx, name, metav1.GetOptions{})
152+
if err != nil {
153+
return nil, err
154+
}
155+
156+
obj.SetKind(c.replicated.Kind)
157+
obj.SetAPIVersion(gvr.GroupVersion().String())
158+
159+
// Append system annotations to the object.
160+
annotations := obj.GetAnnotations()
161+
if annotations == nil {
162+
annotations = map[string]string{}
163+
}
164+
annotations[genericrequest.ShardAnnotationKey] = c.shardName
165+
annotations[AnnotationKeyOriginalResourceUID] = string(obj.GetUID())
166+
annotations[AnnotationKeyOriginalResourceVersion] = obj.GetResourceVersion()
167+
obj.SetAnnotations(annotations)
168+
169+
return obj, nil
170+
},
171+
createObject: func(ctx context.Context, cluster logicalcluster.Name, local *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
147172
gvr := gvrFromKey
148173
if gvr.Group == "" {
149174
gvr.Group = "core"
150175
}
151176

152-
objBytes, err := json.Marshal(obj)
177+
objBytes, err := json.Marshal(local)
153178
if err != nil {
154179
return nil, err
155180
}
@@ -159,9 +184,9 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
159184
APIVersion: cachev1alpha1.SchemeGroupVersion.String(),
160185
},
161186
ObjectMeta: metav1.ObjectMeta{
162-
Name: GenCachedObjectName(gvr, obj.GetNamespace(), obj.GetName()),
163-
Labels: obj.GetLabels(),
164-
Annotations: obj.GetAnnotations(),
187+
Name: GenCachedObjectName(gvr, local.GetNamespace(), local.GetName()),
188+
Labels: local.GetLabels(),
189+
Annotations: local.GetAnnotations(),
165190
CreationTimestamp: metav1.NewTime(time.Now()),
166191
},
167192
Spec: cachev1alpha1.CachedObjectSpec{
@@ -176,19 +201,19 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
176201
cacheObj.Labels[LabelKeyObjectGroup] = gvr.Group
177202
cacheObj.Labels[LabelKeyObjectVersion] = gvr.Version
178203
cacheObj.Labels[LabelKeyObjectResource] = gvr.Resource
179-
cacheObj.Labels[LabelKeyObjectOriginalName] = obj.GetName()
180-
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = obj.GetNamespace()
204+
cacheObj.Labels[LabelKeyObjectOriginalName] = local.GetName()
205+
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = local.GetNamespace()
181206

182207
u, err := c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Create(ctx, cacheObj, metav1.CreateOptions{})
183208
return u, err
184209
},
185-
updateObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
210+
updateCachedObjectWithLocalUnstructured: func(ctx context.Context, cluster logicalcluster.Name, origCachedObj *cachev1alpha1.CachedObject, local *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error) {
186211
gvr := gvrFromKey
187212
if gvr.Group == "" {
188213
gvr.Group = "core"
189214
}
190215

191-
objBytes, err := json.Marshal(obj)
216+
objBytes, err := json.Marshal(local)
192217
if err != nil {
193218
return nil, err
194219
}
@@ -199,10 +224,10 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
199224
APIVersion: cachev1alpha1.SchemeGroupVersion.String(),
200225
},
201226
ObjectMeta: metav1.ObjectMeta{
202-
Name: GenCachedObjectName(gvr, obj.GetNamespace(), obj.GetName()),
203-
Labels: obj.GetLabels(),
204-
Annotations: obj.GetAnnotations(),
205-
ResourceVersion: obj.GetResourceVersion(),
227+
Name: GenCachedObjectName(gvr, local.GetNamespace(), local.GetName()),
228+
Labels: origCachedObj.GetLabels(),
229+
Annotations: origCachedObj.GetAnnotations(),
230+
ResourceVersion: origCachedObj.GetResourceVersion(),
206231
},
207232
Spec: cachev1alpha1.CachedObjectSpec{
208233
Raw: runtime.RawExtension{Raw: objBytes},
@@ -216,8 +241,8 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
216241
cacheObj.Labels[LabelKeyObjectGroup] = gvr.Group
217242
cacheObj.Labels[LabelKeyObjectVersion] = gvr.Version
218243
cacheObj.Labels[LabelKeyObjectResource] = gvr.Resource
219-
cacheObj.Labels[LabelKeyObjectOriginalName] = obj.GetName()
220-
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = obj.GetNamespace()
244+
cacheObj.Labels[LabelKeyObjectOriginalName] = local.GetName()
245+
cacheObj.Labels[LabelKeyObjectOriginalNamespace] = local.GetNamespace()
221246

222247
return c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Update(ctx, cacheObj, metav1.UpdateOptions{})
223248
},
@@ -245,12 +270,14 @@ type replicationReconciler struct {
245270
deleted bool
246271
localLabelSelector labels.Selector
247272

248-
getLocalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
249-
getGlobalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
273+
getLocalPartialObjectMetadata func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
274+
getCachedObject func(ctx context.Context, cluster logicalcluster.Name, namespace, name string) (*cachev1alpha1.CachedObject, error)
275+
getGlobalCopyFromCachedObject func(cachedObj *cachev1alpha1.CachedObject) (*unstructured.Unstructured, error)
276+
getLocalCopy func(ctx context.Context, cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error)
250277

251-
createObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
252-
updateObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
253-
deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error
278+
createObject func(ctx context.Context, cluster logicalcluster.Name, local *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
279+
updateCachedObjectWithLocalUnstructured func(ctx context.Context, cluster logicalcluster.Name, cachedObj *cachev1alpha1.CachedObject, local *unstructured.Unstructured) (*cachev1alpha1.CachedObject, error)
280+
deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error
254281
}
255282

256283
// reconcile makes sure that the object under the given key from the local shard is replicated to the cache server.
@@ -270,29 +297,28 @@ func (r *replicationReconciler) reconcile(ctx context.Context, key string) error
270297
return nil
271298
}
272299

273-
localCopy, err := r.getLocalCopy(clusterName, ns, name)
300+
localPartialObjMeta, err := r.getLocalPartialObjectMetadata(clusterName, ns, name)
274301
if err != nil && !apierrors.IsNotFound(err) {
275302
utilruntime.HandleError(err)
276303
return nil
277304
}
278305
localExists := !apierrors.IsNotFound(err)
279306

280307
// we only replicate objects that match the label selector.
281-
if localExists && r.localLabelSelector != nil && !r.localLabelSelector.Matches(labels.Set(localCopy.GetLabels())) {
308+
if localExists && r.localLabelSelector != nil && !r.localLabelSelector.Matches(labels.Set(localPartialObjMeta.GetLabels())) {
282309
logger.V(2).WithValues("cluster", clusterName, "namespace", ns, "name", name).Info("Object does not match label selector, skipping")
283310
return nil
284311
}
285312

286-
globalCopy, err := r.getGlobalCopy(clusterName, ns, name)
313+
cachedObj, err := r.getCachedObject(ctx, clusterName, ns, name)
287314
if err != nil && !apierrors.IsNotFound(err) {
288-
utilruntime.HandleError(err)
289-
return nil
315+
return err
290316
}
291-
globalExists := !apierrors.IsNotFound(err)
317+
cachedObjExists := !apierrors.IsNotFound(err)
292318

293319
// local is gone or being deleted. Delete in cache.
294-
if !localExists || !localCopy.GetDeletionTimestamp().IsZero() {
295-
if !globalExists {
320+
if !localExists || !localPartialObjMeta.GetDeletionTimestamp().IsZero() {
321+
if !cachedObjExists {
296322
return nil
297323
}
298324

@@ -305,24 +331,30 @@ func (r *replicationReconciler) reconcile(ctx context.Context, key string) error
305331
return nil
306332
}
307333

308-
// local exists, global doesn't. Create in cache.
309-
if !globalExists {
310-
originalRV := localCopy.GetResourceVersion()
311-
originalUID := localCopy.GetUID()
312-
313-
localCopy.SetResourceVersion("")
314-
annotations := localCopy.GetAnnotations()
315-
if annotations == nil {
316-
annotations = map[string]string{}
334+
var globalCopy *unstructured.Unstructured
335+
if cachedObjExists {
336+
globalCopy, err = r.getGlobalCopyFromCachedObject(cachedObj)
337+
if err != nil {
338+
return err
339+
}
340+
// Exit early if there were no changes on the resource.
341+
if localPartialObjMeta.GetResourceVersion() != "" && globalCopy.GetResourceVersion() == localPartialObjMeta.GetResourceVersion() {
342+
logger.V(4).Info("Object is up to date")
343+
return nil
317344
}
318-
annotations[genericrequest.ShardAnnotationKey] = r.shardName
319-
annotations[AnnotationKeyOriginalResourceVersion] = originalRV
320-
annotations[AnnotationKeyOriginalResourceUID] = string(originalUID)
345+
}
321346

322-
localCopy.SetAnnotations(annotations)
347+
// The local DDSIF informer yields only PartialObjectMetadata, and we need the full object for replication.
348+
localCopy, err := r.getLocalCopy(ctx, clusterName, ns, name)
349+
if err != nil {
350+
// Return any error we get. If it's NotFound, we want to requeue in that case too: the local DDSIF
351+
// informer probably hasn't caught up yet, and we may need to clean up the replicated CachedObject.
352+
return err
353+
}
323354

355+
if !cachedObjExists {
324356
logger.V(2).Info("Creating object in global cache")
325-
_, err := r.createObject(ctx, clusterName, localCopy)
357+
_, err = r.createObject(ctx, clusterName, localCopy)
326358
return err
327359
}
328360

@@ -341,6 +373,6 @@ func (r *replicationReconciler) reconcile(ctx context.Context, key string) error
341373
}
342374

343375
logger.V(2).WithValues("kind", globalCopy.GetKind(), "namespace", globalCopy.GetNamespace(), "name", globalCopy.GetName()).Info("Updating object in global cache")
344-
_, err = r.updateObject(ctx, clusterName, globalCopy) // no need for patch because there is only this actor
376+
_, err = r.updateCachedObjectWithLocalUnstructured(ctx, clusterName, cachedObj, localCopy) // no need for patch because there is only this actor
345377
return err
346378
}

pkg/server/controllers.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1742,7 +1742,6 @@ func (s *Server) installCacheController(ctx context.Context, config *rest.Config
17421742
kcpClusterClient,
17431743
s.KcpCacheClusterClient,
17441744
dynamicClient,
1745-
s.CacheDynamicClient,
17461745
s.KubeClusterClient,
17471746
s.KubeSharedInformerFactory.Core().V1().Namespaces(),
17481747
s.KubeSharedInformerFactory.Core().V1().Secrets(),

0 commit comments

Comments
 (0)