@@ -145,6 +145,7 @@ var _ = Describe("controller", func() {
145145
146146 Describe ("Start" , func () {
147147 It ("should return an error if there is an error waiting for the informers" , func () {
148+ ctrl .CacheSyncTimeout = time .Second
148149 f := false
149150 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{
150151 source .Kind (& informertest.FakeInformers {Synced : & f }, & corev1.Pod {}, & handler.TypedEnqueueRequestForObject [* corev1.Pod ]{}),
@@ -158,12 +159,11 @@ var _ = Describe("controller", func() {
158159 })
159160
160161 It ("should error when cache sync timeout occurs" , func () {
161- ctrl .CacheSyncTimeout = 10 * time .Nanosecond
162-
163162 c , err := cache .New (cfg , cache.Options {})
164163 Expect (err ).NotTo (HaveOccurred ())
165164 c = & cacheWithIndefinitelyBlockingGetInformer {c }
166165
166+ ctrl .CacheSyncTimeout = time .Second
167167 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{
168168 source .Kind (c , & appsv1.Deployment {}, & handler.TypedEnqueueRequestForObject [* appsv1.Deployment ]{}),
169169 }
@@ -174,7 +174,7 @@ var _ = Describe("controller", func() {
174174 Expect (err .Error ()).To (ContainSubstring ("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced" ))
175175 })
176176
177- It ("should not error when context cancelled" , func () {
177+ It ("should not error when controller Start context is cancelled during Sources WaitForSync " , func () {
178178 ctrl .CacheSyncTimeout = 1 * time .Second
179179
180180 sourceSynced := make (chan struct {})
@@ -200,27 +200,40 @@ var _ = Describe("controller", func() {
200200 <- sourceSynced
201201 })
202202
203- It ("should not error when cache sync timeout is of sufficiently high" , func () {
204- ctrl .CacheSyncTimeout = 1 * time .Second
203+ It ("should error when Start() is blocking forever" , func () {
204+ ctrl .CacheSyncTimeout = 0
205+
206+ controllerDone := make (chan struct {})
207+ ctrl .startWatches = []source.TypedSource [reconcile.Request ]{
208+ source .Func (func (ctx context.Context , _ workqueue.TypedRateLimitingInterface [reconcile.Request ]) error {
209+ <- controllerDone
210+ return ctx .Err ()
211+ })}
212+
213+ ctx , cancel := context .WithTimeout (context .TODO (), 10 * time .Second )
214+ defer cancel ()
205215
216+ err := ctrl .Start (ctx )
217+ Expect (err ).To (HaveOccurred ())
218+ Expect (err .Error ()).To (ContainSubstring ("Please ensure that its Start() method is non-blocking" ))
219+
220+ close (controllerDone )
221+ })
222+
223+ It ("should not error when cache sync timeout is of sufficiently high" , func () {
224+ ctrl .CacheSyncTimeout = 10 * time .Second
206225 ctx , cancel := context .WithCancel (context .Background ())
207226 defer cancel ()
208227
209228 sourceSynced := make (chan struct {})
210- c , err := cache .New (cfg , cache.Options {})
211- Expect (err ).NotTo (HaveOccurred ())
229+ c := & informertest.FakeInformers {}
212230 ctrl .startWatches = []source.TypedSource [reconcile.Request ]{
213231 & singnallingSourceWrapper {
214232 SyncingSource : source .Kind [client.Object ](c , & appsv1.Deployment {}, & handler.EnqueueRequestForObject {}),
215233 cacheSyncDone : sourceSynced ,
216234 },
217235 }
218236
219- go func () {
220- defer GinkgoRecover ()
221- Expect (c .Start (ctx )).To (Succeed ())
222- }()
223-
224237 go func () {
225238 defer GinkgoRecover ()
226239 Expect (ctrl .Start (ctx )).To (Succeed ())
@@ -230,6 +243,7 @@ var _ = Describe("controller", func() {
230243 })
231244
232245 It ("should process events from source.Channel" , func () {
246+ ctrl .CacheSyncTimeout = 10 * time .Second
233247 // channel to be closed when event is processed
234248 processed := make (chan struct {})
235249 // source channel
@@ -269,6 +283,7 @@ var _ = Describe("controller", func() {
269283 })
270284
271285 It ("should error when channel source is not specified" , func () {
286+ ctrl .CacheSyncTimeout = 10 * time .Second
272287 ctx , cancel := context .WithCancel (context .Background ())
273288 defer cancel ()
274289
@@ -281,24 +296,26 @@ var _ = Describe("controller", func() {
281296 })
282297
283298 It ("should call Start on sources with the appropriate EventHandler, Queue, and Predicates" , func () {
299+ ctrl .CacheSyncTimeout = 10 * time .Second
284300 started := false
301+ ctx , cancel := context .WithCancel (context .Background ())
285302 src := source .Func (func (ctx context.Context , q workqueue.TypedRateLimitingInterface [reconcile.Request ]) error {
286303 defer GinkgoRecover ()
287304 Expect (q ).To (Equal (ctrl .Queue ))
288305
289306 started = true
307+ cancel ()
290308 return nil
291309 })
292310 Expect (ctrl .Watch (src )).NotTo (HaveOccurred ())
293311
294- // Use a cancelled context so Start doesn't block
295- ctx , cancel := context .WithCancel (context .Background ())
296- cancel ()
297- Expect (ctrl .Start (ctx )).To (Succeed ())
312+ err := ctrl .Start (ctx )
313+ Expect (err ).To (Succeed ())
298314 Expect (started ).To (BeTrue ())
299315 })
300316
301317 It ("should return an error if there is an error starting sources" , func () {
318+ ctrl .CacheSyncTimeout = 10 * time .Second
302319 err := fmt .Errorf ("Expected Error: could not start source" )
303320 src := source .Func (func (context.Context ,
304321 workqueue.TypedRateLimitingInterface [reconcile.Request ],
@@ -852,6 +869,15 @@ type singnallingSourceWrapper struct {
852869 source.SyncingSource
853870}
854871
872+ func (s * singnallingSourceWrapper ) Start (ctx context.Context , q workqueue.TypedRateLimitingInterface [reconcile.Request ]) error {
873+ err := s .SyncingSource .Start (ctx , q )
874+ if err != nil {
875+ // WaitForSync will never be called if this errors, so close the channel to prevent deadlocks in tests
876+ close (s .cacheSyncDone )
877+ }
878+ return err
879+ }
880+
855881func (s * singnallingSourceWrapper ) WaitForSync (ctx context.Context ) error {
856882 defer func () {
857883 close (s .cacheSyncDone )
0 commit comments