Skip to content

Commit 0595ad0

Browse files
Updated kubeconfig provider to use controller instead of informer for watching secrets
Signed-off-by: Codey Jenkins <[email protected]>
1 parent 9729c81 commit 0595ad0

File tree

3 files changed

+100
-156
lines changed

3 files changed

+100
-156
lines changed

examples/kubeconfig/main.go

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,16 @@ import (
2222
"flag"
2323
"os"
2424

25-
"golang.org/x/sync/errgroup"
26-
2725
corev1 "k8s.io/api/core/v1"
2826
apierrors "k8s.io/apimachinery/pkg/api/errors"
2927

3028
ctrl "sigs.k8s.io/controller-runtime"
3129
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
3230
"sigs.k8s.io/controller-runtime/pkg/log/zap"
3331
"sigs.k8s.io/controller-runtime/pkg/manager"
32+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3433
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3534

36-
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3735
mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
3836
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
3937
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
@@ -78,28 +76,21 @@ func main() {
7876
},
7977
}
8078

81-
// Log only the serializable fields from manager options
82-
safeOpts := map[string]interface{}{
83-
"leaderElection": managerOpts.LeaderElection,
84-
"metrics": map[string]interface{}{
85-
"bindAddress": managerOpts.Metrics.BindAddress,
86-
},
87-
"healthProbeBindAddress": managerOpts.HealthProbeBindAddress,
88-
"pprofBindAddress": managerOpts.PprofBindAddress,
89-
"gracefulShutdownTimeout": managerOpts.GracefulShutdownTimeout,
90-
"controller": map[string]interface{}{
91-
"groupKindConcurrency": managerOpts.Controller.GroupKindConcurrency,
92-
"cacheSyncTimeout": managerOpts.Controller.CacheSyncTimeout,
93-
},
94-
}
95-
96-
entryLog.Info("Creating manager", "options", safeOpts)
79+
// Create multicluster manager
80+
entryLog.Info("Creating manager")
9781
mgr, err := mcmanager.New(ctrl.GetConfigOrDie(), provider, managerOpts)
9882
if err != nil {
9983
entryLog.Error(err, "Unable to create manager")
10084
os.Exit(1)
10185
}
10286

87+
// Setup provider controller with the manager.
88+
err = provider.SetupWithManager(ctx, mgr)
89+
if err != nil {
90+
entryLog.Error(err, "Unable to setup provider with manager")
91+
os.Exit(1)
92+
}
93+
10394
// Create a controller that watches ConfigMaps and logs when they are found.
10495
err = mcbuilder.ControllerManagedBy(mgr).
10596
Named("multicluster-configmaps").
@@ -132,23 +123,10 @@ func main() {
132123
os.Exit(1)
133124
}
134125

135-
// Starting everything.
136-
g, ctx := errgroup.WithContext(ctx)
137-
g.Go(func() error {
138-
return ignoreCanceled(provider.Run(ctx, mgr))
139-
})
140-
g.Go(func() error {
141-
return ignoreCanceled(mgr.Start(ctx))
142-
})
143-
if err := g.Wait(); err != nil {
126+
// Start the manager.
127+
err = mgr.Start(ctx)
128+
if err != nil && !errors.Is(err, context.Canceled) {
144129
entryLog.Error(err, "unable to start")
145130
os.Exit(1)
146131
}
147132
}
148-
149-
func ignoreCanceled(err error) error {
150-
if errors.Is(err, context.Canceled) {
151-
return nil
152-
}
153-
return err
154-
}

providers/kubeconfig/provider.go

Lines changed: 66 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@ import (
2828
"github.com/go-logr/logr"
2929

3030
corev1 "k8s.io/api/core/v1"
31-
toolscache "k8s.io/client-go/tools/cache"
31+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3232
"k8s.io/client-go/tools/clientcmd"
3333

34+
ctrl "sigs.k8s.io/controller-runtime"
35+
"sigs.k8s.io/controller-runtime/pkg/builder"
3436
"sigs.k8s.io/controller-runtime/pkg/client"
3537
"sigs.k8s.io/controller-runtime/pkg/cluster"
3638
"sigs.k8s.io/controller-runtime/pkg/log"
39+
"sigs.k8s.io/controller-runtime/pkg/predicate"
3740

3841
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
3942
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
@@ -62,7 +65,6 @@ func New(opts Options) *Provider {
6265
return &Provider{
6366
opts: opts,
6467
log: log.Log.WithName("kubeconfig-provider"),
65-
client: nil, // Will be set in Run
6668
clusters: map[string]activeCluster{},
6769
}
6870
}
@@ -88,10 +90,10 @@ type index struct {
8890
type Provider struct {
8991
opts Options
9092
log logr.Logger
91-
client client.Client
9293
lock sync.RWMutex // protects everything below.
9394
clusters map[string]activeCluster
9495
indexers []index
96+
mgr mcmanager.Manager
9597
}
9698

9799
type activeCluster struct {
@@ -113,83 +115,69 @@ func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster
113115
return nil, fmt.Errorf("cluster %s not found", clusterName)
114116
}
115117

116-
// Run starts the provider and blocks, watching for kubeconfig secrets.
117-
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
118+
// SetupWithManager sets up the provider with the manager.
119+
func (p *Provider) SetupWithManager(ctx context.Context, mgr mcmanager.Manager) error {
118120
log := p.log
119121
log.Info("Starting kubeconfig provider", "options", p.opts)
120122

121-
// If client isn't set yet, get it from the manager
122-
if p.client == nil && mgr != nil {
123-
log.Info("Setting client from manager")
124-
p.client = mgr.GetLocalManager().GetClient()
125-
if p.client == nil {
126-
return fmt.Errorf("failed to get client from manager")
127-
}
123+
if mgr == nil {
124+
return fmt.Errorf("manager is nil")
128125
}
126+
p.mgr = mgr
129127

130-
// Get the informer for secrets
131-
secretInf, err := mgr.GetLocalManager().GetCache().GetInformer(ctx, &corev1.Secret{})
132-
if err != nil {
133-
return fmt.Errorf("failed to get secret informer: %w", err)
128+
// Get the local manager from the multicluster manager
129+
localMgr := mgr.GetLocalManager()
130+
if localMgr == nil {
131+
return fmt.Errorf("local manager is nil")
134132
}
135133

136-
// Add event handlers for secrets
137-
if _, err := secretInf.AddEventHandler(toolscache.FilteringResourceEventHandler{
138-
FilterFunc: func(obj interface{}) bool {
139-
secret, ok := obj.(*corev1.Secret)
140-
if !ok {
141-
return false
142-
}
143-
// Only process secrets in our namespace with our label
144-
return secret.Namespace == p.opts.Namespace &&
145-
secret.Labels[p.opts.KubeconfigSecretLabel] == "true"
146-
},
147-
Handler: toolscache.ResourceEventHandlerFuncs{
148-
AddFunc: func(obj interface{}) {
149-
secret := obj.(*corev1.Secret)
150-
log.Info("Processing new secret", "name", secret.Name)
151-
if err := p.handleSecret(ctx, secret, mgr); err != nil {
152-
log.Error(err, "Failed to handle secret", "name", secret.Name)
153-
}
154-
},
155-
UpdateFunc: func(oldObj, newObj interface{}) {
156-
secret := newObj.(*corev1.Secret)
157-
log.Info("Processing updated secret", "name", secret.Name)
158-
if err := p.handleSecret(ctx, secret, mgr); err != nil {
159-
log.Error(err, "Failed to handle secret", "name", secret.Name)
160-
}
161-
},
162-
DeleteFunc: func(obj interface{}) {
163-
secret := obj.(*corev1.Secret)
164-
log.Info("Processing deleted secret", "name", secret.Name)
165-
p.handleSecretDelete(secret)
134+
// Setup the controller to watch for secrets containing kubeconfig data
135+
err := ctrl.NewControllerManagedBy(localMgr).
136+
For(&corev1.Secret{}, builder.WithPredicates(predicate.NewPredicateFuncs(
137+
// Only watch for secrets in the configured namespace and with the configured label
138+
func(obj client.Object) bool {
139+
return obj.GetNamespace() == p.opts.Namespace &&
140+
obj.GetLabels()[p.opts.KubeconfigSecretLabel] == "true"
166141
},
167-
},
168-
}); err != nil {
169-
return fmt.Errorf("failed to add event handlers: %w", err)
142+
))).
143+
Complete(p)
144+
if err != nil {
145+
return fmt.Errorf("failed to create controller: %w", err)
170146
}
171147

172-
// Block until context is done
173-
<-ctx.Done()
174-
log.Info("Context cancelled, exiting provider")
175-
return ctx.Err()
148+
return nil
176149
}
177150

178-
// handleSecret processes a secret containing kubeconfig data
179-
func (p *Provider) handleSecret(ctx context.Context, secret *corev1.Secret, mgr mcmanager.Manager) error {
180-
if secret == nil {
181-
return fmt.Errorf("received nil secret")
151+
// Reconcile is the main controller function that reconciles secrets containing kubeconfig data
152+
// when they are added, updated, or deleted.
153+
func (p *Provider) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
154+
secret := &corev1.Secret{}
155+
if err := p.mgr.GetLocalManager().GetClient().Get(ctx, req.NamespacedName, secret); err != nil {
156+
// If the secret is not found, remove the cluster and return.
157+
// This is a normal occurrence when the secret is deleted.
158+
if apierrors.IsNotFound(err) {
159+
p.removeCluster(req.Name)
160+
return ctrl.Result{}, nil
161+
}
162+
return ctrl.Result{}, fmt.Errorf("failed to get secret: %w", err)
182163
}
183164

184165
// Extract name to use as cluster name
185166
clusterName := secret.Name
186167
log := p.log.WithValues("cluster", clusterName, "secret", fmt.Sprintf("%s/%s", secret.Namespace, secret.Name))
187168

169+
// If the secret is being deleted, remove the cluster and return.
170+
// Will probably only hit this if there is a finalizer on the secret.
171+
if secret.DeletionTimestamp != nil {
172+
p.removeCluster(clusterName)
173+
return ctrl.Result{}, nil
174+
}
175+
188176
// Check if this secret has kubeconfig data
189177
kubeconfigData, ok := secret.Data[p.opts.KubeconfigSecretKey]
190178
if !ok {
191179
log.Info("Secret does not contain kubeconfig data", "key", p.opts.KubeconfigSecretKey)
192-
return nil
180+
return ctrl.Result{}, nil
193181
}
194182

195183
// Hash the kubeconfig
@@ -204,26 +192,24 @@ func (p *Provider) handleSecret(ctx context.Context, secret *corev1.Secret, mgr
204192
if clusterExists {
205193
if ac.Hash == hashStr {
206194
log.Info("Cluster already exists and has the same kubeconfig, skipping")
207-
return nil
195+
return ctrl.Result{}, nil
208196
}
209197

210198
log.Info("Cluster already exists, updating it")
211-
if err := p.removeCluster(clusterName); err != nil {
212-
return fmt.Errorf("failed to remove existing cluster: %w", err)
213-
}
199+
p.removeCluster(clusterName)
214200
}
215201

216202
// Parse the kubeconfig
217203
restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigData)
218204
if err != nil {
219-
return fmt.Errorf("failed to parse kubeconfig: %w", err)
205+
return ctrl.Result{}, fmt.Errorf("failed to parse kubeconfig: %w", err)
220206
}
221207

222208
// Create a new cluster
223209
log.Info("Creating new cluster from kubeconfig")
224210
cl, err := cluster.New(restConfig)
225211
if err != nil {
226-
return fmt.Errorf("failed to create cluster: %w", err)
212+
return ctrl.Result{}, fmt.Errorf("failed to create cluster: %w", err)
227213
}
228214

229215
// Copy indexers to avoid holding lock.
@@ -235,7 +221,7 @@ func (p *Provider) handleSecret(ctx context.Context, secret *corev1.Secret, mgr
235221
// Apply any field indexers
236222
for _, idx := range indexers {
237223
if err := cl.GetFieldIndexer().IndexField(ctx, idx.object, idx.field, idx.extractValue); err != nil {
238-
return fmt.Errorf("failed to index field %q: %w", idx.field, err)
224+
return ctrl.Result{}, fmt.Errorf("failed to index field %q: %w", idx.field, err)
239225
}
240226
}
241227

@@ -253,7 +239,7 @@ func (p *Provider) handleSecret(ctx context.Context, secret *corev1.Secret, mgr
253239
log.Info("Waiting for cluster cache to be ready")
254240
if !cl.GetCache().WaitForCacheSync(clusterCtx) {
255241
cancel() // Cancel context before returning error
256-
return fmt.Errorf("failed to wait for cache sync")
242+
return ctrl.Result{}, fmt.Errorf("failed to wait for cache sync")
257243
}
258244
log.Info("Cluster cache is ready")
259245

@@ -269,50 +255,32 @@ func (p *Provider) handleSecret(ctx context.Context, secret *corev1.Secret, mgr
269255

270256
log.Info("Successfully added cluster")
271257

272-
// Engage the manager if provided
273-
if mgr != nil {
274-
if err := mgr.Engage(clusterCtx, clusterName, cl); err != nil {
275-
log.Error(err, "Failed to engage manager, removing cluster")
276-
p.lock.Lock()
277-
delete(p.clusters, clusterName)
278-
p.lock.Unlock()
279-
cancel() // Cancel the cluster context
280-
return fmt.Errorf("failed to engage manager: %w", err)
281-
}
282-
log.Info("Successfully engaged manager")
283-
}
284-
285-
return nil
286-
}
287-
288-
// handleSecretDelete handles the deletion of a secret
289-
func (p *Provider) handleSecretDelete(secret *corev1.Secret) {
290-
if secret == nil {
291-
return
258+
// Engage the manager
259+
if err := p.mgr.Engage(clusterCtx, clusterName, cl); err != nil {
260+
log.Error(err, "Failed to engage manager, removing cluster")
261+
p.lock.Lock()
262+
delete(p.clusters, clusterName)
263+
p.lock.Unlock()
264+
cancel() // Cancel the cluster context
265+
return ctrl.Result{}, fmt.Errorf("failed to engage manager: %w", err)
292266
}
267+
log.Info("Successfully engaged manager")
293268

294-
clusterName := secret.Name
295-
log := p.log.WithValues("cluster", clusterName)
296-
297-
log.Info("Handling deleted secret")
298-
299-
// Remove the cluster
300-
if err := p.removeCluster(clusterName); err != nil {
301-
log.Error(err, "Failed to remove cluster")
302-
}
269+
return ctrl.Result{}, nil
303270
}
304271

305272
// removeCluster removes a cluster by name
306-
func (p *Provider) removeCluster(clusterName string) error {
273+
func (p *Provider) removeCluster(clusterName string) {
307274
log := p.log.WithValues("cluster", clusterName)
308-
log.Info("Removing cluster")
309275

310276
p.lock.Lock()
311277
ac, exists := p.clusters[clusterName]
312278
if !exists {
313279
p.lock.Unlock()
314-
return fmt.Errorf("cluster not found")
280+
return
315281
}
282+
283+
log.Info("Removing cluster")
316284
delete(p.clusters, clusterName)
317285
p.lock.Unlock()
318286

@@ -322,7 +290,6 @@ func (p *Provider) removeCluster(clusterName string) error {
322290
log.Info("Cancelled cluster context")
323291

324292
log.Info("Successfully removed cluster")
325-
return nil
326293
}
327294

328295
// IndexField indexes a field on all clusters, existing and future.

0 commit comments

Comments
 (0)