Skip to content

✨ Extend interface with runner #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/cluster-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func main() {
return ignoreCanceled(localMgr.Start(ctx))
})
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, mcMgr))
return ignoreCanceled(provider.Start(ctx, mcMgr))
})
g.Go(func() error {
return ignoreCanceled(mcMgr.Start(ctx))
Expand Down
2 changes: 1 addition & 1 deletion examples/file/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func main() {
return ignoreCanceled(mgr.Start(ctx))
})
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, mgr))
return ignoreCanceled(provider.Start(ctx, mgr))
})
if err := g.Wait(); err != nil {
entryLog.Info("error in errgroup: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion examples/kind/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func main() {
// Starting everything.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, mgr))
return ignoreCanceled(provider.Start(ctx, mgr))
})
g.Go(func() error {
return ignoreCanceled(mgr.Start(ctx))
Expand Down
2 changes: 1 addition & 1 deletion examples/namespace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func run(ctx context.Context, log logr.Logger, kubeconfig string) error {
// Starting everything.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, mgr))
return ignoreCanceled(provider.Start(ctx, mgr))
})
g.Go(func() error {
return ignoreCanceled(cl.Start(ctx))
Expand Down
5 changes: 5 additions & 0 deletions pkg/multicluster/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ type Provider interface {
// IndexField indexes the given object by the given field on all engaged
// clusters, current and future.
IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error

// Start runs the provider. Implementation of this method should block.
// If you need to pass in manager, it is recommended to implement SetupWithManager(mgr mcmanager.Manager) error method on individual providers.
// It is not part of the provider interface because it is not required for all providers.
Start(context.Context, Aware) error
}
20 changes: 10 additions & 10 deletions providers/cluster-api/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
)

Expand Down Expand Up @@ -109,7 +108,7 @@ type Provider struct {
client client.Client

lock sync.Mutex
mcMgr mcmanager.Manager
aware multicluster.Aware
clusters map[string]cluster.Cluster
cancelFns map[string]context.CancelFunc
indexers []index
Expand All @@ -126,13 +125,14 @@ func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster,
return nil, multicluster.ErrClusterNotFound
}

// Run starts the provider and blocks.
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
p.log.Info("Starting Cluster-API cluster provider")

// Start starts the provider and blocks.
func (p *Provider) Start(ctx context.Context, aware multicluster.Aware) error {
p.lock.Lock()
p.mcMgr = mgr
p.lock.Unlock()
defer p.lock.Unlock()

p.aware = aware
Comment on lines +131 to +133
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
defer p.lock.Unlock()
p.aware = aware
p.aware = aware
p.lock.Unlock()

Otherwise the provider locks I think? Because the reconcile also locks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But there should not be any reconciliation until started. However, it's better to be safe than sorry.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct but the .Start is blocking until the context is done and holds the lock at the same time, so the Reconcile method will never execute because it cannot acquire the lock


p.log.Info("Starting Cluster-API cluster provider")

<-ctx.Done()

Expand Down Expand Up @@ -171,7 +171,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
// TODO(sttts): do tighter logging.

// provider already started?
if p.mcMgr == nil {
if p.aware == nil {
return reconcile.Result{RequeueAfter: time.Second * 2}, nil
}

Expand Down Expand Up @@ -222,7 +222,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
p.log.Info("Added new cluster")

// engage manager.
if err := p.mcMgr.Engage(clusterCtx, key, cl); err != nil {
if err := p.aware.Engage(clusterCtx, key, cl); err != nil {
log.Error(err, "failed to engage manager")
delete(p.clusters, key)
delete(p.cancelFns, key)
Expand Down
14 changes: 10 additions & 4 deletions providers/cluster-inventory-api/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Provider struct {
strategy kubeconfigstrategy.Interface

lock sync.RWMutex
mcMgr mcmanager.Manager
manager mcmanager.Manager
clusters map[string]cluster.Cluster
cancelFns map[string]context.CancelFunc
kubeconfig map[string]*rest.Config
Expand Down Expand Up @@ -114,7 +114,7 @@ func (p *Provider) SetupWithManager(mgr mcmanager.Manager) error {
if mgr == nil {
return fmt.Errorf("manager is nil")
}
p.mcMgr = mgr
p.manager = mgr

// Get the local manager from the multi-cluster manager.
localMgr := mgr.GetLocalManager()
Expand Down Expand Up @@ -188,7 +188,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
defer p.lock.Unlock()

// provider already started?
if p.mcMgr == nil {
if p.manager == nil {
log.V(3).Info("Provider not started yet, requeuing")
return reconcile.Result{RequeueAfter: time.Second * 2}, nil
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
log.Info("Added new cluster for ClusterProfile")

// engage manager.
if err := p.mcMgr.Engage(clusterCtx, key, cl); err != nil {
if err := p.manager.Engage(clusterCtx, key, cl); err != nil {
log.Error(err, "failed to engage manager for ClusterProfile")
delete(p.clusters, key)
delete(p.cancelFns, key)
Expand Down Expand Up @@ -291,3 +291,9 @@ func (p *Provider) IndexField(ctx context.Context, obj client.Object, field stri

return nil
}

// Start runs the provider and blocks.
func (p *Provider) Start(ctx context.Context, _ multicluster.Aware) error {
<-ctx.Done()
return ctx.Err()
}
31 changes: 18 additions & 13 deletions providers/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/log"

mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
)

Expand Down Expand Up @@ -138,13 +137,15 @@ type Provider struct {
log logr.Logger

clustersLock sync.RWMutex
aware multicluster.Aware
clusters map[string]cluster.Cluster
clusterCancel map[string]func()
}

// Run starts the provider and updates the clusters and is blocking.
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
if err := p.run(ctx, mgr); err != nil {
// Start starts the provider and updates the clusters and is blocking.
func (p *Provider) Start(ctx context.Context, aware multicluster.Aware) error {
p.aware = aware
if err := p.run(ctx); err != nil {
return fmt.Errorf("initial update failed: %w", err)
}

Expand Down Expand Up @@ -181,7 +182,7 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
// would also require to track which cluster belongs to
// which file.
// Instead clusters are just updated from all files.
if err := p.run(ctx, mgr); err != nil {
if err := p.run(ctx); err != nil {
p.log.Error(err, "failed to update clusters after file change")
}
case err, ok := <-watcher.Errors:
Expand All @@ -194,11 +195,11 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
}

// RunOnce performs a single update of the clusters.
func (p *Provider) RunOnce(ctx context.Context, mgr mcmanager.Manager) error {
return p.run(ctx, mgr)
func (p *Provider) RunOnce(ctx context.Context) error {
return p.run(ctx)
}

func (p *Provider) addCluster(ctx context.Context, mgr mcmanager.Manager, name string, cl cluster.Cluster) {
func (p *Provider) addCluster(ctx context.Context, name string, cl cluster.Cluster) {
ctx, cancel := context.WithCancel(ctx)

p.clustersLock.Lock()
Expand All @@ -213,8 +214,8 @@ func (p *Provider) addCluster(ctx context.Context, mgr mcmanager.Manager, name s
p.removeCluster(name)
}()

if mgr != nil {
if err := mgr.Engage(ctx, name, cl); err != nil {
if p.aware != nil {
if err := p.aware.Engage(ctx, name, cl); err != nil {
cancel()
p.log.Error(err, "failed to engage cluster", "name", name)
}
Expand All @@ -232,7 +233,11 @@ func (p *Provider) removeCluster(name string) {
}
}

func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
func (p *Provider) run(ctx context.Context) error {
if p.aware == nil {
return fmt.Errorf("aware is not set")
}

loadedClusters, err := p.loadClusters()
if err != nil {
return fmt.Errorf("failed to load clusters: %w", err)
Expand All @@ -247,12 +252,12 @@ func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
if !cmp.Equal(existingCluster.GetConfig(), cl.GetConfig()) {
p.log.Info("updating cluster", "name", name)
p.removeCluster(name)
p.addCluster(ctx, mgr, name, cl)
p.addCluster(ctx, name, cl)
}
continue
}
p.log.Info("adding cluster", "name", name)
p.addCluster(ctx, mgr, name, cl)
p.addCluster(ctx, name, cl)
}

// delete clusters that are no longer present
Expand Down
3 changes: 2 additions & 1 deletion providers/file/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ var _ = Describe("Provider File", Ordered, func() {
KubeconfigDirs: []string{discoverDir},
})
Expect(err).NotTo(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
})

By("Starting the provider", func() {
g.Go(func() error {
return ignoreCanceled(provider.Run(ctx, nil))
return ignoreCanceled(provider.Start(ctx, nil)) // Pass nil for aware since we are not using a manager in this test
})
})

Expand Down
8 changes: 0 additions & 8 deletions providers/kind/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ require (
require (
github.com/BurntSushi/toml v1.4.0 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand All @@ -41,10 +38,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand All @@ -54,7 +47,6 @@ require (
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/time v0.9.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
6 changes: 0 additions & 6 deletions providers/kind/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
Expand Down Expand Up @@ -64,17 +62,13 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
Expand Down
25 changes: 13 additions & 12 deletions providers/kind/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/log"

mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
)

Expand All @@ -57,6 +56,8 @@ type index struct {

// Provider is a cluster Provider that works with a local Kind instance.
type Provider struct {
mcaware multicluster.Aware

opts []cluster.Option
log logr.Logger
lock sync.RWMutex
Expand All @@ -76,8 +77,10 @@ func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster
return nil, multicluster.ErrClusterNotFound
}

// Run starts the provider and blocks.
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
// Start starts the provider and blocks.
func (p *Provider) Start(ctx context.Context, aware multicluster.Aware) error {
p.mcaware = aware

p.log.Info("Starting kind cluster provider")

provider := kind.NewProvider()
Expand Down Expand Up @@ -152,15 +155,13 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
p.log.Info("Added new cluster", "cluster", clusterName)

// engage manager
if mgr != nil {
if err := mgr.Engage(clusterCtx, clusterName, cl); err != nil {
log.Error(err, "failed to engage manager")
p.lock.Lock()
delete(p.clusters, clusterName)
delete(p.cancelFns, clusterName)
p.lock.Unlock()
return false, nil
}
if err := p.mcaware.Engage(clusterCtx, clusterName, cl); err != nil {
log.Error(err, "failed to engage manager")
p.lock.Lock()
delete(p.clusters, clusterName)
delete(p.cancelFns, clusterName)
p.lock.Unlock()
return false, nil
}
}

Expand Down
Loading