Skip to content

Commit f40b8f6

Browse files
fix: guard against some race conditions and add a basic race test
1 parent 389fe13 commit f40b8f6

File tree

2 files changed

+98
-18
lines changed

2 files changed

+98
-18
lines changed

providers/kubeconfig/provider.go

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -231,15 +231,18 @@ func (p *Provider) handleSecret(ctx context.Context, secret *corev1.Secret, mgr
231231
return fmt.Errorf("failed to create cluster: %w", err)
232232
}
233233

234-
// Apply any field indexers
234+
// Copy the indexers so the lock is not held while indexing fields.
235235
p.lock.RLock()
236-
for _, idx := range p.indexers {
236+
indexers := make([]index, len(p.indexers))
237+
copy(indexers, p.indexers)
238+
p.lock.RUnlock()
239+
240+
// Apply any field indexers
241+
for _, idx := range indexers {
237242
if err := cl.GetFieldIndexer().IndexField(ctx, idx.object, idx.field, idx.extractValue); err != nil {
238-
p.lock.RUnlock()
239243
return fmt.Errorf("failed to index field %q: %w", idx.field, err)
240244
}
241245
}
242-
p.lock.RUnlock()
243246

244247
// Create a context that will be canceled when this cluster is removed
245248
clusterCtx, cancel := context.WithCancel(ctx)
@@ -309,32 +312,27 @@ func (p *Provider) removeCluster(clusterName string) error {
309312
log := p.log.WithValues("cluster", clusterName)
310313
log.Info("Removing cluster")
311314

312-
// Find the cluster and cancel function
313-
p.lock.RLock()
315+
p.lock.Lock()
314316
ac, exists := p.clusters[clusterName]
315317
if !exists {
316-
p.lock.RUnlock()
317-
return fmt.Errorf("cluster %s not found", clusterName)
318+
p.lock.Unlock()
319+
return fmt.Errorf("cluster not found")
318320
}
319-
p.lock.RUnlock()
321+
delete(p.clusters, clusterName)
322+
p.lock.Unlock()
320323

321-
// Cancel the context to trigger cleanup for this cluster
324+
// Cancel the context to trigger cleanup for this cluster.
325+
// This is done outside the lock to avoid holding the lock for a long time.
322326
ac.Cancel()
323327
log.Info("Cancelled cluster context")
324328

325-
// Clean up our maps
326-
p.lock.Lock()
327-
delete(p.clusters, clusterName)
328-
p.lock.Unlock()
329-
330329
log.Info("Successfully removed cluster")
331330
return nil
332331
}
333332

334333
// IndexField indexes a field on all clusters, existing and future.
335334
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
336335
p.lock.Lock()
337-
defer p.lock.Unlock()
338336

339337
// Save for future clusters
340338
p.indexers = append(p.indexers, index{
@@ -343,9 +341,16 @@ func (p *Provider) IndexField(ctx context.Context, obj client.Object, field stri
343341
extractValue: extractValue,
344342
})
345343

346-
// Apply to existing clusters
344+
// Snapshot the clusters so the lock is not held while indexing them.
345+
clustersSnapshot := make(map[string]cluster.Cluster, len(p.clusters))
347346
for name, ac := range p.clusters {
348-
if err := ac.Cluster.GetFieldIndexer().IndexField(ctx, obj, field, extractValue); err != nil {
347+
clustersSnapshot[name] = ac.Cluster
348+
}
349+
p.lock.Unlock()
350+
351+
// Apply to existing clusters
352+
for name, cl := range clustersSnapshot {
353+
if err := cl.GetFieldIndexer().IndexField(ctx, obj, field, extractValue); err != nil {
349354
return fmt.Errorf("failed to index field %q on cluster %q: %w", field, name, err)
350355
}
351356
}

providers/kubeconfig/provider_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"errors"
2222
"fmt"
2323
"strconv"
24+
"sync"
25+
"time"
2426

2527
"golang.org/x/sync/errgroup"
2628

@@ -35,6 +37,7 @@ import (
3537

3638
ctrl "sigs.k8s.io/controller-runtime"
3739
"sigs.k8s.io/controller-runtime/pkg/client"
40+
"sigs.k8s.io/controller-runtime/pkg/cluster"
3841
"sigs.k8s.io/controller-runtime/pkg/log"
3942
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4043

@@ -317,3 +320,75 @@ func createKubeconfigSecret(ctx context.Context, name string, cfg *rest.Config,
317320
}
318321
return cl.Create(ctx, secret)
319322
}
323+
324+
// mockCluster is a mock implementation of cluster.Cluster for testing.
325+
type mockCluster struct {
326+
cluster.Cluster
327+
}
328+
329+
func (c *mockCluster) GetFieldIndexer() client.FieldIndexer {
330+
return &mockFieldIndexer{}
331+
}
332+
333+
type mockFieldIndexer struct{}
334+
335+
func (f *mockFieldIndexer) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
336+
// Simulate work to increase chance of race
337+
time.Sleep(time.Millisecond)
338+
return nil
339+
}
340+
341+
var _ = Describe("Provider race condition", func() {
342+
It("should handle concurrent operations without issues", func() {
343+
p := New(Options{})
344+
345+
// Pre-populate with some clusters to make the test meaningful
346+
numClusters := 20
347+
for i := 0; i < numClusters; i++ {
348+
clusterName := fmt.Sprintf("cluster-%d", i)
349+
p.clusters[clusterName] = activeCluster{
350+
Cluster: &mockCluster{},
351+
Cancel: func() {},
352+
}
353+
}
354+
355+
var wg sync.WaitGroup
356+
numGoroutines := 40
357+
wg.Add(numGoroutines)
358+
359+
for i := 0; i < numGoroutines; i++ {
360+
go func(i int) {
361+
defer GinkgoRecover()
362+
defer wg.Done()
363+
364+
// Mix of operations to stress the provider
365+
switch i % 4 {
366+
case 0:
367+
// Concurrently index a field. This will read the cluster list.
368+
err := p.IndexField(context.Background(), &corev1.Pod{}, "spec.nodeName", func(rawObj client.Object) []string {
369+
return nil
370+
})
371+
Expect(err).NotTo(HaveOccurred())
372+
case 1:
373+
// Concurrently get a cluster.
374+
_, err := p.Get(context.Background(), "cluster-1")
375+
Expect(err).NotTo(HaveOccurred())
376+
case 2:
377+
// Concurrently list clusters.
378+
p.ListClusters()
379+
case 3:
380+
// Concurrently delete a cluster. This will modify the cluster map.
381+
clusterToRemove := fmt.Sprintf("cluster-%d", i/4)
382+
secret := &corev1.Secret{
383+
ObjectMeta: metav1.ObjectMeta{
384+
Name: clusterToRemove,
385+
},
386+
}
387+
p.handleSecretDelete(secret)
388+
}
389+
}(i)
390+
}
391+
392+
wg.Wait()
393+
})
394+
})

0 commit comments

Comments
 (0)