@@ -32,6 +32,7 @@ import (
3232
3333 kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
3434 kcpdynamic "github.com/kcp-dev/client-go/dynamic"
35+ "github.com/kcp-dev/logicalcluster/v3"
3536 cachev1alpha1 "github.com/kcp-dev/sdk/apis/cache/v1alpha1"
3637 kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster"
3738
@@ -53,8 +54,9 @@ const (
5354// NewController returns a new replication controller.
5455func NewController (
5556 shardName string ,
56- dynamicCacheClient kcpdynamic.ClusterInterface ,
57+ dynamicClusterClient kcpdynamic.ClusterInterface ,
5758 kcpCacheClient kcpclientset.ClusterInterface ,
59+ cluster logicalcluster.Name ,
5860 gvr schema.GroupVersionResource ,
5961 replicated * ReplicatedGVR ,
6062 callback func (),
@@ -68,12 +70,12 @@ func NewController(
6870 Name : ControllerName ,
6971 },
7072 ),
71- dynamicCacheClient : dynamicCacheClient ,
72- kcpCacheClient : kcpCacheClient ,
73- replicated : replicated ,
74- callback : callback ,
75- cleanupFuncs : make ([]func (), 0 ),
76- localLabelSelector : localLabelSelector ,
73+ dynamicClusterClient : dynamicClusterClient ,
74+ kcpCacheClient : kcpCacheClient ,
75+ replicated : replicated ,
76+ callback : callback ,
77+ cleanupFuncs : make ([]func (), 0 ),
78+ localLabelSelector : localLabelSelector ,
7779 }
7880
7981 localHandler , err := c .replicated .Local .AddEventHandler (cache.ResourceEventHandlerFuncs {
@@ -88,10 +90,33 @@ func NewController(
8890 _ = c .replicated .Local .RemoveEventHandler (localHandler )
8991 })
9092
91- globalHandler , err := c .replicated .Global .AddEventHandler (cache.ResourceEventHandlerFuncs {
92- AddFunc : func (obj interface {}) { c .enqueueCacheObject (obj ) },
93- UpdateFunc : func (_ , obj interface {}) { c .enqueueCacheObject (obj ) },
94- DeleteFunc : func (obj interface {}) { c .enqueueCacheObject (obj ) },
93+ globalHandler , err := c .replicated .Global .AddEventHandler (cache.FilteringResourceEventHandler {
94+ FilterFunc : func (obj interface {}) bool {
95+ cachedObj := obj .(* cachev1alpha1.CachedObject )
96+ labels := cachedObj .Labels
97+ if labels == nil {
98+ return false
99+ }
100+
101+ // Skip CachedObjects that are not coming from the source CachedResource.
102+
103+ if logicalcluster .From (cachedObj ) != cluster {
104+ return false
105+ }
106+
107+ if gvr .Group != labels [LabelKeyObjectGroup ] ||
108+ gvr .Version != labels [LabelKeyObjectVersion ] ||
109+ gvr .Resource != labels [LabelKeyObjectResource ] {
110+ return false
111+ }
112+
113+ return true
114+ },
115+ Handler : cache.ResourceEventHandlerFuncs {
116+ AddFunc : func (obj interface {}) { c .enqueueCacheObject (obj ) },
117+ UpdateFunc : func (_ , obj interface {}) { c .enqueueCacheObject (obj ) },
118+ DeleteFunc : func (obj interface {}) { c .enqueueCacheObject (obj ) },
119+ },
95120 })
96121 if err != nil {
97122 return nil , err
@@ -114,12 +139,6 @@ func (c *Controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResou
114139}
115140
116141func (c * Controller ) enqueueCacheObject (obj interface {}) {
117- key , err := kcpcache .DeletionHandlingMetaClusterNamespaceKeyFunc (obj )
118- if err != nil {
119- utilruntime .HandleError (err )
120- return
121- }
122-
123142 // This way we extract what is the original GVR of the object that we are replicating.
124143 cr , ok := obj .(* cachev1alpha1.CachedObject )
125144 if ! ok {
@@ -133,7 +152,7 @@ func (c *Controller) enqueueCacheObject(obj interface{}) {
133152 Version : labels [LabelKeyObjectVersion ],
134153 Resource : labels [LabelKeyObjectResource ],
135154 }
136-
155+ key := kcpcache . ToClusterAwareKey ( string ( logicalcluster . From ( cr )), labels [ LabelKeyObjectOriginalNamespace ], labels [ LabelKeyObjectOriginalName ])
137156 gvrKey := fmt .Sprintf ("%s.%s.%s::%s" , gvr .Version , gvr .Resource , gvr .Group , key )
138157 c .queue .Add (gvrKey )
139158}
@@ -203,8 +222,8 @@ type Controller struct {
203222 shardName string
204223 queue workqueue.TypedRateLimitingInterface [string ]
205224
206- dynamicCacheClient kcpdynamic.ClusterInterface
207- kcpCacheClient kcpclientset.ClusterInterface
225+ dynamicClusterClient kcpdynamic.ClusterInterface
226+ kcpCacheClient kcpclientset.ClusterInterface
208227
209228 replicated * ReplicatedGVR
210229
0 commit comments