@@ -533,7 +533,7 @@ var _ = Describe("controller", func() {
533533 Expect (startCount .Load ()).To (Equal (int32 (1 )), "Source should only be started once even when called multiple times" )
534534 })
535535
536- It ("should block subsequent calls from returning until the first call to startEventSources has returned" , func () {
536+ It ("should block subsequent calls from returning until the first call to startEventSources has returned" , func () {
537537 ctx , cancel := context .WithCancel (context .Background ())
538538 defer cancel ()
539539 ctrl .CacheSyncTimeout = 5 * time .Second
@@ -1257,6 +1257,39 @@ var _ = Describe("controller", func() {
12571257 close (ctrlWatchBlockingChan )
12581258 Eventually (hasNonWarmupCtrlWatchStarted .Load ).Should (BeTrue ())
12591259 })
1260+
1261+ It ("should not race with Start and only start sources once" , func () {
1262+ ctx , cancel := context .WithCancel (context .Background ())
1263+ defer cancel ()
1264+
1265+ ctrl .CacheSyncTimeout = time .Second
1266+
1267+ var watchStartedCount atomic.Int32
1268+ ctrl .startWatches = []source.TypedSource [reconcile.Request ]{
1269+ source .Func (func (ctx context.Context , _ workqueue.TypedRateLimitingInterface [reconcile.Request ]) error {
1270+ watchStartedCount .Add (1 )
1271+ return nil
1272+ }),
1273+ }
1274+
1275+ By ("calling Warmup and Start concurrently" )
1276+ go func () {
1277+ defer GinkgoRecover ()
1278+ Expect (ctrl .Start (ctx )).To (Succeed ())
1279+ }()
1280+
1281+ blockOnWarmupChan := make (chan struct {})
1282+ go func () {
1283+ defer GinkgoRecover ()
1284+ Expect (ctrl .Warmup (ctx )).To (Succeed ())
1285+ close (blockOnWarmupChan )
1286+ }()
1287+
1288+ <- blockOnWarmupChan
1289+
1290+ Expect (watchStartedCount .Load ()).To (Equal (int32 (1 )), "source should only be started once" )
1291+ Expect (ctrl .startWatches ).To (BeNil (), "startWatches should be reset to nil after they are started" )
1292+ })
12601293 })
12611294
12621295 Describe ("Warmup with warmup disabled" , func () {
0 commit comments