diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index a1a442316f..5f907d774b 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -116,7 +116,8 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec return informer, nil } - c.InformersByGVK[gvk] = &controllertest.FakeInformer{} + // Set Synced to true by default so that WaitForCacheSync returns immediately + c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: true} return c.InformersByGVK[gvk], nil } diff --git a/pkg/controller/controllertest/util.go b/pkg/controller/controllertest/util.go index 2c9a248899..8df24fcf57 100644 --- a/pkg/controller/controllertest/util.go +++ b/pkg/controller/controllertest/util.go @@ -37,6 +37,16 @@ type FakeInformer struct { handlers []cache.ResourceEventHandler } +// fakeHandlerRegistration implements cache.ResourceEventHandlerRegistration for testing. +type fakeHandlerRegistration struct { + informer *FakeInformer +} + +// HasSynced implements cache.ResourceEventHandlerRegistration. +func (f *fakeHandlerRegistration) HasSynced() bool { + return f.informer.Synced +} + // AddIndexers does nothing. TODO(community): Implement this. func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error { return nil @@ -60,19 +70,19 @@ func (f *FakeInformer) HasSynced() bool { // AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration. func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { f.handlers = append(f.handlers, handler) - return nil, nil + return &fakeHandlerRegistration{informer: f}, nil } // AddEventHandlerWithResyncPeriod implements the Informer interface. Adds an EventHandler to the fake Informers (ignores resyncPeriod). TODO(community): Implement Registration. func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { f.handlers = append(f.handlers, handler) - return nil, nil + return &fakeHandlerRegistration{informer: f}, nil } // AddEventHandlerWithOptions implements the Informer interface. Adds an EventHandler to the fake Informers (ignores options). TODO(community): Implement Registration. func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) { f.handlers = append(f.handlers, handler) - return nil, nil + return &fakeHandlerRegistration{informer: f}, nil } // Run implements the Informer interface. Increments f.RunCount. diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 1cef9cc602..5e66e71b35 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -386,6 +386,118 @@ var _ = Describe("controller", func() { <-sourceSynced }) + It("should not call Reconcile until all event handlers have processed initial objects with WaitForHandlerSync", func(specCtx SpecContext) { + genPodName := func(i int) string { + return fmt.Sprintf("test-reconcile-order-pod-%d", i) + } + // Create some pods in the API server before starting the controller. + nPods := 20 + pods := make([]*corev1.Pod, nPods) + for i := range nPods { + pods[i] = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: genPodName(i), + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "test"}}, + }, + } + _, err := clientset.CoreV1().Pods("default").Create(specCtx, pods[i], metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + } + defer func() { + for _, pod := range pods { + _ = clientset.CoreV1().Pods("default").Delete(specCtx, pod.Name, metav1.DeleteOptions{}) + } + }() + + testCache, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + // Tracks how many objects have been processed by the event handler. + var handlerProcessedCount atomic.Int32 + + // Channel to block one of the event handlers to simulate slow event handler processing. + blockHandler := make(chan struct{}) + + // Tracks whether Reconcile was called. + var reconcileCalled atomic.Bool + + // Create the controller. + var timeFirstReconcile time.Time + testCtrl := New[reconcile.Request](Options[reconcile.Request]{ + MaxConcurrentReconciles: 1, + CacheSyncTimeout: 10 * time.Second, + Name: "test-reconcile-order", + NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return queue + }, + LogConstructor: func(_ *reconcile.Request) logr.Logger { + return log.RuntimeLog.WithName("test-reconcile-order") + }, + Do: reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + if timeFirstReconcile.IsZero() { + timeFirstReconcile = time.Now() + } + // handlerProcessedCount should be equal to the number of pods created since we are waiting + // for the handlers to finish processing before reconciling. + Expect(handlerProcessedCount.Load()).To(BeNumerically("==", nPods)) + reconcileCalled.Store(true) + return reconcile.Result{}, nil + })}, + ) + + // Watch pods with an event handler that blocks all pods but the first one in the list. + // Use KindWithOptions with WithWaitForHandlerSync to ensure that Reconcile is not called until all + // initial objects have been processed by the event handlers. + var timeLastCreate time.Time + firstPod := true + err = testCtrl.Watch(source.KindWithOptions(testCache, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{ + CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + if !firstPod { + <-blockHandler + } + firstPod = false + if val := handlerProcessedCount.Add(1); val == int32(nPods) { + timeLastCreate = time.Now() + } + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace()}}) + }, + }, []source.KindOption[*corev1.Pod, reconcile.Request]{source.WithWaitForHandlerSync[*corev1.Pod, reconcile.Request]()})) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(specCtx) + defer cancel() + go func() { + defer GinkgoRecover() + _ = testCache.Start(ctx) + }() + controllerDone := make(chan error) + go func() { + defer GinkgoRecover() + controllerDone <- testCtrl.Start(ctx) + }() + + // Verify the first pod was processed but the rest are still blocked. + Eventually(handlerProcessedCount.Load, 10*time.Second, 1*time.Second). + Should(Equal(int32(1)), "Only the first pod should have been processed") + + // Unblock the handler which should result in Reconcile being called at some point. + close(blockHandler) + Eventually(reconcileCalled.Load, 5*time.Second).Should(BeTrue()) + + // All handlers should have been eventually processed. + Eventually(handlerProcessedCount.Load, 5*time.Second, 1*time.Second). + Should(BeNumerically("==", nPods)) + + // Reconcile should not have been called until all handlers have processed the initial objects. + Expect(timeFirstReconcile.After(timeLastCreate)).To(BeTrue()) + + cancel() + Eventually(controllerDone, 5*time.Second).Should(Receive()) + }) + It("should process events from source.Channel", func(ctx SpecContext) { ctrl.CacheSyncTimeout = 10 * time.Second // channel to be closed when event is processed diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 2854244523..311ff7e717 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -34,6 +34,10 @@ type Kind[object client.Object, request comparable] struct { Predicates []predicate.TypedPredicate[object] + // WaitForHandlerSync when set to true, waits for the handler registration's HasSynced + // before starting reconciliation. + WaitForHandlerSync bool + // startedErr may contain an error if one was encountered during startup. If its closed and does not // contain an error, startup and syncing finished. startedErr chan error @@ -91,16 +95,23 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type return } - _, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{ + handlerRegistration, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{ Logger: &logKind, }) if err != nil { ks.startedErr <- err return } + sentError := false + if ks.WaitForHandlerSync && !toolscache.WaitForCacheSync(ctx.Done(), handlerRegistration.HasSynced) { + sentError = true + ks.startedErr <- errors.New("cache did not sync") + } if !ks.Cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here - ks.startedErr <- errors.New("cache did not sync") + if !sentError { + ks.startedErr <- errors.New("cache did not sync") + } } close(ks.startedErr) }() diff --git a/pkg/source/source.go b/pkg/source/source.go index c2c2dc4e07..33570d09c2 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -73,6 +73,18 @@ type TypedSyncingSource[request comparable] interface { WaitForSync(ctx context.Context) error } +// KindOption is a functional option for Kind sources. +type KindOption[object client.Object, request comparable] func(*internal.Kind[object, request]) + +// WithWaitForHandlerSync configures the Kind source to wait for the handler registration's +// HasSynced before starting reconciliation. This ensures all initial objects from the first +// List have been delivered to the event handlers before reconciliation starts. +func WithWaitForHandlerSync[object client.Object, request comparable]() KindOption[object, request] { + return func(k *internal.Kind[object, request]) { + k.WaitForHandlerSync = true + } +} + // Kind creates a KindSource with the given cache provider. func Kind[object client.Object]( cache cache.Cache, @@ -83,6 +95,17 @@ func Kind[object client.Object]( return TypedKind(cache, obj, handler, predicates...) } +// KindWithOptions creates a KindSource with the given cache provider and options. +func KindWithOptions[object client.Object]( + cache cache.Cache, + obj object, + handler handler.TypedEventHandler[object, reconcile.Request], + opts []KindOption[object, reconcile.Request], + predicates ...predicate.TypedPredicate[object], +) SyncingSource { + return TypedKindWithOptions(cache, obj, handler, opts, predicates...) +} + // TypedKind creates a KindSource with the given cache provider. func TypedKind[object client.Object, request comparable]( cache cache.Cache, @@ -90,12 +113,27 @@ func TypedKind[object client.Object, request comparable]( handler handler.TypedEventHandler[object, request], predicates ...predicate.TypedPredicate[object], ) TypedSyncingSource[request] { - return &internal.Kind[object, request]{ + return TypedKindWithOptions(cache, obj, handler, nil, predicates...) +} + +// TypedKindWithOptions creates a KindSource with the given cache provider and options. +func TypedKindWithOptions[object client.Object, request comparable]( + cache cache.Cache, + obj object, + handler handler.TypedEventHandler[object, request], + opts []KindOption[object, request], + predicates ...predicate.TypedPredicate[object], +) TypedSyncingSource[request] { + k := &internal.Kind[object, request]{ Type: obj, Cache: cache, Handler: handler, Predicates: predicates, } + for _, opt := range opts { + opt(k) + } + return k } var _ Source = &channel[string, reconcile.Request]{}