diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index 2c5a162ddf..4b2bb7ce7d 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -44,6 +44,7 @@ require ( golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.26.0 // indirect golang.org/x/term v0.25.0 // indirect golang.org/x/text v0.19.0 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 465f760db5..7eab7cbfe0 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -129,6 +129,8 @@ golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/go.mod b/go.mod index e78b00152b..9069e67a41 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,8 @@ require ( sigs.k8s.io/yaml v1.4.0 ) +require golang.org/x/sync v0.8.0 + require ( github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect @@ -79,7 +81,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sync v0.8.0 // indirect golang.org/x/term v0.25.0 // indirect golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.7.0 // indirect diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index dfe407f3b8..9d0ed67495 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -21,9 +21,11 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" @@ -171,41 +173,55 @@ func (c *Controller[request]) Start(ctx context.Context) error { // NB(directxman12): launch the sources *before* trying to wait for the // caches to sync so that they have a chance to register their intendeded // caches. + errGroup, _ := errgroup.WithContext(ctx) for _, watch := range c.startWatches { - c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch)) - - if err := watch.Start(ctx, c.Queue); err != nil { - return err - } - } - - // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches - c.LogConstructor(nil).Info("Starting Controller") - - for _, watch := range c.startWatches { - syncingSource, ok := watch.(source.SyncingSource) - if !ok { - continue - } - - if err := func() error { + log := c.LogConstructor(nil).WithValues("source", fmt.Sprintf("%s", watch)) + didStartSyncingSource := &atomic.Bool{} + errGroup.Go(func() error { // use a context with timeout for launching sources and syncing caches. sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) defer cancel() - // WaitForSync waits for a definitive timeout, and returns if there - // is an error or a timeout - if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { - err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) - c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync") + sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out + go func() { + defer close(sourceStartErrChan) + log.Info("Starting EventSource") + if err := watch.Start(ctx, c.Queue); err != nil { + sourceStartErrChan <- err + return + } + syncingSource, ok := watch.(source.SyncingSource) + if !ok { + return + } + didStartSyncingSource.Store(true) + if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { + err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) + log.Error(err, "Could not wait for Cache to sync") + sourceStartErrChan <- err + } + }() + + select { + case err := <-sourceStartErrChan: return err + case <-sourceStartCtx.Done(): + if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened + return <-sourceStartErrChan + } + if ctx.Err() != nil { // Don't return an error if the root context got cancelled + return nil + } + return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch) } - - return nil - }(); err != nil { - return err - } + }) } + if err := errGroup.Wait(); err != nil { + return err + } + + // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches + c.LogConstructor(nil).Info("Starting Controller") // All the watches have been started, we can reset the local slice. // diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 638d21810e..9e48424b2a 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -145,6 +145,7 @@ var _ = Describe("controller", func() { Describe("Start", func() { It("should return an error if there is an error waiting for the informers", func() { + ctrl.CacheSyncTimeout = time.Second f := false ctrl.startWatches = []source.TypedSource[reconcile.Request]{ source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}), @@ -158,12 +159,11 @@ var _ = Describe("controller", func() { }) It("should error when cache sync timeout occurs", func() { - ctrl.CacheSyncTimeout = 10 * time.Nanosecond - c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) c = &cacheWithIndefinitelyBlockingGetInformer{c} + ctrl.CacheSyncTimeout = time.Second ctrl.startWatches = []source.TypedSource[reconcile.Request]{ source.Kind(c, &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}), } @@ -174,7 +174,7 @@ var _ = Describe("controller", func() { Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced")) }) - It("should not error when context cancelled", func() { + It("should not error when controller Start context is cancelled during Sources WaitForSync", func() { ctrl.CacheSyncTimeout = 1 * time.Second sourceSynced := make(chan struct{}) @@ -200,15 +200,33 @@ var _ = Describe("controller", func() { <-sourceSynced }) - It("should not error when cache sync timeout is of sufficiently high", func() { - ctrl.CacheSyncTimeout = 1 * time.Second + It("should error when Start() is blocking forever", func() { + ctrl.CacheSyncTimeout = 0 + + controllerDone := make(chan struct{}) + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + <-controllerDone + return ctx.Err() + })} + + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() + err := ctrl.Start(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("Please ensure that its Start() method is non-blocking")) + + close(controllerDone) + }) + + It("should not error when cache sync timeout is of sufficiently high", func() { + ctrl.CacheSyncTimeout = 10 * time.Second ctx, cancel := context.WithCancel(context.Background()) defer cancel() sourceSynced := make(chan struct{}) - c, err := cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) + c := &informertest.FakeInformers{} ctrl.startWatches = []source.TypedSource[reconcile.Request]{ &singnallingSourceWrapper{ SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}), @@ -216,11 +234,6 @@ var _ = Describe("controller", func() { }, } - go func() { - defer GinkgoRecover() - Expect(c.Start(ctx)).To(Succeed()) - }() - go func() { defer GinkgoRecover() Expect(ctrl.Start(ctx)).To(Succeed()) @@ -230,6 +243,7 @@ var _ = Describe("controller", func() { }) It("should process events from source.Channel", func() { + ctrl.CacheSyncTimeout = 10 * time.Second // channel to be closed when event is processed processed := make(chan struct{}) // source channel @@ -269,6 +283,7 @@ var _ = Describe("controller", func() { }) It("should error when channel source is not specified", func() { + ctrl.CacheSyncTimeout = 10 * time.Second ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -281,24 +296,26 @@ var _ = Describe("controller", func() { }) It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { + ctrl.CacheSyncTimeout = 10 * time.Second started := false + ctx, cancel := context.WithCancel(context.Background()) src := source.Func(func(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error { defer GinkgoRecover() Expect(q).To(Equal(ctrl.Queue)) started = true + cancel() return nil }) Expect(ctrl.Watch(src)).NotTo(HaveOccurred()) - // Use a cancelled context so Start doesn't block - ctx, cancel := context.WithCancel(context.Background()) - cancel() - Expect(ctrl.Start(ctx)).To(Succeed()) + err := ctrl.Start(ctx) + Expect(err).To(Succeed()) Expect(started).To(BeTrue()) }) It("should return an error if there is an error starting sources", func() { + ctrl.CacheSyncTimeout = 10 * time.Second err := fmt.Errorf("Expected Error: could not start source") src := source.Func(func(context.Context, workqueue.TypedRateLimitingInterface[reconcile.Request], @@ -852,6 +869,15 @@ type singnallingSourceWrapper struct { source.SyncingSource } +func (s *singnallingSourceWrapper) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + err := s.SyncingSource.Start(ctx, q) + if err != nil { + // WaitForSync will never be called if this errors, so close the channel to prevent deadlocks in tests + close(s.cacheSyncDone) + } + return err +} + func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error { defer func() { close(s.cacheSyncDone) diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 4999edc432..2fdfbde8e3 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -52,7 +52,7 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not // sync that informer (most commonly due to RBAC issues). ctx, ks.startCancel = context.WithCancel(ctx) - ks.startedErr = make(chan error) + ks.startedErr = make(chan error, 1) // Buffer chan to not leak goroutines if WaitForSync isn't called go func() { var ( i cache.Informer