Skip to content

Commit b545226

Browse files
Refactor kubeconfig provider to separate the controller logic into smaller responsibilities.
Improved readability of lock handling. Added more tests around kubeconfig updates. Signed-off-by: Codey Jenkins <[email protected]>
1 parent 0595ad0 commit b545226

File tree

2 files changed

+206
-98
lines changed

2 files changed

+206
-98
lines changed

providers/kubeconfig/provider.go

Lines changed: 149 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ type index struct {
9090
type Provider struct {
9191
opts Options
9292
log logr.Logger
93-
lock sync.RWMutex // protects everything below.
93+
lock sync.RWMutex // protects clusters and indexers
9494
clusters map[string]activeCluster
9595
indexers []index
9696
mgr mcmanager.Manager
@@ -103,16 +103,38 @@ type activeCluster struct {
103103
Hash string // hash of the kubeconfig
104104
}
105105

106-
// Get returns the cluster with the given name, if it is known.
107-
func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
106+
// getCluster retrieves a cluster by name with read lock
107+
func (p *Provider) getCluster(clusterName string) (activeCluster, bool) {
108108
p.lock.RLock()
109109
defer p.lock.RUnlock()
110110

111-
if cl, ok := p.clusters[clusterName]; ok {
112-
return cl.Cluster, nil
113-
}
111+
ac, exists := p.clusters[clusterName]
112+
return ac, exists
113+
}
114+
115+
// setCluster adds a cluster with write lock
116+
func (p *Provider) setCluster(clusterName string, ac activeCluster) {
117+
p.lock.Lock()
118+
defer p.lock.Unlock()
114119

115-
return nil, fmt.Errorf("cluster %s not found", clusterName)
120+
p.clusters[clusterName] = ac
121+
}
122+
123+
// addIndexer adds an indexer with write lock
124+
func (p *Provider) addIndexer(idx index) {
125+
p.lock.Lock()
126+
defer p.lock.Unlock()
127+
128+
p.indexers = append(p.indexers, idx)
129+
}
130+
131+
// Get returns the cluster with the given name, if it is known.
132+
func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
133+
ac, exists := p.getCluster(clusterName)
134+
if !exists {
135+
return nil, fmt.Errorf("cluster %s not found", clusterName)
136+
}
137+
return ac.Cluster, nil
116138
}
117139

118140
// SetupWithManager sets up the provider with the manager.
@@ -149,80 +171,109 @@ func (p *Provider) SetupWithManager(ctx context.Context, mgr mcmanager.Manager)
149171
}
150172

151173
// Reconcile is the main controller function that reconciles secrets containing kubeconfig data
152-
// when
153174
func (p *Provider) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
154-
secret := &corev1.Secret{}
155-
if err := p.mgr.GetLocalManager().GetClient().Get(ctx, req.NamespacedName, secret); err != nil {
156-
// If the secret is not found, remove the cluster and return.
157-
// This is a normal occurence when the secret is deleted.
158-
if apierrors.IsNotFound(err) {
159-
p.removeCluster(req.Name)
160-
return ctrl.Result{}, nil
161-
}
162-
return ctrl.Result{}, fmt.Errorf("failed to get secret: %w", err)
175+
// Handle secret retrieval and basic validation
176+
secret, err := p.getSecret(ctx, req.NamespacedName)
177+
if err != nil {
178+
return ctrl.Result{}, err
179+
}
180+
if secret == nil {
181+
// Secret not found, remove cluster if it exists
182+
p.removeCluster(req.Name)
183+
return ctrl.Result{}, nil
163184
}
164185

165-
// Extract name to use as cluster name
186+
// Extract cluster name and create logger
166187
clusterName := secret.Name
167188
log := p.log.WithValues("cluster", clusterName, "secret", fmt.Sprintf("%s/%s", secret.Namespace, secret.Name))
168189

169-
// If the secret is being deleted, remove the cluster and return.
170-
// Will probably only hit this if there is a finalizer on the secret.
190+
// Handle secret deletion, this is usually only hit if there is a finalizer on the secret.
171191
if secret.DeletionTimestamp != nil {
172192
p.removeCluster(clusterName)
173193
return ctrl.Result{}, nil
174194
}
175195

176-
// Check if this secret has kubeconfig data
177-
kubeconfigData, ok := secret.Data[p.opts.KubeconfigSecretKey]
178-
if !ok {
179-
log.Info("Secret does not contain kubeconfig data", "key", p.opts.KubeconfigSecretKey)
180-
return ctrl.Result{}, nil
196+
// Extract and validate kubeconfig data
197+
kubeconfigData, err := p.extractKubeconfigData(secret, log)
198+
if err != nil {
199+
return ctrl.Result{}, err
200+
}
201+
if kubeconfigData == nil {
202+
return ctrl.Result{}, nil // No kubeconfig data found
181203
}
182204

183-
// Hash the kubeconfig
184-
hash := sha256.New()
185-
hash.Write(kubeconfigData)
186-
hashStr := hex.EncodeToString(hash.Sum(nil))
205+
// Hash the kubeconfig for change detection
206+
hashStr := p.hashKubeconfig(kubeconfigData)
187207

188-
// Check if cluster exists and remove it if it does
189-
p.lock.RLock()
190-
ac, clusterExists := p.clusters[clusterName]
191-
p.lock.RUnlock()
208+
// Check if cluster exists and needs to be updated
209+
existingCluster, clusterExists := p.getCluster(clusterName)
192210
if clusterExists {
193-
if ac.Hash == hashStr {
211+
if existingCluster.Hash == hashStr {
194212
log.Info("Cluster already exists and has the same kubeconfig, skipping")
195213
return ctrl.Result{}, nil
196214
}
197-
215+
// If the cluster exists and the kubeconfig has changed,
216+
// remove it and continue to create a new cluster in its place.
217+
// Creating a new cluster will ensure all new configuration is applied.
198218
log.Info("Cluster already exists, updating it")
199219
p.removeCluster(clusterName)
200220
}
201221

222+
// Create and setup the new cluster
223+
if err := p.createAndEngageCluster(ctx, clusterName, kubeconfigData, hashStr, log); err != nil {
224+
return ctrl.Result{}, err
225+
}
226+
227+
return ctrl.Result{}, nil
228+
}
229+
230+
// getSecret retrieves a secret and handles not found errors
231+
func (p *Provider) getSecret(ctx context.Context, namespacedName client.ObjectKey) (*corev1.Secret, error) {
232+
secret := &corev1.Secret{}
233+
if err := p.mgr.GetLocalManager().GetClient().Get(ctx, namespacedName, secret); err != nil {
234+
if apierrors.IsNotFound(err) {
235+
return nil, nil // Secret not found is not an error
236+
}
237+
return nil, fmt.Errorf("failed to get secret: %w", err)
238+
}
239+
return secret, nil
240+
}
241+
242+
// extractKubeconfigData extracts kubeconfig data from a secret
243+
func (p *Provider) extractKubeconfigData(secret *corev1.Secret, log logr.Logger) ([]byte, error) {
244+
kubeconfigData, ok := secret.Data[p.opts.KubeconfigSecretKey]
245+
if !ok {
246+
log.Info("Secret does not contain kubeconfig data", "key", p.opts.KubeconfigSecretKey)
247+
return nil, nil
248+
}
249+
return kubeconfigData, nil
250+
}
251+
252+
// hashKubeconfig creates a hash of the kubeconfig data
253+
func (p *Provider) hashKubeconfig(kubeconfigData []byte) string {
254+
hash := sha256.New()
255+
hash.Write(kubeconfigData)
256+
return hex.EncodeToString(hash.Sum(nil))
257+
}
258+
259+
// createAndEngageCluster creates a new cluster, sets it up, stores it, and engages it with the manager
260+
func (p *Provider) createAndEngageCluster(ctx context.Context, clusterName string, kubeconfigData []byte, hashStr string, log logr.Logger) error {
202261
// Parse the kubeconfig
203262
restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigData)
204263
if err != nil {
205-
return ctrl.Result{}, fmt.Errorf("failed to parse kubeconfig: %w", err)
264+
return fmt.Errorf("failed to parse kubeconfig: %w", err)
206265
}
207266

208267
// Create a new cluster
209268
log.Info("Creating new cluster from kubeconfig")
210269
cl, err := cluster.New(restConfig)
211270
if err != nil {
212-
return ctrl.Result{}, fmt.Errorf("failed to create cluster: %w", err)
271+
return fmt.Errorf("failed to create cluster: %w", err)
213272
}
214273

215-
// Copy indexers to avoid holding lock.
216-
p.lock.RLock()
217-
indexers := make([]index, len(p.indexers))
218-
copy(indexers, p.indexers)
219-
p.lock.RUnlock()
220-
221-
// Apply any field indexers
222-
for _, idx := range indexers {
223-
if err := cl.GetFieldIndexer().IndexField(ctx, idx.object, idx.field, idx.extractValue); err != nil {
224-
return ctrl.Result{}, fmt.Errorf("failed to index field %q: %w", idx.field, err)
225-
}
274+
// Apply field indexers
275+
if err := p.applyIndexers(ctx, cl); err != nil {
276+
return err
226277
}
227278

228279
// Create a context that will be canceled when this cluster is removed
@@ -238,81 +289,61 @@ func (p *Provider) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result
238289
// Wait for cache to be ready
239290
log.Info("Waiting for cluster cache to be ready")
240291
if !cl.GetCache().WaitForCacheSync(clusterCtx) {
241-
cancel() // Cancel context before returning error
242-
return ctrl.Result{}, fmt.Errorf("failed to wait for cache sync")
292+
cancel()
293+
return fmt.Errorf("failed to wait for cache sync")
243294
}
244295
log.Info("Cluster cache is ready")
245296

246297
// Store the cluster
247-
p.lock.Lock()
248-
p.clusters[clusterName] = activeCluster{
298+
p.setCluster(clusterName, activeCluster{
249299
Cluster: cl,
250300
Context: clusterCtx,
251301
Cancel: cancel,
252302
Hash: hashStr,
253-
}
254-
p.lock.Unlock()
303+
})
255304

256305
log.Info("Successfully added cluster")
257306

258-
// Engage the manager
307+
// Engage cluster so that the manager can start operating on the cluster
259308
if err := p.mgr.Engage(clusterCtx, clusterName, cl); err != nil {
260309
log.Error(err, "Failed to engage manager, removing cluster")
261-
p.lock.Lock()
262-
delete(p.clusters, clusterName)
263-
p.lock.Unlock()
264-
cancel() // Cancel the cluster context
265-
return ctrl.Result{}, fmt.Errorf("failed to engage manager: %w", err)
310+
p.removeCluster(clusterName)
311+
return fmt.Errorf("failed to engage manager: %w", err)
266312
}
267-
log.Info("Successfully engaged manager")
268313

269-
return ctrl.Result{}, nil
314+
log.Info("Successfully engaged manager")
315+
return nil
270316
}
271317

272-
// removeCluster removes a cluster by name
273-
func (p *Provider) removeCluster(clusterName string) {
274-
log := p.log.WithValues("cluster", clusterName)
318+
// applyIndexers applies field indexers to a cluster
319+
func (p *Provider) applyIndexers(ctx context.Context, cl cluster.Cluster) error {
320+
p.lock.RLock()
321+
defer p.lock.RUnlock()
275322

276-
p.lock.Lock()
277-
ac, exists := p.clusters[clusterName]
278-
if !exists {
279-
p.lock.Unlock()
280-
return
323+
for _, idx := range p.indexers {
324+
if err := cl.GetFieldIndexer().IndexField(ctx, idx.object, idx.field, idx.extractValue); err != nil {
325+
return fmt.Errorf("failed to index field %q: %w", idx.field, err)
326+
}
281327
}
282328

283-
log.Info("Removing cluster")
284-
delete(p.clusters, clusterName)
285-
p.lock.Unlock()
286-
287-
// Cancel the context to trigger cleanup for this cluster.
288-
// This is done outside the lock to avoid holding the lock for a long time.
289-
ac.Cancel()
290-
log.Info("Cancelled cluster context")
291-
292-
log.Info("Successfully removed cluster")
329+
return nil
293330
}
294331

295332
// IndexField indexes a field on all clusters, existing and future.
296333
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
297-
p.lock.Lock()
298-
299334
// Save for future clusters
300-
p.indexers = append(p.indexers, index{
335+
p.addIndexer(index{
301336
object: obj,
302337
field: field,
303338
extractValue: extractValue,
304339
})
305340

306-
// Create a copy of the clusters to avoid holding the lock.
307-
clustersSnapshot := make(map[string]cluster.Cluster, len(p.clusters))
308-
for name, ac := range p.clusters {
309-
clustersSnapshot[name] = ac.Cluster
310-
}
311-
p.lock.Unlock()
312-
313341
// Apply to existing clusters
314-
for name, cl := range clustersSnapshot {
315-
if err := cl.GetFieldIndexer().IndexField(ctx, obj, field, extractValue); err != nil {
342+
p.lock.RLock()
343+
defer p.lock.RUnlock()
344+
345+
for name, ac := range p.clusters {
346+
if err := ac.Cluster.GetFieldIndexer().IndexField(ctx, obj, field, extractValue); err != nil {
316347
return fmt.Errorf("failed to index field %q on cluster %q: %w", field, name, err)
317348
}
318349
}
@@ -321,14 +352,35 @@ func (p *Provider) IndexField(ctx context.Context, obj client.Object, field stri
321352
}
322353

323354
// ListClusters returns a list of all discovered clusters.
324-
func (p *Provider) ListClusters() map[string]cluster.Cluster {
355+
func (p *Provider) ListClusters() []string {
325356
p.lock.RLock()
326357
defer p.lock.RUnlock()
327358

328-
// Return a copy of the map to avoid race conditions
329-
result := make(map[string]cluster.Cluster, len(p.clusters))
330-
for k, v := range p.clusters {
331-
result[k] = v.Cluster
359+
result := make([]string, 0, len(p.clusters))
360+
for name := range p.clusters {
361+
result = append(result, name)
332362
}
333363
return result
334364
}
365+
366+
// removeCluster removes a cluster by name with write lock and cleanup
367+
func (p *Provider) removeCluster(clusterName string) {
368+
log := p.log.WithValues("cluster", clusterName)
369+
370+
p.lock.Lock()
371+
ac, exists := p.clusters[clusterName]
372+
if !exists {
373+
p.lock.Unlock()
374+
log.Info("Cluster not found, nothing to remove")
375+
return
376+
}
377+
378+
log.Info("Removing cluster")
379+
delete(p.clusters, clusterName)
380+
p.lock.Unlock()
381+
382+
// Cancel the context to trigger cleanup for this cluster.
383+
// This is done outside the lock to avoid holding the lock for a long time.
384+
ac.Cancel()
385+
log.Info("Successfully removed cluster and cancelled cluster context")
386+
}

0 commit comments

Comments
 (0)