Skip to content

Commit e614097

Browse files
committed
CachedResources: replicate full objects with
DDSIF, the informer for retrieving local objects, yields only PartialMetadataObjects. This commit adds a live GET call to retrieve the full object for replication. On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent c28c5ff commit e614097

File tree

6 files changed

+62
-44
lines changed

6 files changed

+62
-44
lines changed

pkg/reconciler/cache/cachedresources/cachedresources_controller.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ func NewController(
6868
kcpClusterClient kcpclientset.ClusterInterface,
6969
kcpCacheClient kcpclientset.ClusterInterface,
7070
dynamicClient kcpdynamic.ClusterInterface,
71-
cacheDynamicClient kcpdynamic.ClusterInterface,
7271

7372
kubeClusterClient kcpkubernetesclientset.ClusterInterface,
7473
namespaceInformer kcpcorev1informers.NamespaceClusterInformer,
@@ -93,8 +92,7 @@ func NewController(
9392
kcpClient: kcpClusterClient,
9493
kcpCacheClient: kcpCacheClient,
9594

96-
dynamicClient: dynamicClient,
97-
cacheDynamicClient: cacheDynamicClient,
95+
dynamicClient: dynamicClient,
9896

9997
dynRESTMapper: dynRESTMapper,
10098

@@ -165,8 +163,7 @@ type Controller struct {
165163
kcpClient kcpclientset.ClusterInterface
166164
kcpCacheClient kcpclientset.ClusterInterface
167165

168-
dynamicClient kcpdynamic.ClusterInterface
169-
cacheDynamicClient kcpdynamic.ClusterInterface
166+
dynamicClient kcpdynamic.ClusterInterface
170167

171168
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
172169

pkg/reconciler/cache/cachedresources/cachedresources_reconcile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (c *Controller) reconcile(ctx context.Context, cluster logicalcluster.Name,
8383
},
8484
&replication{
8585
shardName: c.shardName,
86-
dynamicCacheClient: c.dynamicClient,
86+
dynamicClusterClient: c.dynamicClient,
8787
kcpCacheClient: c.kcpCacheClient,
8888
dynRESTMapper: c.dynRESTMapper,
8989
cacheKcpInformers: c.cacheKcpInformers,

pkg/reconciler/cache/cachedresources/cachedresources_reconcile_replication.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
// Or deletes the replication controller if the published resource is being deleted.
4242
type replication struct {
4343
shardName string
44-
dynamicCacheClient kcpdynamic.ClusterInterface
44+
dynamicClusterClient kcpdynamic.ClusterInterface
4545
kcpCacheClient kcpclientset.ClusterInterface
4646
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
4747
cacheKcpInformers kcpinformers.SharedInformerFactory
@@ -103,7 +103,7 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
103103

104104
c, err := replicationcontroller.NewController(
105105
r.shardName,
106-
r.dynamicCacheClient,
106+
r.dynamicClusterClient,
107107
r.kcpCacheClient,
108108
gvr,
109109
replicated,

pkg/reconciler/cache/cachedresources/replication/replication_controller.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ const (
5353
// NewController returns a new replication controller.
5454
func NewController(
5555
shardName string,
56-
dynamicCacheClient kcpdynamic.ClusterInterface,
56+
dynamicClusterClient kcpdynamic.ClusterInterface,
5757
kcpCacheClient kcpclientset.ClusterInterface,
5858
gvr schema.GroupVersionResource,
5959
replicated *ReplicatedGVR,
@@ -68,12 +68,12 @@ func NewController(
6868
Name: ControllerName,
6969
},
7070
),
71-
dynamicCacheClient: dynamicCacheClient,
72-
kcpCacheClient: kcpCacheClient,
73-
replicated: replicated,
74-
callback: callback,
75-
cleanupFuncs: make([]func(), 0),
76-
localLabelSelector: localLabelSelector,
71+
dynamicClusterClient: dynamicClusterClient,
72+
kcpCacheClient: kcpCacheClient,
73+
replicated: replicated,
74+
callback: callback,
75+
cleanupFuncs: make([]func(), 0),
76+
localLabelSelector: localLabelSelector,
7777
}
7878

7979
localHandler, err := c.replicated.Local.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -203,8 +203,8 @@ type Controller struct {
203203
shardName string
204204
queue workqueue.TypedRateLimitingInterface[string]
205205

206-
dynamicCacheClient kcpdynamic.ClusterInterface
207-
kcpCacheClient kcpclientset.ClusterInterface
206+
dynamicClusterClient kcpdynamic.ClusterInterface
207+
kcpCacheClient kcpclientset.ClusterInterface
208208

209209
replicated *ReplicatedGVR
210210

pkg/reconciler/cache/cachedresources/replication/replication_reconcile.go

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -85,32 +85,6 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
8585
r := &replicationReconciler{
8686
shardName: c.shardName,
8787
localLabelSelector: c.localLabelSelector,
88-
getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
89-
gvr := gvrFromKey
90-
key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name)
91-
obj, exists, err := c.replicated.Local.GetIndexer().GetByKey(key)
92-
if !exists {
93-
return nil, apierrors.NewNotFound(gvr.GroupResource(), name)
94-
} else if err != nil {
95-
return nil, err // necessary to avoid non-zero nil interface
96-
}
97-
98-
u, err := toUnstructured(obj)
99-
if err != nil {
100-
return nil, err
101-
}
102-
103-
if c.replicated.Filter != nil && !c.replicated.Filter(u) {
104-
return nil, apierrors.NewNotFound(gvr.GroupResource(), name)
105-
}
106-
107-
if _, ok := obj.(*unstructured.Unstructured); ok {
108-
u = u.DeepCopy()
109-
}
110-
u.SetKind(c.replicated.Kind)
111-
u.SetAPIVersion(gvr.GroupVersion().String())
112-
return u, nil
113-
},
11488
getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
11589
gvr := gvrFromKey
11690
if gvr.Group == "" {
@@ -237,6 +211,54 @@ func (c *Controller) reconcile(ctx context.Context, gvrKey string) error {
237211
return c.kcpCacheClient.Cluster(cluster.Path()).CacheV1alpha1().CachedObjects().Delete(ctx, cachedObjName, metav1.DeleteOptions{})
238212
},
239213
}
214+
// r.getLocalCopy is defined separately because it calls r.getGlobalCopy internally.
215+
r.getLocalCopy = func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
216+
gvr := gvrFromKey
217+
key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name)
218+
partialMetadataObj, exists, err := c.replicated.Local.GetIndexer().GetByKey(key)
219+
if !exists {
220+
return nil, apierrors.NewNotFound(gvr.GroupResource(), name)
221+
} else if err != nil {
222+
return nil, err // necessary to avoid non-zero nil interface
223+
}
224+
225+
// The local informer only yields a PartialMetadataObject, but we need a full object for replication.
226+
// We'll need to do a live GET to retrieve it, but before doing so we check RV against the cached copy
227+
// to be sure we actually need do the GET call.
228+
229+
var globalCopyRV string
230+
globalCopy, err := r.getGlobalCopy(cluster, namespace, name)
231+
if err != nil {
232+
if !apierrors.IsNotFound(err) {
233+
return nil, err
234+
}
235+
} else {
236+
ann := globalCopy.GetAnnotations()
237+
if ann != nil {
238+
globalCopyRV = ann[AnnotationKeyOriginalResourceVersion]
239+
}
240+
}
241+
242+
uPartialMetadataObj, err := toUnstructured(partialMetadataObj)
243+
if err != nil {
244+
return nil, err
245+
}
246+
247+
if c.replicated.Filter != nil && !c.replicated.Filter(uPartialMetadataObj) {
248+
return nil, apierrors.NewNotFound(gvr.GroupResource(), name)
249+
}
250+
251+
if uPartialMetadataObj.GetResourceVersion() == globalCopyRV {
252+
return globalCopy, nil
253+
}
254+
255+
// The RV doesn't match either because it's different, or the globalCopy doesn't exist. Fallback to GET.
256+
return c.dynamicClusterClient.
257+
Cluster(cluster.Path()).
258+
Resource(gvr).
259+
Namespace(namespace).
260+
Get(ctx, name, metav1.GetOptions{})
261+
}
240262
defer c.callback()
241263
return r.reconcile(ctx, key)
242264
}

pkg/server/controllers.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1742,7 +1742,6 @@ func (s *Server) installCacheController(ctx context.Context, config *rest.Config
17421742
kcpClusterClient,
17431743
s.KcpCacheClusterClient,
17441744
dynamicClient,
1745-
s.CacheDynamicClient,
17461745
s.KubeClusterClient,
17471746
s.KubeSharedInformerFactory.Core().V1().Namespaces(),
17481747
s.KubeSharedInformerFactory.Core().V1().Secrets(),

0 commit comments

Comments
 (0)