49 changes: 30 additions & 19 deletions pkg/controller/controller.go
@@ -91,7 +91,7 @@ func NewTypedUnmanaged[request mcreconcile.ClusterAware[request]](name string, m
}
return &mcController[request]{
TypedController: c,
clusters: make(map[string]engagedCluster),
clusters: make(map[string]*engagedCluster),
}, nil
}

@@ -101,28 +101,39 @@ type mcController[request mcreconcile.ClusterAware[request]] struct {
controller.TypedController[request]

lock sync.Mutex
clusters map[string]engagedCluster
clusters map[string]*engagedCluster
sources []mcsource.TypedSource[client.Object, request]
}

type engagedCluster struct {
name string
cluster cluster.Cluster
ctx context.Context
cancel context.CancelFunc
}

func (c *mcController[request]) Engage(ctx context.Context, name string, cl cluster.Cluster) error {
c.lock.Lock()
defer c.lock.Unlock()

if old, ok := c.clusters[name]; ok && old.cluster == cl {
return nil
// Check whether this cluster is already engaged with the same instance and a still-live engagement
if old, ok := c.clusters[name]; ok {
if old.cluster == cl && old.ctx.Err() == nil {
Review thread on this check:

Member: this might be a bit of a stupid Go question, but this comparison will fail when the pointer backing the interface points to a different object, right? Should we be concerned that providers might construct cluster objects on the fly and could depend on names being equal to stop replacing an existing cluster?

Reply: yes, interface equality on a pointer-backed cluster compares pointer identity, so a new instance for the same name won't equal the old one (see the sketch after this file's diff); that's intentional because a new instance usually means the kubeconfig/impl changed and we should re-engage. If a provider churns instances but is semantically the same, we can relax by also checking a stable handle (e.g., cl.GetCache()) or document that providers should return a stable instance per name unless they intend a reattach.

Member: Makes sense, I think this is a valid assumption. I wonder if we should document this explicitly though.

Contributor Author: done

// Same impl, engagement still live → nothing to do
return nil
}
// Re-engage: either old ctx is done, or impl changed. Stop the old one if still live.
if old.ctx.Err() == nil {
old.cancel()
}
delete(c.clusters, name)
}

ctx, cancel := context.WithCancel(ctx) //nolint:govet // cancel is called in the error case only.
engCtx, cancel := context.WithCancel(ctx)

// pass through in case the controller itself is cluster aware
if ctrl, ok := c.TypedController.(multicluster.Aware); ok {
if err := ctrl.Engage(ctx, name, cl); err != nil {
if err := ctrl.Engage(engCtx, name, cl); err != nil {
cancel()
return err
}
@@ -135,49 +135,49 @@ func (c *mcController[request]) Engage(ctx context.Context, name string, cl clus
cancel()
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
}
if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
if err := c.TypedController.Watch(startWithinContext[request](engCtx, src)); err != nil {
cancel()
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
}
}

ec := engagedCluster{
ec := &engagedCluster{
name: name,
cluster: cl,
ctx: engCtx,
cancel: cancel,
}
c.clusters[name] = ec
go func() {
go func(ctx context.Context, key string, token *engagedCluster) {
<-ctx.Done()
c.lock.Lock()
defer c.lock.Unlock()
if c.clusters[name] == ec {
delete(c.clusters, name)
if cur, ok := c.clusters[key]; ok && cur == token {
delete(c.clusters, key)
}
}()
// note: cancel() is driven by parent; no need to call here
}(engCtx, name, ec)

return nil //nolint:govet // cancel is called in the error case only.
return nil
}

func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error {
c.lock.Lock()
defer c.lock.Unlock()

ctx, cancel := context.WithCancel(context.Background()) //nolint:govet // cancel is called in the error case only.

for name, eng := range c.clusters {
src, err := src.ForCluster(name, eng.cluster)
if err != nil {
cancel()
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
}
if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
cancel()
if err := c.TypedController.Watch(startWithinContext[request](eng.ctx, src)); err != nil {
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
}
}

c.sources = append(c.sources, src)

return nil //nolint:govet // cancel is called in the error case only.
return nil
}

func startWithinContext[request mcreconcile.ClusterAware[request]](ctx context.Context, src source.TypedSource[request]) source.TypedSource[request] {
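Regarding the interface-equality question in the review thread above, here is a small self-contained sketch; the Cluster interface and fakeCluster type are illustrative stand-ins for a pointer-backed cluster.Cluster implementation, not code from this repository:

package main

import "fmt"

// Cluster is a stand-in for the cluster.Cluster interface used above.
type Cluster interface {
	Name() string
}

// fakeCluster is a hypothetical pointer-backed implementation.
type fakeCluster struct {
	name string
}

func (f *fakeCluster) Name() string { return f.name }

func main() {
	a := &fakeCluster{name: "zoo"}
	b := &fakeCluster{name: "zoo"} // same name, different instance

	var x Cluster = a
	var y Cluster = b
	var z Cluster = a

	// Interface equality compares the dynamic type and the pointer value.
	fmt.Println(x == y) // false: different instances, even though the names match
	fmt.Println(x == z) // true: same instance
}

Because x == y is false for a fresh instance under the same name, Engage re-engages in that case, which is the intent discussed in the thread.
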
21 changes: 14 additions & 7 deletions pkg/manager/manager.go
@@ -124,6 +124,9 @@ type Runnable interface {

var _ Manager = &mcManager{}

// Option mutates mcManager configuration.
type Option func(*mcManager)

type mcManager struct {
manager.Manager
provider multicluster.Provider
@@ -134,20 +137,24 @@ type mcManager struct {
// New returns a new Manager for creating Controllers. The provider is used to
// discover and manage clusters. With a provider set to nil, the manager will
// behave like a regular controller-runtime manager.
func New(config *rest.Config, provider multicluster.Provider, opts Options) (Manager, error) {
func New(config *rest.Config, provider multicluster.Provider, opts manager.Options, mcOpts ...Option) (Manager, error) {
mgr, err := manager.New(config, opts)
if err != nil {
return nil, err
}
return WithMultiCluster(mgr, provider)
return WithMultiCluster(mgr, provider, mcOpts...)
}

// WithMultiCluster wraps a host manager to run multi-cluster controllers.
func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider) (Manager, error) {
return &mcManager{
Manager: mgr,
provider: provider,
}, nil
func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider, mcOpts ...Option) (Manager, error) {
m := &mcManager{Manager: mgr, provider: provider}

// Apply options before wiring the Runnable so overrides take effect early.
for _, o := range mcOpts {
o(m)
}

return m, nil
}

// GetCluster returns a cluster for the given identifying cluster name. Get
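As an aside on the new Option hook: it follows the standard functional-options pattern, and because Option receives the unexported *mcManager, concrete options would be defined inside pkg/manager. A hedged sketch of what such an option could look like; WithProvider and the mcmanager import alias are hypothetical and not part of this change:

// Hypothetical option constructor, defined inside pkg/manager (illustration only).
func WithProvider(p multicluster.Provider) Option {
	return func(m *mcManager) {
		// Replace the provider that was passed to New or WithMultiCluster.
		m.provider = p
	}
}

// Caller side, assuming this package is imported as mcmanager (sketch):
//
//	mgr, err := mcmanager.New(cfg, defaultProvider, manager.Options{}, mcmanager.WithProvider(override))
//	if err != nil {
//		// handle error
//	}
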
3 changes: 3 additions & 0 deletions pkg/multicluster/multicluster.go
@@ -30,6 +30,9 @@ type Aware interface {
// The given context is tied to the Cluster's lifecycle and will be cancelled when the
// Cluster is removed or an error occurs.
//
// Engage is a no-op if the passed cluster is equal to the existing, already
// engaged cluster (equal means interface equality, i.e. the same instance).
//
// Implementers should return an error if they cannot start operations for the given Cluster,
// and should ensure this operation is re-entrant and non-blocking.
//
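To make the documented Engage contract concrete, a minimal sketch of an Aware implementer; the clusterTracker type is hypothetical and only illustrates the no-op-on-same-instance, non-blocking, context-scoped behaviour described above:

package example

import (
	"context"
	"sync"

	"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// clusterTracker is a hypothetical Aware implementation.
type clusterTracker struct {
	mu      sync.Mutex
	engaged map[string]cluster.Cluster
}

func (t *clusterTracker) Engage(ctx context.Context, name string, cl cluster.Cluster) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.engaged == nil {
		t.engaged = map[string]cluster.Cluster{}
	}
	// Per the contract: engaging the same instance again is a no-op.
	if old, ok := t.engaged[name]; ok && old == cl {
		return nil
	}
	t.engaged[name] = cl

	// Non-blocking: per-cluster bookkeeping runs in the background and stops
	// when the context is cancelled, i.e. when the cluster is removed.
	go func() {
		<-ctx.Done()
		t.mu.Lock()
		defer t.mu.Unlock()
		if cur, ok := t.engaged[name]; ok && cur == cl {
			delete(t.engaged, name)
		}
	}()
	return nil
}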