@@ -386,13 +386,13 @@ var _ = Describe("controller", func() {
386386 })
387387 })
388388
389- Describe ("startEventSources " , func () {
389+ Describe ("startEventSourcesLocked " , func () {
390390 It ("should return nil when no sources are provided" , func () {
391391 ctx , cancel := context .WithCancel (context .Background ())
392392 defer cancel ()
393393
394394 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{}
395- err := ctrl .startEventSources (ctx )
395+ err := ctrl .startEventSourcesLocked (ctx )
396396 Expect (err ).NotTo (HaveOccurred ())
397397 })
398398
@@ -409,7 +409,7 @@ var _ = Describe("controller", func() {
409409 // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
410410 ctrl .CacheSyncTimeout = 5 * time .Second
411411 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{src }
412- err := ctrl .startEventSources (ctx )
412+ err := ctrl .startEventSourcesLocked (ctx )
413413 Expect (err ).To (Equal (expectedErr ))
414414 })
415415
@@ -423,7 +423,7 @@ var _ = Describe("controller", func() {
423423 ctrl .Name = "test-controller"
424424 ctrl .CacheSyncTimeout = 5 * time .Second
425425
426- err := ctrl .startEventSources (ctx )
426+ err := ctrl .startEventSourcesLocked (ctx )
427427 Expect (err ).To (HaveOccurred ())
428428 Expect (err .Error ()).To (ContainSubstring ("failed to wait for test-controller caches to sync" ))
429429 })
@@ -439,7 +439,7 @@ var _ = Describe("controller", func() {
439439 ctrl .Name = "test-controller"
440440 ctrl .CacheSyncTimeout = 5 * time .Second
441441
442- err := ctrl .startEventSources (ctx )
442+ err := ctrl .startEventSourcesLocked (ctx )
443443 Expect (err ).NotTo (HaveOccurred ())
444444 })
445445
@@ -463,7 +463,7 @@ var _ = Describe("controller", func() {
463463 startErrCh := make (chan error )
464464 go func () {
465465 defer GinkgoRecover ()
466- startErrCh <- ctrl .startEventSources (sourceCtx )
466+ startErrCh <- ctrl .startEventSourcesLocked (sourceCtx )
467467 }()
468468
469469 // Allow source to start successfully
@@ -498,7 +498,7 @@ var _ = Describe("controller", func() {
498498
499499 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{blockingSrc }
500500
501- err := ctrl .startEventSources (ctx )
501+ err := ctrl .startEventSourcesLocked (ctx )
502502 Expect (err ).To (HaveOccurred ())
503503 Expect (err .Error ()).To (ContainSubstring ("timed out waiting for source" ))
504504 })
@@ -517,13 +517,13 @@ var _ = Describe("controller", func() {
517517
518518 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{src }
519519
520- By ("Calling startEventSources multiple times in parallel" )
520+ By ("Calling startEventSourcesLocked multiple times in parallel" )
521521 var wg sync.WaitGroup
522522 for i := 1 ; i <= 5 ; i ++ {
523523 wg .Add (1 )
524524 go func () {
525525 defer wg .Done ()
526- err := ctrl .startEventSources (ctx )
526+ err := ctrl .startEventSourcesLocked (ctx )
527527 // All calls should return the same nil error
528528 Expect (err ).NotTo (HaveOccurred ())
529529 }()
@@ -533,12 +533,12 @@ var _ = Describe("controller", func() {
533533 Expect (startCount .Load ()).To (Equal (int32 (1 )), "Source should only be started once even when called multiple times" )
534534 })
535535
536- It ("should block subsequent calls from returning until the first call to startEventSources has returned" , func () {
536+ It ("should block subsequent calls from returning until the first call to startEventSourcesLocked has returned" , func () {
537537 ctx , cancel := context .WithCancel (context .Background ())
538538 defer cancel ()
539539 ctrl .CacheSyncTimeout = 5 * time .Second
540540
541- // finishSourceChan is closed to unblock startEventSources from returning
541+ // finishSourceChan is closed to unblock startEventSourcesLocked from returning
542542 finishSourceChan := make (chan struct {})
543543
544544 src := source .Func (func (ctx context.Context , _ workqueue.TypedRateLimitingInterface [reconcile.Request ]) error {
@@ -547,28 +547,28 @@ var _ = Describe("controller", func() {
547547 })
548548 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{src }
549549
550- By ("Calling startEventSources asynchronously" )
550+ By ("Calling startEventSourcesLocked asynchronously" )
551551 go func () {
552552 defer GinkgoRecover ()
553- Expect (ctrl .startEventSources (ctx )).To (Succeed ())
553+ Expect (ctrl .startEventSourcesLocked (ctx )).To (Succeed ())
554554 }()
555555
556- By ("Calling startEventSources again" )
556+ By ("Calling startEventSourcesLocked again" )
557557 var didSubsequentCallComplete atomic.Bool
558558 go func () {
559559 defer GinkgoRecover ()
560- Expect (ctrl .startEventSources (ctx )).To (Succeed ())
560+ Expect (ctrl .startEventSourcesLocked (ctx )).To (Succeed ())
561561 didSubsequentCallComplete .Store (true )
562562 }()
563563
564- // Assert that second call to startEventSources is blocked while source has not finished
564+ // Assert that second call to startEventSourcesLocked is blocked while source has not finished
565565 Consistently (didSubsequentCallComplete .Load ).Should (BeFalse ())
566566
567567 By ("Finishing source start + sync" )
568568 finishSourceChan <- struct {}{}
569569
570- // Assert that second call to startEventSources is now complete
571- Eventually (didSubsequentCallComplete .Load ).Should (BeTrue (), "startEventSources should complete after source is started and synced" )
570+ // Assert that second call to startEventSourcesLocked is now complete
571+ Eventually (didSubsequentCallComplete .Load ).Should (BeTrue (), "startEventSourcesLocked should complete after source is started and synced" )
572572 })
573573
574574 It ("should reset c.startWatches to nil after returning" , func () {
@@ -583,7 +583,7 @@ var _ = Describe("controller", func() {
583583
584584 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{src }
585585
586- err := ctrl .startEventSources (ctx )
586+ err := ctrl .startEventSourcesLocked (ctx )
587587 Expect (err ).NotTo (HaveOccurred ())
588588 Expect (ctrl .startWatches ).To (BeNil (), "startWatches should be reset to nil after returning" )
589589 })
@@ -1236,6 +1236,31 @@ var _ = Describe("controller", func() {
12361236 Expect (<- runnableExecutionOrderChan ).To (Equal (nonWarmupRunnableName ))
12371237 })
12381238
1239+ It ("should not cause a data race when called concurrently" , func () {
1240+ ctx , cancel := context .WithCancel (context .Background ())
1241+ defer cancel ()
1242+
1243+ ctrl .CacheSyncTimeout = time .Second
1244+
1245+ ctrl .startWatches = []source.TypedSource [reconcile.Request ]{
1246+ source .Func (func (ctx context.Context , _ workqueue.TypedRateLimitingInterface [reconcile.Request ]) error {
1247+ return nil
1248+ }),
1249+ }
1250+
1251+ var wg sync.WaitGroup
1252+ for i := 0 ; i < 5 ; i ++ {
1253+ wg .Add (1 )
1254+ go func () {
1255+ defer GinkgoRecover ()
1256+ defer wg .Done ()
1257+ Expect (ctrl .Warmup (ctx )).To (Succeed ())
1258+ }()
1259+ }
1260+
1261+ wg .Wait ()
1262+ })
1263+
12391264 It ("should not race with Start and only start sources once" , func () {
12401265 ctx , cancel := context .WithCancel (context .Background ())
12411266 defer cancel ()
0 commit comments