diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index a8ce31a..b24f768 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -91,7 +91,7 @@ func NewTypedUnmanaged[request mcreconcile.ClusterAware[request]](name string, m
 	}
 	return &mcController[request]{
 		TypedController: c,
-		clusters:        make(map[string]engagedCluster),
+		clusters:        make(map[string]*engagedCluster),
 	}, nil
 }
 
@@ -101,28 +101,39 @@ type mcController[request mcreconcile.ClusterAware[request]] struct {
 	controller.TypedController[request]
 
 	lock     sync.Mutex
-	clusters map[string]engagedCluster
+	clusters map[string]*engagedCluster
 	sources  []mcsource.TypedSource[client.Object, request]
 }
 
 type engagedCluster struct {
 	name    string
 	cluster cluster.Cluster
+	ctx     context.Context
+	cancel  context.CancelFunc
 }
 
 func (c *mcController[request]) Engage(ctx context.Context, name string, cl cluster.Cluster) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
-	if old, ok := c.clusters[name]; ok && old.cluster == cl {
-		return nil
+	// Check if we already have this cluster engaged with the SAME context
+	if old, ok := c.clusters[name]; ok {
+		if old.cluster == cl && old.ctx.Err() == nil {
+			// Same impl, engagement still live → nothing to do
+			return nil
+		}
+		// Re-engage: either old ctx is done, or impl changed. Stop the old one if still live.
+		if old.ctx.Err() == nil {
+			old.cancel()
+		}
+		delete(c.clusters, name)
 	}
 
-	ctx, cancel := context.WithCancel(ctx) //nolint:govet // cancel is called in the error case only.
+	engCtx, cancel := context.WithCancel(ctx)
 
 	// pass through in case the controller itself is cluster aware
 	if ctrl, ok := c.TypedController.(multicluster.Aware); ok {
-		if err := ctrl.Engage(ctx, name, cl); err != nil {
+		if err := ctrl.Engage(engCtx, name, cl); err != nil {
 			cancel()
 			return err
 		}
@@ -135,49 +146,49 @@ func (c *mcController[request]) Engage(ctx context.Context, name string, cl clus
 			cancel()
 			return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
 		}
-		if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
+		if err := c.TypedController.Watch(startWithinContext[request](engCtx, src)); err != nil {
 			cancel()
 			return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
 		}
 	}
 
-	ec := engagedCluster{
+	ec := &engagedCluster{
 		name:    name,
 		cluster: cl,
+		ctx:     engCtx,
+		cancel:  cancel,
 	}
 	c.clusters[name] = ec
-	go func() {
+	go func(ctx context.Context, key string, token *engagedCluster) {
+		<-ctx.Done()
 		c.lock.Lock()
 		defer c.lock.Unlock()
-		if c.clusters[name] == ec {
-			delete(c.clusters, name)
+		if cur, ok := c.clusters[key]; ok && cur == token {
+			delete(c.clusters, key)
 		}
-	}()
+		// note: cancel() is driven by parent; no need to call here
+	}(engCtx, name, ec)
 
-	return nil //nolint:govet // cancel is called in the error case only.
+	return nil
 }
 
 func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
-	ctx, cancel := context.WithCancel(context.Background()) //nolint:govet // cancel is called in the error case only.
-
 	for name, eng := range c.clusters {
 		src, err := src.ForCluster(name, eng.cluster)
 		if err != nil {
-			cancel()
 			return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
 		}
-		if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
-			cancel()
+		if err := c.TypedController.Watch(startWithinContext[request](eng.ctx, src)); err != nil {
 			return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
 		}
 	}
 
 	c.sources = append(c.sources, src)
 
-	return nil //nolint:govet // cancel is called in the error case only.
+	return nil
 }
 
 func startWithinContext[request mcreconcile.ClusterAware[request]](ctx context.Context, src source.TypedSource[request]) source.TypedSource[request] {
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index b79663e..f47cb90 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -124,6 +124,9 @@ type Runnable interface {
 
 var _ Manager = &mcManager{}
 
+// Option mutates mcManager configuration.
+type Option func(*mcManager)
+
 type mcManager struct {
 	manager.Manager
 	provider multicluster.Provider
@@ -134,20 +137,24 @@ type mcManager struct {
 // New returns a new Manager for creating Controllers. The provider is used to
 // discover and manage clusters. With a provider set to nil, the manager will
 // behave like a regular controller-runtime manager.
-func New(config *rest.Config, provider multicluster.Provider, opts Options) (Manager, error) {
+func New(config *rest.Config, provider multicluster.Provider, opts manager.Options, mcOpts ...Option) (Manager, error) {
 	mgr, err := manager.New(config, opts)
 	if err != nil {
 		return nil, err
 	}
-	return WithMultiCluster(mgr, provider)
+	return WithMultiCluster(mgr, provider, mcOpts...)
 }
 
 // WithMultiCluster wraps a host manager to run multi-cluster controllers.
-func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider) (Manager, error) {
-	return &mcManager{
-		Manager:  mgr,
-		provider: provider,
-	}, nil
+func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider, mcOpts ...Option) (Manager, error) {
+	m := &mcManager{Manager: mgr, provider: provider}
+
+	// Apply options before wiring the Runnable so overrides take effect early.
+	for _, o := range mcOpts {
+		o(m)
+	}
+
+	return m, nil
 }
 
 // GetCluster returns a cluster for the given identifying cluster name. Get
diff --git a/pkg/multicluster/multicluster.go b/pkg/multicluster/multicluster.go
index 4f6f777..d5a5931 100644
--- a/pkg/multicluster/multicluster.go
+++ b/pkg/multicluster/multicluster.go
@@ -30,6 +30,9 @@ type Aware interface {
 	// The given context is tied to the Cluster's lifecycle and will be cancelled when the
 	// Cluster is removed or an error occurs.
 	//
+	// Engage is a no-op if the passed cluster is equal to the existing, already
+	// engaged cluster (equal means interface equality, i.e. the same instance).
+	//
 	// Implementers should return an error if they cannot start operations for the given Cluster,
 	// and should ensure this operation is re-entrant and non-blocking.
 	//
diff --git a/pkg/source/kind.go b/pkg/source/kind.go
index 78f70e4..f5cbbad 100644
--- a/pkg/source/kind.go
+++ b/pkg/source/kind.go
@@ -17,10 +17,21 @@ limitations under the License.
 package source
 
 import (
+	"context"
+	"sync"
+	"time"
+
+	toolscache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+
+	crcache "sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/cluster"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
-	"sigs.k8s.io/controller-runtime/pkg/source"
+	crsource "sigs.k8s.io/controller-runtime/pkg/source"
 
 	mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler"
 	mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
@@ -46,6 +57,7 @@ func TypedKind[object client.Object, request mcreconcile.ClusterAware[request]](
 		handler:    handler,
 		predicates: predicates,
 		project:    func(_ cluster.Cluster, obj object) (object, error) { return obj, nil },
+		resync:     0, // no periodic resync by default
 	}
 }
 
@@ -54,10 +66,20 @@ type kind[object client.Object, request mcreconcile.ClusterAware[request]] struc
 	handler    mchandler.TypedEventHandlerFunc[object, request]
 	predicates []predicate.TypedPredicate[object]
 	project    func(cluster.Cluster, object) (object, error)
+	resync     time.Duration
 }
 
 type clusterKind[object client.Object, request mcreconcile.ClusterAware[request]] struct {
-	source.TypedSyncingSource[request]
+	clusterName string
+	cl          cluster.Cluster
+	obj         object
+	h           handler.TypedEventHandler[object, request]
+	preds       []predicate.TypedPredicate[object]
+	resync      time.Duration
+
+	mu           sync.Mutex
+	registration toolscache.ResourceEventHandlerRegistration
+	activeCtx    context.Context
 }
 
 // WithProjection sets the projection function for the KindSource.
@@ -66,22 +88,209 @@ func (k *kind[object, request]) WithProjection(project func(cluster.Cluster, obj
 	return k
 }
 
-func (k *kind[object, request]) ForCluster(name string, cl cluster.Cluster) (source.TypedSource[request], error) {
+func (k *kind[object, request]) ForCluster(name string, cl cluster.Cluster) (crsource.TypedSource[request], error) {
 	obj, err := k.project(cl, k.obj)
 	if err != nil {
 		return nil, err
 	}
 	return &clusterKind[object, request]{
-		TypedSyncingSource: source.TypedKind(cl.GetCache(), obj, k.handler(name, cl), k.predicates...),
+		clusterName: name,
+		cl:          cl,
+		obj:         obj,
+		h:           k.handler(name, cl),
+		preds:       k.predicates,
+		resync:      k.resync,
 	}, nil
 }
 
-func (k *kind[object, request]) SyncingForCluster(name string, cl cluster.Cluster) (source.TypedSyncingSource[request], error) {
-	obj, err := k.project(cl, k.obj)
+func (k *kind[object, request]) SyncingForCluster(name string, cl cluster.Cluster) (crsource.TypedSyncingSource[request], error) {
+	src, err := k.ForCluster(name, cl)
 	if err != nil {
 		return nil, err
 	}
-	return &clusterKind[object, request]{
-		TypedSyncingSource: source.TypedKind(cl.GetCache(), obj, k.handler(name, cl), k.predicates...),
-	}, nil
+	return src.(crsource.TypedSyncingSource[request]), nil
+}
+
+// WaitForSync satisfies TypedSyncingSource.
+func (ck *clusterKind[object, request]) WaitForSync(ctx context.Context) error {
+	if !ck.cl.GetCache().WaitForCacheSync(ctx) {
+		return ctx.Err()
+	}
+	return nil
+}
+
+// Start registers a removable handler on the (scoped) informer and removes it on ctx.Done().
+func (ck *clusterKind[object, request]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[request]) error {
+	log := log.FromContext(ctx).WithValues("cluster", ck.clusterName, "source", "kind")
+
+	// Check if we're already started with this context
+	ck.mu.Lock()
+	if ck.registration != nil && ck.activeCtx != nil {
+		// Check if the active context is still valid
+		select {
+		case <-ck.activeCtx.Done():
+			// Previous context cancelled, need to clean up and re-register
+			log.V(1).Info("previous context cancelled, cleaning up for re-registration")
+			// Clean up of old registration is handled below
+		default:
+			// Still active with same context - check if it's the same context
+			if ck.activeCtx == ctx {
+				ck.mu.Unlock()
+				log.V(1).Info("handler already registered with same context")
+				return nil
+			}
+			// Different context but old one still active - this shouldn't happen
+			log.V(1).Info("different context while old one active, will re-register")
+		}
+	}
+	ck.mu.Unlock()
+
+	inf, err := ck.getInformer(ctx, ck.obj)
+	if err != nil {
+		log.Error(err, "get informer failed")
+		return err
+	}
+
+	// If there's an old registration, remove it first
+	ck.mu.Lock()
+	if ck.registration != nil {
+		log.V(1).Info("removing old event handler registration")
+		if err := inf.RemoveEventHandler(ck.registration); err != nil {
+			log.Error(err, "failed to remove old event handler")
+		}
+		ck.registration = nil
+		ck.activeCtx = nil
+	}
+	ck.mu.Unlock()
+
+	// predicate helpers
+	passCreate := func(e event.TypedCreateEvent[object]) bool {
+		for _, p := range ck.preds {
+			if !p.Create(e) {
+				return false
+			}
+		}
+		return true
+	}
+	passUpdate := func(e event.TypedUpdateEvent[object]) bool {
+		for _, p := range ck.preds {
+			if !p.Update(e) {
+				return false
+			}
+		}
+		return true
+	}
+	passDelete := func(e event.TypedDeleteEvent[object]) bool {
+		for _, p := range ck.preds {
+			if !p.Delete(e) {
+				return false
+			}
+		}
+		return true
+	}
+
+	// typed event builders
+	makeCreate := func(o client.Object) event.TypedCreateEvent[object] {
+		return event.TypedCreateEvent[object]{Object: any(o).(object)}
+	}
+	makeUpdate := func(oo, no client.Object) event.TypedUpdateEvent[object] {
+		return event.TypedUpdateEvent[object]{ObjectOld: any(oo).(object), ObjectNew: any(no).(object)}
+	}
+	makeDelete := func(o client.Object) event.TypedDeleteEvent[object] {
+		return event.TypedDeleteEvent[object]{Object: any(o).(object)}
+	}
+
+	// Adapter that forwards to controller handler, honoring ctx.
+	h := toolscache.ResourceEventHandlerFuncs{
+		AddFunc: func(i interface{}) {
+			if ctx.Err() != nil {
+				return
+			}
+			if o, ok := i.(client.Object); ok {
+				e := makeCreate(o)
+				if passCreate(e) {
+					ck.h.Create(ctx, e, q)
+				}
+			}
+		},
+		UpdateFunc: func(oo, no interface{}) {
+			if ctx.Err() != nil {
+				return
+			}
+			ooObj, ok1 := oo.(client.Object)
+			noObj, ok2 := no.(client.Object)
+			if ok1 && ok2 {
+				e := makeUpdate(ooObj, noObj)
+				if passUpdate(e) {
+					ck.h.Update(ctx, e, q)
+				}
+			}
+		},
+		DeleteFunc: func(i interface{}) {
+			if ctx.Err() != nil {
+				return
+			}
+			// be robust to tombstones (provider should already unwrap)
+			if ts, ok := i.(toolscache.DeletedFinalStateUnknown); ok {
+				i = ts.Obj
+			}
+			if o, ok := i.(client.Object); ok {
+				e := makeDelete(o)
+				if passDelete(e) {
+					ck.h.Delete(ctx, e, q)
+				}
+			}
+		},
+	}
+
+	// Register via removable API.
+	reg, addErr := inf.AddEventHandlerWithResyncPeriod(h, ck.resync)
+	if addErr != nil {
+		log.Error(addErr, "AddEventHandlerWithResyncPeriod failed")
+		return addErr
+	}
+
+	// Store registration and context
+	ck.mu.Lock()
+	ck.registration = reg
+	ck.activeCtx = ctx
+	ck.mu.Unlock()
+
+	log.V(1).Info("kind source handler registered", "hasRegistration", reg != nil)
+
+	// Defensive: ensure cache is synced.
+	if !ck.cl.GetCache().WaitForCacheSync(ctx) {
+		ck.mu.Lock()
+		_ = inf.RemoveEventHandler(ck.registration)
+		ck.registration = nil
+		ck.activeCtx = nil
+		ck.mu.Unlock()
+		log.V(1).Info("cache not synced; handler removed")
+		return ctx.Err()
+	}
+	log.V(1).Info("kind source cache synced")
+
+	// Wait for context cancellation in a goroutine
+	go func() {
+		<-ctx.Done()
+		ck.mu.Lock()
+		defer ck.mu.Unlock()
+
+		// Only remove if this is still our active registration
+		if ck.activeCtx == ctx && ck.registration != nil {
+			if err := inf.RemoveEventHandler(ck.registration); err != nil {
+				log.Error(err, "failed to remove event handler on context cancel")
+			}
+			ck.registration = nil
+			ck.activeCtx = nil
+			log.V(1).Info("kind source handler removed due to context cancellation")
+		}
+	}()
+
+	return nil
+}
+
+// getInformer resolves the informer from the cluster cache (provider returns a scoped informer).
+func (ck *clusterKind[object, request]) getInformer(ctx context.Context, obj client.Object) (crcache.Informer, error) {
+	return ck.cl.GetCache().GetInformer(ctx, obj)
 }