@@ -46,6 +46,10 @@ import (
4646 "sigs.k8s.io/controller-runtime/pkg/source"
4747)
4848
49+ type TestRequest struct {
50+ Key string
51+ }
52+
4953var _ = Describe ("controller" , func () {
5054 var fakeReconcile * fakeReconciler
5155 var ctrl * Controller [reconcile.Request ]
@@ -340,6 +344,41 @@ var _ = Describe("controller", func() {
340344 Expect (err .Error ()).To (Equal ("controller was started more than once. This is likely to be caused by being added to a manager multiple times" ))
341345 })
342346
347+ It ("should check for correct TypedSyncingSource if custom types are used" , func () {
348+ queue := & controllertest.TypedQueue [TestRequest ]{
349+ TypedInterface : workqueue .NewTyped [TestRequest ](),
350+ }
351+ ctrl := & Controller [TestRequest ]{
352+ NewQueue : func (string , workqueue.TypedRateLimiter [TestRequest ]) workqueue.TypedRateLimitingInterface [TestRequest ] {
353+ return queue
354+ },
355+ LogConstructor : func (* TestRequest ) logr.Logger {
356+ return log .RuntimeLog .WithName ("controller" ).WithName ("test" )
357+ },
358+ }
359+ ctrl .CacheSyncTimeout = time .Second
360+ src := & bisignallingSource [TestRequest ]{
361+ startCall : make (chan workqueue.TypedRateLimitingInterface [TestRequest ]),
362+ startDone : make (chan error , 1 ),
363+ waitCall : make (chan struct {}),
364+ waitDone : make (chan error , 1 ),
365+ }
366+ ctrl .startWatches = []source.TypedSource [TestRequest ]{src }
367+ ctrl .Name = "foo"
368+ ctx , cancel := context .WithCancel (context .Background ())
369+ defer cancel ()
370+ startCh := make (chan error )
371+ go func () {
372+ defer GinkgoRecover ()
373+ startCh <- ctrl .Start (ctx )
374+ }()
375+ Eventually (src .startCall ).Should (Receive (Equal (queue )))
376+ src .startDone <- nil
377+ Eventually (src .waitCall ).Should (BeClosed ())
378+ src .waitDone <- nil
379+ cancel ()
380+ Eventually (startCh ).Should (Receive (Succeed ()))
381+ })
343382 })
344383
345384 Describe ("Processing queue items from a Controller" , func () {
@@ -901,3 +940,40 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte
901940 <- ctx .Done ()
902941 return nil , errors .New ("GetInformer timed out" )
903942}
943+
944+ type bisignallingSource [T comparable ] struct {
945+ // receives the queue that is passed to Start
946+ startCall chan workqueue.TypedRateLimitingInterface [T ]
947+ // passes an error to return from Start
948+ startDone chan error
949+ // closed when WaitForSync is called
950+ waitCall chan struct {}
951+ // passes an error to return from WaitForSync
952+ waitDone chan error
953+ }
954+
955+ var _ source.TypedSyncingSource [int ] = (* bisignallingSource [int ])(nil )
956+
957+ func (t * bisignallingSource [T ]) Start (ctx context.Context , q workqueue.TypedRateLimitingInterface [T ]) error {
958+ select {
959+ case t .startCall <- q :
960+ case <- ctx .Done ():
961+ return ctx .Err ()
962+ }
963+ select {
964+ case err := <- t .startDone :
965+ return err
966+ case <- ctx .Done ():
967+ return ctx .Err ()
968+ }
969+ }
970+
971+ func (t * bisignallingSource [T ]) WaitForSync (ctx context.Context ) error {
972+ close (t .waitCall )
973+ select {
974+ case err := <- t .waitDone :
975+ return err
976+ case <- ctx .Done ():
977+ return ctx .Err ()
978+ }
979+ }
0 commit comments