Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func NewController(
kcpClusterClient kcpclientset.ClusterInterface,
kcpCacheClient kcpclientset.ClusterInterface,
dynamicClient kcpdynamic.ClusterInterface,
cacheDynamicClient kcpdynamic.ClusterInterface,

kubeClusterClient kcpkubernetesclientset.ClusterInterface,
namespaceInformer kcpcorev1informers.NamespaceClusterInformer,
Expand All @@ -93,8 +92,7 @@ func NewController(
kcpClient: kcpClusterClient,
kcpCacheClient: kcpCacheClient,

dynamicClient: dynamicClient,
cacheDynamicClient: cacheDynamicClient,
dynamicClient: dynamicClient,

dynRESTMapper: dynRESTMapper,

Expand Down Expand Up @@ -165,8 +163,7 @@ type Controller struct {
kcpClient kcpclientset.ClusterInterface
kcpCacheClient kcpclientset.ClusterInterface

dynamicClient kcpdynamic.ClusterInterface
cacheDynamicClient kcpdynamic.ClusterInterface
dynamicClient kcpdynamic.ClusterInterface

dynRESTMapper *dynamicrestmapper.DynamicRESTMapper

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *Controller) reconcile(ctx context.Context, cluster logicalcluster.Name,
},
&replication{
shardName: c.shardName,
dynamicCacheClient: c.dynamicClient,
dynamicClusterClient: c.dynamicClient,
kcpCacheClient: c.kcpCacheClient,
dynRESTMapper: c.dynRESTMapper,
cacheKcpInformers: c.cacheKcpInformers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
// Or deletes the replication controller if the published resource is being deleted.
type replication struct {
shardName string
dynamicCacheClient kcpdynamic.ClusterInterface
dynamicClusterClient kcpdynamic.ClusterInterface
kcpCacheClient kcpclientset.ClusterInterface
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
cacheKcpInformers kcpinformers.SharedInformerFactory
Expand Down Expand Up @@ -103,8 +103,9 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph

c, err := replicationcontroller.NewController(
r.shardName,
r.dynamicCacheClient,
r.dynamicClusterClient,
r.kcpCacheClient,
cluster,
gvr,
replicated,
callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
kcpdynamic "github.com/kcp-dev/client-go/dynamic"
"github.com/kcp-dev/logicalcluster/v3"
cachev1alpha1 "github.com/kcp-dev/sdk/apis/cache/v1alpha1"
kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster"

Expand All @@ -53,8 +54,9 @@ const (
// NewController returns a new replication controller.
func NewController(
shardName string,
dynamicCacheClient kcpdynamic.ClusterInterface,
dynamicClusterClient kcpdynamic.ClusterInterface,
kcpCacheClient kcpclientset.ClusterInterface,
cluster logicalcluster.Name,
gvr schema.GroupVersionResource,
replicated *ReplicatedGVR,
callback func(),
Expand All @@ -68,12 +70,12 @@ func NewController(
Name: ControllerName,
},
),
dynamicCacheClient: dynamicCacheClient,
kcpCacheClient: kcpCacheClient,
replicated: replicated,
callback: callback,
cleanupFuncs: make([]func(), 0),
localLabelSelector: localLabelSelector,
dynamicClusterClient: dynamicClusterClient,
kcpCacheClient: kcpCacheClient,
replicated: replicated,
callback: callback,
cleanupFuncs: make([]func(), 0),
localLabelSelector: localLabelSelector,
}

localHandler, err := c.replicated.Local.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -88,10 +90,33 @@ func NewController(
_ = c.replicated.Local.RemoveEventHandler(localHandler)
})

globalHandler, err := c.replicated.Global.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
UpdateFunc: func(_, obj interface{}) { c.enqueueCacheObject(obj) },
DeleteFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
globalHandler, err := c.replicated.Global.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
cachedObj := obj.(*cachev1alpha1.CachedObject)
labels := cachedObj.Labels
if labels == nil {
return false
}

// Skip CachedObjects that are not coming from the source CachedResource.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: dont need these new lines :)

if logicalcluster.From(cachedObj) != cluster {
return false
}

if gvr.Group != labels[LabelKeyObjectGroup] ||
gvr.Version != labels[LabelKeyObjectVersion] ||
gvr.Resource != labels[LabelKeyObjectResource] {
return false
}

return true
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
UpdateFunc: func(_, obj interface{}) { c.enqueueCacheObject(obj) },
DeleteFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
},
})
if err != nil {
return nil, err
Expand All @@ -114,12 +139,6 @@ func (c *Controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResou
}

func (c *Controller) enqueueCacheObject(obj interface{}) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(err)
return
}

// This way we extract what is the original GVR of the object that we are replicating.
cr, ok := obj.(*cachev1alpha1.CachedObject)
if !ok {
Expand All @@ -133,7 +152,7 @@ func (c *Controller) enqueueCacheObject(obj interface{}) {
Version: labels[LabelKeyObjectVersion],
Resource: labels[LabelKeyObjectResource],
}

key := kcpcache.ToClusterAwareKey(string(logicalcluster.From(cr)), labels[LabelKeyObjectOriginalNamespace], labels[LabelKeyObjectOriginalName])
gvrKey := fmt.Sprintf("%s.%s.%s::%s", gvr.Version, gvr.Resource, gvr.Group, key)
c.queue.Add(gvrKey)
}
Expand Down Expand Up @@ -203,8 +222,8 @@ type Controller struct {
shardName string
queue workqueue.TypedRateLimitingInterface[string]

dynamicCacheClient kcpdynamic.ClusterInterface
kcpCacheClient kcpclientset.ClusterInterface
dynamicClusterClient kcpdynamic.ClusterInterface
kcpCacheClient kcpclientset.ClusterInterface

replicated *ReplicatedGVR

Expand Down
Loading