Skip to content

Commit 07de7da

Browse files
committed
CachedResources: skip CachedObjects that are not coming from their source CachedResource
Each instance of the replication controller is dedicated to a specific CachedResource object, and so its CachedObjects handler should only react on objects that are coming from that CachedResource. On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent fa4e118 commit 07de7da

File tree

2 files changed

+31
-11
lines changed

2 files changed

+31
-11
lines changed

pkg/reconciler/cache/cachedresources/cachedresources_reconcile_replication.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
105105
r.shardName,
106106
r.dynamicCacheClient,
107107
r.kcpCacheClient,
108+
cluster,
108109
gvr,
109110
replicated,
110111
callback,

pkg/reconciler/cache/cachedresources/replication/replication_controller.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
3434
kcpdynamic "github.com/kcp-dev/client-go/dynamic"
35+
"github.com/kcp-dev/logicalcluster/v3"
3536

3637
cacheclient "github.com/kcp-dev/kcp/pkg/cache/client"
3738
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
@@ -55,6 +56,7 @@ func NewController(
5556
shardName string,
5657
dynamicCacheClient kcpdynamic.ClusterInterface,
5758
kcpCacheClient kcpclientset.ClusterInterface,
59+
cluster logicalcluster.Name,
5860
gvr schema.GroupVersionResource,
5961
replicated *ReplicatedGVR,
6062
callback func(),
@@ -88,10 +90,33 @@ func NewController(
8890
_ = c.replicated.Local.RemoveEventHandler(localHandler)
8991
})
9092

91-
globalHandler, err := c.replicated.Global.AddEventHandler(cache.ResourceEventHandlerFuncs{
92-
AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
93-
UpdateFunc: func(_, obj interface{}) { c.enqueueCacheObject(obj) },
94-
DeleteFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
93+
globalHandler, err := c.replicated.Global.AddEventHandler(cache.FilteringResourceEventHandler{
94+
FilterFunc: func(obj interface{}) bool {
95+
cachedObj := obj.(*cachev1alpha1.CachedObject)
96+
labels := cachedObj.Labels
97+
if labels == nil {
98+
return false
99+
}
100+
101+
// Skip CachedObjects that are not coming from the source CachedResource.
102+
103+
if logicalcluster.From(cachedObj) != cluster {
104+
return false
105+
}
106+
107+
if gvr.Group != labels[LabelKeyObjectGroup] ||
108+
gvr.Version != labels[LabelKeyObjectVersion] ||
109+
gvr.Resource != labels[LabelKeyObjectResource] {
110+
return false
111+
}
112+
113+
return true
114+
},
115+
Handler: cache.ResourceEventHandlerFuncs{
116+
AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
117+
UpdateFunc: func(_, obj interface{}) { c.enqueueCacheObject(obj) },
118+
DeleteFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
119+
},
95120
})
96121
if err != nil {
97122
return nil, err
@@ -114,12 +139,6 @@ func (c *Controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResou
114139
}
115140

116141
func (c *Controller) enqueueCacheObject(obj interface{}) {
117-
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
118-
if err != nil {
119-
utilruntime.HandleError(err)
120-
return
121-
}
122-
123142
// This way we extract what is the original GVR of the object that we are replicating.
124143
cr, ok := obj.(*cachev1alpha1.CachedObject)
125144
if !ok {
@@ -133,7 +152,7 @@ func (c *Controller) enqueueCacheObject(obj interface{}) {
133152
Version: labels[LabelKeyObjectVersion],
134153
Resource: labels[LabelKeyObjectResource],
135154
}
136-
155+
key := kcpcache.ToClusterAwareKey(string(logicalcluster.From(cr)), labels[LabelKeyObjectOriginalNamespace], labels[LabelKeyObjectOriginalName])
137156
gvrKey := fmt.Sprintf("%s.%s.%s::%s", gvr.Version, gvr.Resource, gvr.Group, key)
138157
c.queue.Add(gvrKey)
139158
}

0 commit comments

Comments
 (0)