Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewTypedUnmanaged[request mcreconcile.ClusterAware[request]](name string, m
}
return &mcController[request]{
TypedController: c,
clusters: make(map[string]engagedCluster),
clusters: make(map[string]*engagedCluster),
}, nil
}

Expand All @@ -101,28 +101,39 @@ type mcController[request mcreconcile.ClusterAware[request]] struct {
controller.TypedController[request]

lock sync.Mutex
clusters map[string]engagedCluster
clusters map[string]*engagedCluster
sources []mcsource.TypedSource[client.Object, request]
}

type engagedCluster struct {
name string
cluster cluster.Cluster
ctx context.Context
cancel context.CancelFunc
}

func (c *mcController[request]) Engage(ctx context.Context, name string, cl cluster.Cluster) error {
c.lock.Lock()
defer c.lock.Unlock()

if old, ok := c.clusters[name]; ok && old.cluster == cl {
return nil
// Check if we already have this cluster engaged with the SAME context
if old, ok := c.clusters[name]; ok {
if old.cluster == cl && old.ctx.Err() == nil {
// Same impl, engagement still live → nothing to do
return nil
}
// Re-engage: either old ctx is done, or impl changed. Stop the old one if still live.
if old.ctx.Err() == nil {
old.cancel()
}
delete(c.clusters, name)
}

ctx, cancel := context.WithCancel(ctx) //nolint:govet // cancel is called in the error case only.
engCtx, cancel := context.WithCancel(ctx)

// pass through in case the controller itself is cluster aware
if ctrl, ok := c.TypedController.(multicluster.Aware); ok {
if err := ctrl.Engage(ctx, name, cl); err != nil {
if err := ctrl.Engage(engCtx, name, cl); err != nil {
cancel()
return err
}
Expand All @@ -135,49 +146,49 @@ func (c *mcController[request]) Engage(ctx context.Context, name string, cl clus
cancel()
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
}
if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
if err := c.TypedController.Watch(startWithinContext[request](engCtx, src)); err != nil {
cancel()
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
}
}

ec := engagedCluster{
ec := &engagedCluster{
name: name,
cluster: cl,
ctx: engCtx,
cancel: cancel,
}
c.clusters[name] = ec
go func() {
go func(ctx context.Context, key string, token *engagedCluster) {
<-ctx.Done()
c.lock.Lock()
defer c.lock.Unlock()
if c.clusters[name] == ec {
delete(c.clusters, name)
if cur, ok := c.clusters[key]; ok && cur == token {
delete(c.clusters, key)
}
}()
// note: cancel() is driven by parent; no need to call here
}(engCtx, name, ec)

return nil //nolint:govet // cancel is called in the error case only.
return nil
}

func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error {
c.lock.Lock()
defer c.lock.Unlock()

ctx, cancel := context.WithCancel(context.Background()) //nolint:govet // cancel is called in the error case only.

for name, eng := range c.clusters {
src, err := src.ForCluster(name, eng.cluster)
if err != nil {
cancel()
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
}
if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
cancel()
if err := c.TypedController.Watch(startWithinContext[request](eng.ctx, src)); err != nil {
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
}
}

c.sources = append(c.sources, src)

return nil //nolint:govet // cancel is called in the error case only.
return nil
}

func startWithinContext[request mcreconcile.ClusterAware[request]](ctx context.Context, src source.TypedSource[request]) source.TypedSource[request] {
Expand Down
21 changes: 14 additions & 7 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type Runnable interface {

var _ Manager = &mcManager{}

// Option mutates mcManager configuration.
type Option func(*mcManager)

type mcManager struct {
manager.Manager
provider multicluster.Provider
Expand All @@ -134,20 +137,24 @@ type mcManager struct {
// New returns a new Manager for creating Controllers. The provider is used to
// discover and manage clusters. With a provider set to nil, the manager will
// behave like a regular controller-runtime manager.
func New(config *rest.Config, provider multicluster.Provider, opts Options) (Manager, error) {
func New(config *rest.Config, provider multicluster.Provider, opts manager.Options, mcOpts ...Option) (Manager, error) {
mgr, err := manager.New(config, opts)
if err != nil {
return nil, err
}
return WithMultiCluster(mgr, provider)
return WithMultiCluster(mgr, provider, mcOpts...)
}

// WithMultiCluster wraps a host manager to run multi-cluster controllers.
func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider) (Manager, error) {
return &mcManager{
Manager: mgr,
provider: provider,
}, nil
func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider, mcOpts ...Option) (Manager, error) {
m := &mcManager{Manager: mgr, provider: provider}

// Apply options before wiring the Runnable so overrides take effect early.
for _, o := range mcOpts {
o(m)
}

return m, nil
}

// GetCluster returns a cluster for the given identifying cluster name. Get
Expand Down
Loading
Loading