Skip to content

Commit 12e938c

Browse files
committed
Initialize queue in warmup with test.
1 parent bca3e2a commit 12e938c

File tree

2 files changed

+68
-39
lines changed

2 files changed

+68
-39
lines changed

pkg/internal/controller/controller.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,10 @@ type Controller[request comparable] struct {
128128
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
129129
startWatches []source.TypedSource[request]
130130

131-
// startedEventSources is used to track if the event sources have been started.
131+
// startedEventSourcesAndQueue is used to track if the event sources have been started.
132132
// It ensures that we append sources to c.startWatches only until we call Start() / Warmup()
133-
// It is true if startEventSourcesLocked has been called at least once.
134-
startedEventSources bool
133+
// It is true if startEventSourcesAndQueueLocked has been called at least once.
134+
startedEventSourcesAndQueue bool
135135

136136
// didStartEventSourcesOnce is used to ensure that the event sources are only started once.
137137
didStartEventSourcesOnce sync.Once
@@ -208,7 +208,7 @@ func (c *Controller[request]) Watch(src source.TypedSource[request]) error {
208208

209209
// Sources weren't started yet, store the watches locally and return.
210210
// These sources are going to be held until either Warmup() or Start(...) is called.
211-
if !c.startedEventSources {
211+
if !c.startedEventSourcesAndQueue {
212212
c.startWatches = append(c.startWatches, src)
213213
return nil
214214
}
@@ -237,7 +237,7 @@ func (c *Controller[request]) Warmup(ctx context.Context) error {
237237
// Set the ctx so later calls to watch use this internal context
238238
c.ctx = ctx
239239

240-
return c.startEventSourcesLocked(ctx)
240+
return c.startEventSourcesAndQueueLocked(ctx)
241241
}
242242

243243
// Start implements controller.Controller.
@@ -254,17 +254,6 @@ func (c *Controller[request]) Start(ctx context.Context) error {
254254
// Set the internal context.
255255
c.ctx = ctx
256256

257-
queue := c.NewQueue(c.Name, c.RateLimiter)
258-
if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue {
259-
c.Queue = priorityQueue
260-
} else {
261-
c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
262-
}
263-
go func() {
264-
<-ctx.Done()
265-
c.Queue.ShutDown()
266-
}()
267-
268257
wg := &sync.WaitGroup{}
269258
err := func() error {
270259
defer c.mu.Unlock()
@@ -275,7 +264,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
275264
// NB(directxman12): launch the sources *before* trying to wait for the
276265
// caches to sync so that they have a chance to register their intended
277266
// caches.
278-
if err := c.startEventSourcesLocked(ctx); err != nil {
267+
if err := c.startEventSourcesAndQueueLocked(ctx); err != nil {
279268
return err
280269
}
281270

@@ -308,12 +297,23 @@ func (c *Controller[request]) Start(ctx context.Context) error {
308297
return nil
309298
}
310299

311-
// startEventSourcesLocked launches all the sources registered with this controller and waits
300+
// startEventSourcesAndQueueLocked launches all the sources registered with this controller and waits
312301
// for them to sync. It returns an error if any of the sources fail to start or sync.
313-
func (c *Controller[request]) startEventSourcesLocked(ctx context.Context) error {
302+
func (c *Controller[request]) startEventSourcesAndQueueLocked(ctx context.Context) error {
314303
var retErr error
315304

316305
c.didStartEventSourcesOnce.Do(func() {
306+
queue := c.NewQueue(c.Name, c.RateLimiter)
307+
if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue {
308+
c.Queue = priorityQueue
309+
} else {
310+
c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
311+
}
312+
go func() {
313+
<-ctx.Done()
314+
c.Queue.ShutDown()
315+
}()
316+
317317
errGroup := &errgroup.Group{}
318318
for _, watch := range c.startWatches {
319319
log := c.LogConstructor(nil)
@@ -381,7 +381,7 @@ func (c *Controller[request]) startEventSourcesLocked(ctx context.Context) error
381381

382382
// Mark event sources as started after resetting the startWatches slice to no-op a Watch()
383383
// call after event sources have been started.
384-
c.startedEventSources = true
384+
c.startedEventSourcesAndQueue = true
385385
})
386386

387387
return retErr

pkg/internal/controller/controller_test.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -387,16 +387,26 @@ var _ = Describe("controller", func() {
387387
})
388388
})
389389

390-
Describe("startEventSourcesLocked", func() {
390+
Describe("startEventSourcesAndQueueLocked", func() {
391391
It("should return nil when no sources are provided", func() {
392392
ctx, cancel := context.WithCancel(context.Background())
393393
defer cancel()
394394

395395
ctrl.startWatches = []source.TypedSource[reconcile.Request]{}
396-
err := ctrl.startEventSourcesLocked(ctx)
396+
err := ctrl.startEventSourcesAndQueueLocked(ctx)
397397
Expect(err).NotTo(HaveOccurred())
398398
})
399399

400+
It("should initialize controller queue when called", func() {
401+
ctx, cancel := context.WithCancel(context.Background())
402+
defer cancel()
403+
404+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{}
405+
err := ctrl.startEventSourcesAndQueueLocked(ctx)
406+
Expect(err).NotTo(HaveOccurred())
407+
Expect(ctrl.Queue).NotTo(BeNil())
408+
})
409+
400410
It("should return an error if a source fails to start", func() {
401411
ctx, cancel := context.WithCancel(context.Background())
402412
defer cancel()
@@ -410,7 +420,7 @@ var _ = Describe("controller", func() {
410420
// Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
411421
ctrl.CacheSyncTimeout = 5 * time.Second
412422
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
413-
err := ctrl.startEventSourcesLocked(ctx)
423+
err := ctrl.startEventSourcesAndQueueLocked(ctx)
414424
Expect(err).To(Equal(expectedErr))
415425
})
416426

@@ -424,7 +434,7 @@ var _ = Describe("controller", func() {
424434
ctrl.Name = "test-controller"
425435
ctrl.CacheSyncTimeout = 5 * time.Second
426436

427-
err := ctrl.startEventSourcesLocked(ctx)
437+
err := ctrl.startEventSourcesAndQueueLocked(ctx)
428438
Expect(err).To(HaveOccurred())
429439
Expect(err.Error()).To(ContainSubstring("failed to wait for test-controller caches to sync"))
430440
})
@@ -440,7 +450,7 @@ var _ = Describe("controller", func() {
440450
ctrl.Name = "test-controller"
441451
ctrl.CacheSyncTimeout = 5 * time.Second
442452

443-
err := ctrl.startEventSourcesLocked(ctx)
453+
err := ctrl.startEventSourcesAndQueueLocked(ctx)
444454
Expect(err).NotTo(HaveOccurred())
445455
})
446456

@@ -464,7 +474,7 @@ var _ = Describe("controller", func() {
464474
startErrCh := make(chan error)
465475
go func() {
466476
defer GinkgoRecover()
467-
startErrCh <- ctrl.startEventSourcesLocked(sourceCtx)
477+
startErrCh <- ctrl.startEventSourcesAndQueueLocked(sourceCtx)
468478
}()
469479

470480
// Allow source to start successfully
@@ -499,7 +509,7 @@ var _ = Describe("controller", func() {
499509

500510
ctrl.startWatches = []source.TypedSource[reconcile.Request]{blockingSrc}
501511

502-
err := ctrl.startEventSourcesLocked(ctx)
512+
err := ctrl.startEventSourcesAndQueueLocked(ctx)
503513
Expect(err).To(HaveOccurred())
504514
Expect(err.Error()).To(ContainSubstring("timed out waiting for source"))
505515
})
@@ -518,13 +528,13 @@ var _ = Describe("controller", func() {
518528

519529
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
520530

521-
By("Calling startEventSourcesLocked multiple times in parallel")
531+
By("Calling startEventSourcesAndQueueLocked multiple times in parallel")
522532
var wg sync.WaitGroup
523533
for i := 1; i <= 5; i++ {
524534
wg.Add(1)
525535
go func() {
526536
defer wg.Done()
527-
err := ctrl.startEventSourcesLocked(ctx)
537+
err := ctrl.startEventSourcesAndQueueLocked(ctx)
528538
// All calls should return the same nil error
529539
Expect(err).NotTo(HaveOccurred())
530540
}()
@@ -534,12 +544,12 @@ var _ = Describe("controller", func() {
534544
Expect(startCount.Load()).To(Equal(int32(1)), "Source should only be started once even when called multiple times")
535545
})
536546

537-
It("should block subsequent calls from returning until the first call to startEventSourcesLocked has returned", func() {
547+
It("should block subsequent calls from returning until the first call to startEventSourcesAndQueueLocked has returned", func() {
538548
ctx, cancel := context.WithCancel(context.Background())
539549
defer cancel()
540550
ctrl.CacheSyncTimeout = 5 * time.Second
541551

542-
// finishSourceChan is closed to unblock startEventSourcesLocked from returning
552+
// finishSourceChan is closed to unblock startEventSourcesAndQueueLocked from returning
543553
finishSourceChan := make(chan struct{})
544554

545555
src := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
@@ -548,35 +558,35 @@ var _ = Describe("controller", func() {
548558
})
549559
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
550560

551-
By("Calling startEventSourcesLocked asynchronously")
561+
By("Calling startEventSourcesAndQueueLocked asynchronously")
552562
wg := sync.WaitGroup{}
553563
go func() {
554564
defer GinkgoRecover()
555565
defer wg.Done()
556566

557567
wg.Add(1)
558-
Expect(ctrl.startEventSourcesLocked(ctx)).To(Succeed())
568+
Expect(ctrl.startEventSourcesAndQueueLocked(ctx)).To(Succeed())
559569
}()
560570

561-
By("Calling startEventSourcesLocked again")
571+
By("Calling startEventSourcesAndQueueLocked again")
562572
var didSubsequentCallComplete atomic.Bool
563573
go func() {
564574
defer GinkgoRecover()
565575
defer wg.Done()
566576

567577
wg.Add(1)
568-
Expect(ctrl.startEventSourcesLocked(ctx)).To(Succeed())
578+
Expect(ctrl.startEventSourcesAndQueueLocked(ctx)).To(Succeed())
569579
didSubsequentCallComplete.Store(true)
570580
}()
571581

572-
// Assert that second call to startEventSourcesLocked is blocked while source has not finished
582+
// Assert that second call to startEventSourcesAndQueueLocked is blocked while source has not finished
573583
Consistently(didSubsequentCallComplete.Load).Should(BeFalse())
574584

575585
By("Finishing source start + sync")
576586
finishSourceChan <- struct{}{}
577587

578-
// Assert that second call to startEventSourcesLocked is now complete
579-
Eventually(didSubsequentCallComplete.Load).Should(BeTrue(), "startEventSourcesLocked should complete after source is started and synced")
588+
// Assert that second call to startEventSourcesAndQueueLocked is now complete
589+
Eventually(didSubsequentCallComplete.Load).Should(BeTrue(), "startEventSourcesAndQueueLocked should complete after source is started and synced")
580590
wg.Wait()
581591
})
582592

@@ -592,7 +602,7 @@ var _ = Describe("controller", func() {
592602

593603
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
594604

595-
err := ctrl.startEventSourcesLocked(ctx)
605+
err := ctrl.startEventSourcesAndQueueLocked(ctx)
596606
Expect(err).NotTo(HaveOccurred())
597607
Expect(ctrl.startWatches).To(BeNil(), "startWatches should be reset to nil after returning")
598608
})
@@ -1150,6 +1160,25 @@ var _ = Describe("controller", func() {
11501160
Expect(err.Error()).To(ContainSubstring("sync error"))
11511161
})
11521162

1163+
It("should call Start on sources with the appropriate non-nil queue", func() {
1164+
ctrl.CacheSyncTimeout = 10 * time.Second
1165+
started := false
1166+
ctx, cancel := context.WithCancel(context.Background())
1167+
src := source.Func(func(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1168+
defer GinkgoRecover()
1169+
Expect(q).ToNot(BeNil())
1170+
Expect(q).To(Equal(ctrl.Queue))
1171+
1172+
started = true
1173+
cancel() // Cancel the context so ctrl.Start() doesn't block forever
1174+
return nil
1175+
})
1176+
Expect(ctrl.Watch(src)).To(Succeed())
1177+
Expect(ctrl.Warmup(ctx)).To(Succeed())
1178+
Expect(ctrl.Queue).ToNot(BeNil())
1179+
Expect(started).To(BeTrue())
1180+
})
1181+
11531182
It("should return true if context is cancelled while waiting for source to start", func() {
11541183
// Setup controller with sources that complete with error
11551184
ctx, cancel := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)