2 changes: 1 addition & 1 deletion examples/cluster-api/main.go
@@ -129,7 +129,7 @@ func main() {
return ignoreCanceled(localMgr.Start(ctx))
})
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, mcMgr))
return ignoreCanceled(provider.Start(ctx, mcMgr))
})
g.Go(func() error {
return ignoreCanceled(mcMgr.Start(ctx))
2 changes: 1 addition & 1 deletion examples/file/main.go
@@ -114,7 +114,7 @@ func main() {
return ignoreCanceled(mgr.Start(ctx))
})
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, mgr))
return ignoreCanceled(provider.Start(ctx, mgr))
})
if err := g.Wait(); err != nil {
entryLog.Info("error in errgroup: %w", err)
2 changes: 1 addition & 1 deletion examples/kind/main.go
@@ -92,7 +92,7 @@ func main() {
// Starting everything.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, mgr))
return ignoreCanceled(provider.Start(ctx, mgr))
})
g.Go(func() error {
return ignoreCanceled(mgr.Start(ctx))
2 changes: 1 addition & 1 deletion examples/namespace/main.go
@@ -154,7 +154,7 @@ func run(ctx context.Context, log logr.Logger, kubeconfig string) error {
// Starting everything.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, mgr))
return ignoreCanceled(provider.Start(ctx, mgr))
})
g.Go(func() error {
return ignoreCanceled(cl.Start(ctx))
5 changes: 5 additions & 0 deletions pkg/multicluster/multicluster.go
@@ -60,4 +60,9 @@ type Provider interface {
// IndexField indexes the given object by the given field on all engaged
// clusters, current and future.
IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error

// Start runs the provider. Implementations of this method should block.
// If a provider needs the manager, it is recommended to implement a SetupWithManager(mgr mcmanager.Manager) error method on the individual provider instead.
// Even if a provider gets a manager through e.g. `SetupWithManager`, the `Aware` passed to this method must be used to engage clusters.
Start(context.Context, Aware) error

I think we're mixing two concepts here.

  1. The manager that is used for running things (the ACTUAL manager that deals with runnables) and to provide a kubeconfig (though those two could also be split).
  2. The Aware part that deals with the cluster manager and registration.

The start part is ONLY related to runnables. The aware part is a parameter that the provider should hold, provided during setup?

Contributor Author:
So what do you suggest here? I don't see mixing here, but I've been soaking in this for way too long :)

Contributor:
I don't see a mixing either.
The docstring specifies that if the full manager is needed, the provider should implement a different method. And I don't think that there will be many providers that actually need a reference to the full manager.

It only takes an Aware because it should use that Aware to engage clusters.

I'd instead argue that this will encourage cleaner design because provider developers will rather ask for a kubeconfig in their setup (either as parameters to their New or in their Options...) - which may be the local cluster config but could also be something else - rather than expecting the manager.

}
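To make the intended split concrete, here is a minimal sketch of a provider written against the new contract. It is illustrative only: the package name, the engage helper, and the SetupWithManager stub are hypothetical and not part of this PR; the sketch assumes only the multicluster.Aware, mcmanager.Manager, and cluster.Cluster APIs that appear in the diff.

```go
package memory

import (
	"context"
	"fmt"
	"sync"

	"sigs.k8s.io/controller-runtime/pkg/cluster"

	mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
	"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
)

// Provider is a hypothetical provider. Get and IndexField are elided for
// brevity, so this is not a complete multicluster.Provider implementation.
type Provider struct {
	lock     sync.RWMutex
	aware    multicluster.Aware
	clusters map[string]cluster.Cluster
}

// SetupWithManager is optional and separate from Start: a provider that really
// needs the full manager (e.g. to register a reconciler or to read the local
// kubeconfig) can accept it here, as the docstring above recommends.
func (p *Provider) SetupWithManager(mgr mcmanager.Manager) error {
	// Wire watches or reconcilers against mgr here if the provider needs them.
	return nil
}

// Start implements the new contract: remember the Aware, then block until the
// context is cancelled.
func (p *Provider) Start(ctx context.Context, aware multicluster.Aware) error {
	p.lock.Lock()
	p.aware = aware
	if p.clusters == nil {
		p.clusters = map[string]cluster.Cluster{}
	}
	p.lock.Unlock()

	<-ctx.Done()
	return ctx.Err()
}

// engage hands a discovered cluster to the Aware received in Start (in
// practice the multi-cluster manager, which satisfies multicluster.Aware).
func (p *Provider) engage(ctx context.Context, name string, cl cluster.Cluster) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.aware == nil {
		return fmt.Errorf("provider not started yet, cannot engage cluster %q", name)
	}
	if err := p.aware.Engage(ctx, name, cl); err != nil {
		return err
	}
	p.clusters[name] = cl
	return nil
}
```

A caller would then wire it up exactly as the updated examples do, e.g. `g.Go(func() error { return provider.Start(ctx, mcMgr) })` alongside `mcMgr.Start(ctx)`.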
20 changes: 10 additions & 10 deletions providers/cluster-api/provider.go
@@ -39,7 +39,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
)

@@ -109,7 +108,7 @@ type Provider struct {
client client.Client

lock sync.Mutex
mcMgr mcmanager.Manager
mcAware multicluster.Aware
clusters map[string]cluster.Cluster
cancelFns map[string]context.CancelFunc
indexers []index
@@ -126,13 +125,14 @@ func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster,
return nil, multicluster.ErrClusterNotFound
}

// Run starts the provider and blocks.
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {

Should we mark it deprecated instead?

Contributor Author:

Do we really need to?
First, it's replaced with a very similar method with the same functionality.
Second, Go is good at catching these at compile time, so people should just make the change.

It's not like we're deprecating functionality and leaving people to find a different way of doing things.
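For reference, if a transition period were wanted, the old entry point could be kept as a thin deprecated wrapper instead of being removed. A sketch (this would also mean keeping the mcmanager import that this PR drops; it relies on mcmanager.Manager satisfying multicluster.Aware, which is how the updated examples already pass mcMgr to Start):

```go
// Run starts the provider and blocks.
//
// Deprecated: use Start instead.
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
	return p.Start(ctx, mgr)
}
```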

p.log.Info("Starting Cluster-API cluster provider")

// Start starts the provider and blocks.
func (p *Provider) Start(ctx context.Context, mcAware multicluster.Aware) error {
p.lock.Lock()
p.mcMgr = mgr
p.lock.Unlock()
defer p.lock.Unlock()

p.mcAware = mcAware

p.log.Info("Starting Cluster-API cluster provider")

<-ctx.Done()

@@ -171,7 +171,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
// TODO(sttts): do tighter logging.

// provider already started?
if p.mcMgr == nil {
if p.mcAware == nil {
return reconcile.Result{RequeueAfter: time.Second * 2}, nil
}

@@ -222,7 +222,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
p.log.Info("Added new cluster")

// engage manager.
if err := p.mcMgr.Engage(clusterCtx, key, cl); err != nil {
if err := p.mcAware.Engage(clusterCtx, key, cl); err != nil {
log.Error(err, "failed to engage manager")
delete(p.clusters, key)
delete(p.cancelFns, key)
14 changes: 10 additions & 4 deletions providers/cluster-inventory-api/provider.go
@@ -78,7 +78,7 @@ type Provider struct {
strategy kubeconfigstrategy.Interface

lock sync.RWMutex
mcMgr mcmanager.Manager
manager mcmanager.Manager
clusters map[string]cluster.Cluster
cancelFns map[string]context.CancelFunc
kubeconfig map[string]*rest.Config
@@ -125,7 +125,7 @@ func (p *Provider) SetupWithManager(mgr mcmanager.Manager) error {
if mgr == nil {
return fmt.Errorf("manager is nil")
}
p.mcMgr = mgr
p.manager = mgr

// Get the local manager from the multi-cluster manager.
localMgr := mgr.GetLocalManager()
@@ -199,7 +199,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
defer p.lock.Unlock()

// provider already started?
if p.mcMgr == nil {
if p.manager == nil {
log.V(3).Info("Provider not started yet, requeuing")
return reconcile.Result{RequeueAfter: time.Second * 2}, nil
}
@@ -267,7 +267,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
log.Info("Added new cluster for ClusterProfile")

// engage manager.
if err := p.mcMgr.Engage(clusterCtx, key, cl); err != nil {
if err := p.manager.Engage(clusterCtx, key, cl); err != nil {
log.Error(err, "failed to engage manager for ClusterProfile")
delete(p.clusters, key)
delete(p.cancelFns, key)
@@ -301,3 +301,9 @@ func (p *Provider) IndexField(ctx context.Context, obj client.Object, field stri

return nil
}

// Start runs the provider and blocks.
func (p *Provider) Start(ctx context.Context, _ multicluster.Aware) error {
<-ctx.Done()
return ctx.Err()
}
52 changes: 28 additions & 24 deletions providers/file/provider.go
@@ -33,7 +33,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/log"

mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
)

@@ -137,14 +136,19 @@ type Provider struct {

log logr.Logger

clustersLock sync.RWMutex
lock sync.RWMutex
mcAware multicluster.Aware
clusters map[string]cluster.Cluster
clusterCancel map[string]func()
}

// Run starts the provider and updates the clusters and is blocking.
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
if err := p.run(ctx, mgr); err != nil {
// Start starts the provider and updates the clusters and is blocking.
func (p *Provider) Start(ctx context.Context, mcAware multicluster.Aware) error {
p.lock.Lock()
p.mcAware = mcAware
p.lock.Unlock()

if err := p.run(ctx); err != nil {
return fmt.Errorf("initial update failed: %w", err)
}

@@ -181,7 +185,7 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
// would also require to track which cluster belongs to
// which file.
// Instead clusters are just updated from all files.
if err := p.run(ctx, mgr); err != nil {
if err := p.run(ctx); err != nil {
p.log.Error(err, "failed to update clusters after file change")
}
case err, ok := <-watcher.Errors:
@@ -194,17 +198,17 @@
}

// RunOnce performs a single update of the clusters.
func (p *Provider) RunOnce(ctx context.Context, mgr mcmanager.Manager) error {
return p.run(ctx, mgr)
func (p *Provider) RunOnce(ctx context.Context) error {
return p.run(ctx)
}

func (p *Provider) addCluster(ctx context.Context, mgr mcmanager.Manager, name string, cl cluster.Cluster) {
func (p *Provider) addCluster(ctx context.Context, name string, cl cluster.Cluster) {
ctx, cancel := context.WithCancel(ctx)

p.clustersLock.Lock()
p.lock.Lock()
p.clusters[name] = cl
p.clusterCancel[name] = cancel
p.clustersLock.Unlock()
p.lock.Unlock()

go func() {
if err := cl.Start(ctx); err != nil {
@@ -213,17 +217,17 @@ func (p *Provider) addCluster(ctx context.Context, mgr mcmanager.Manager, name s
p.removeCluster(name)
}()

if mgr != nil {
if err := mgr.Engage(ctx, name, cl); err != nil {
if p.mcAware != nil {
if err := p.mcAware.Engage(ctx, name, cl); err != nil {
cancel()
p.log.Error(err, "failed to engage cluster", "name", name)
}
}
}

func (p *Provider) removeCluster(name string) {
p.clustersLock.Lock()
defer p.clustersLock.Unlock()
p.lock.Lock()
defer p.lock.Unlock()

if cancel, ok := p.clusterCancel[name]; ok {
cancel()
@@ -232,7 +236,7 @@ func (p *Provider) removeCluster(name string) {
}
}

func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
func (p *Provider) run(ctx context.Context) error {
loadedClusters, err := p.loadClusters()
if err != nil {
return fmt.Errorf("failed to load clusters: %w", err)
@@ -247,12 +251,12 @@ func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
if !cmp.Equal(existingCluster.GetConfig(), cl.GetConfig()) {
p.log.Info("updating cluster", "name", name)
p.removeCluster(name)
p.addCluster(ctx, mgr, name, cl)
p.addCluster(ctx, name, cl)
}
continue
}
p.log.Info("adding cluster", "name", name)
p.addCluster(ctx, mgr, name, cl)
p.addCluster(ctx, name, cl)
}

// delete clusters that are no longer present
@@ -271,8 +275,8 @@ func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
// If the cluster name is empty (""), it returns the first cluster
// found.
func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster, error) {
p.clustersLock.RLock()
defer p.clustersLock.RUnlock()
p.lock.RLock()
defer p.lock.RUnlock()

if clusterName == "" {
for _, cl := range p.clusters {
@@ -289,8 +293,8 @@

// IndexField indexes a field on all clusters.
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
p.clustersLock.RLock()
defer p.clustersLock.RUnlock()
p.lock.RLock()
defer p.lock.RUnlock()

for name, cl := range p.clusters {
if err := cl.GetCache().IndexField(ctx, obj, field, extractValue); err != nil {
@@ -302,7 +306,7 @@ func (p *Provider) IndexField(ctx context.Context, obj client.Object, field stri

// ClusterNames returns the names of all clusters known to the provider.
func (p *Provider) ClusterNames() []string {
p.clustersLock.RLock()
defer p.clustersLock.RUnlock()
p.lock.RLock()
defer p.lock.RUnlock()
return slices.Sorted(maps.Keys(p.clusters))
}
3 changes: 2 additions & 1 deletion providers/file/provider_test.go
@@ -54,11 +54,12 @@ var _ = Describe("Provider File", Ordered, func() {
KubeconfigDirs: []string{discoverDir},
})
Expect(err).NotTo(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
})

By("Starting the provider", func() {
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, nil))
return ignoreCanceled(provider.Start(ctx, nil)) // Pass nil for aware since we are not using a manager in this test
})
})

8 changes: 0 additions & 8 deletions providers/kind/go.mod
@@ -16,12 +16,9 @@ require (
require (
github.com/BurntSushi/toml v1.4.0 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
@@ -41,10 +38,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
@@ -54,7 +47,6 @@ require (
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/time v0.9.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
6 changes: 0 additions & 6 deletions providers/kind/go.sum
@@ -16,8 +16,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
@@ -64,17 +62,13 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=