Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
return informer, nil
}

c.InformersByGVK[gvk] = &controllertest.FakeInformer{}
// Set Synced to true by default so that WaitForCacheSync returns immediately.
// If c.Synced is explicitly set, propagate it to the FakeInformer so that
// handlerRegistration.HasSynced() returns the same value.
synced := c.Synced == nil || *c.Synced
c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: synced}
return c.InformersByGVK[gvk], nil
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ type FakeInformer struct {
handlers []cache.ResourceEventHandler
}

// fakeHandlerRegistration implements cache.ResourceEventHandlerRegistration for testing.
type fakeHandlerRegistration struct {
informer *FakeInformer
}

// HasSynced implements cache.ResourceEventHandlerRegistration.
func (f *fakeHandlerRegistration) HasSynced() bool {
return f.informer.Synced
}

// AddIndexers does nothing. TODO(community): Implement this.
func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error {
return nil
Expand All @@ -60,19 +70,19 @@ func (f *FakeInformer) HasSynced() bool {
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// AddEventHandlerWithResyncPeriod implements the Informer interface. Adds an EventHandler to the fake Informers (ignores resyncPeriod). TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// AddEventHandlerWithOptions implements the Informer interface. Adds an EventHandler to the fake Informers (ignores options). TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// Run implements the Informer interface. Increments f.RunCount.
Expand Down
91 changes: 91 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -386,6 +387,96 @@ var _ = Describe("controller", func() {
<-sourceSynced
})

It("should not call Reconcile until all event handlers have processed initial objects", func(specCtx SpecContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simler version of the test below, main changes:

  • Start cache right at the beginning so it syncs in the background
  • The firstPod handling doesn't seem neccessary
  • Its enough to test inside reconcile that the handler processed everything to validate the ordering, we don't need to also test on the outside that reconcile was called after the handler finished
		It("should not call Reconcile until all event handlers have processed initial objects", func(specCtx SpecContext) {
			nPods := 20
			pods := make([]*corev1.Pod, nPods)
			for i := range nPods {
				pods[i] = &corev1.Pod{
					ObjectMeta: metav1.ObjectMeta{
						Name:      strconv.Itoa(i),
						Namespace: "default",
					},
					Spec: corev1.PodSpec{
						Containers: []corev1.Container{{Name: "test", Image: "test"}},
					},
				}
				_, err := clientset.CoreV1().Pods("default").Create(specCtx, pods[i], metav1.CreateOptions{})
				Expect(err).NotTo(HaveOccurred())
			}
			defer func() {
				for _, pod := range pods {
					_ = clientset.CoreV1().Pods("default").Delete(specCtx, pod.Name, metav1.DeleteOptions{})
				}
			}()

			testCache, err := cache.New(cfg, cache.Options{})
			Expect(err).NotTo(HaveOccurred())

			ctx, cancel := context.WithCancel(specCtx)
			defer cancel()
			go func() {
				defer GinkgoRecover()
				_ = testCache.Start(ctx)
			}()

			// Tracks how many objects have been processed by the event handler.
			var handlerProcessedCount atomic.Int32

			// Channel to block one of the event handlers to simulate slow event handler processing.
			blockHandler := make(chan struct{})

			// Tracks whether Reconcile was called.
			var reconcileCalled atomic.Bool

			// Create the controller.
			testCtrl := New(Options[reconcile.Request]{
				MaxConcurrentReconciles: 1,
				CacheSyncTimeout:        10 * time.Second,
				NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
					return &controllertest.Queue{
						TypedInterface: workqueue.NewTyped[reconcile.Request](),
					}
				},
				Name: "test-reconcile-order",
				LogConstructor: func(_ *reconcile.Request) logr.Logger {
					return log.RuntimeLog.WithName("test-reconcile-order")
				},
				Do: reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
					// handlerProcessedCount should be equal to the number of pods created since we are waiting
					// for the handlers to finish processing before reconciling.
					Expect(handlerProcessedCount.Load()).To(Equal(int32(nPods)))
					reconcileCalled.Store(true)
					return reconcile.Result{}, nil
				})},
			)

			// Watch pods with an event handler that blocks all pods but the first one in the list.
			// Kind sources wait for handler sync to ensure that Reconcile is not called until all
			// initial objects have been processed by the event handlers.
			err = testCtrl.Watch(source.Kind(testCache, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{
				CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
					<-blockHandler
					handlerProcessedCount.Add(1)
					q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace()}})
				},
			}))
			Expect(err).NotTo(HaveOccurred())

			controllerDone := make(chan error)
			go func() {
				defer GinkgoRecover()
				controllerDone <- testCtrl.Start(ctx)
			}()

			// Give the controller time to start the reconciler. We asserts
			// in there that all events have been processed, so if we start it
			// prematurely, that assertion will fail. We can not get rid of the
			// sleep unless we stop using envtest for this test.
			time.Sleep(1 * time.Second)
			close(blockHandler)
			Eventually(reconcileCalled.Load).Should(BeTrue())

			cancel()
			Eventually(controllerDone, 5*time.Second).Should(Receive())
		})

nPods := 20
pods := make([]*corev1.Pod, nPods)
for i := range nPods {
pods[i] = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: strconv.Itoa(i),
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "test", Image: "test"}},
},
}
_, err := clientset.CoreV1().Pods("default").Create(specCtx, pods[i], metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
}
defer func() {
for _, pod := range pods {
_ = clientset.CoreV1().Pods("default").Delete(specCtx, pod.Name, metav1.DeleteOptions{})
}
}()

testCache, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(specCtx)
defer cancel()
go func() {
defer GinkgoRecover()
_ = testCache.Start(ctx)
}()

// Tracks how many objects have been processed by the event handler.
var handlerProcessedCount atomic.Int32

// Channel to block one of the event handlers to simulate slow event handler processing.
blockHandler := make(chan struct{})

// Tracks whether Reconcile was called.
var reconcileCalled atomic.Bool

// Create the controller.
testCtrl := New(Options[reconcile.Request]{
MaxConcurrentReconciles: 1,
CacheSyncTimeout: 10 * time.Second,
NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
return &controllertest.Queue{
TypedInterface: workqueue.NewTyped[reconcile.Request](),
}
},
Name: "test-reconcile-order",
LogConstructor: func(_ *reconcile.Request) logr.Logger {
return log.RuntimeLog.WithName("test-reconcile-order")
},
Do: reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
// handlerProcessedCount should be equal to the number of pods created since we are waiting
// for the handlers to finish processing before reconciling.
Expect(handlerProcessedCount.Load()).To(Equal(int32(nPods)))
reconcileCalled.Store(true)
return reconcile.Result{}, nil
})},
)

err = testCtrl.Watch(source.Kind(testCache, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{
CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
<-blockHandler
handlerProcessedCount.Add(1)
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace()}})
},
}))
Expect(err).NotTo(HaveOccurred())

controllerDone := make(chan error)
go func() {
defer GinkgoRecover()
controllerDone <- testCtrl.Start(ctx)
}()

// Give the controller time to start the reconciler. We asserts
// in there that all events have been processed, so if we start it
// prematurely, that assertion will fail. We can not get rid of the
// sleep unless we stop using envtest for this test.
time.Sleep(1 * time.Second)
close(blockHandler)
Eventually(reconcileCalled.Load).Should(BeTrue())

cancel()
Eventually(controllerDone, 5*time.Second).Should(Receive())
})

It("should process events from source.Channel", func(ctx SpecContext) {
ctrl.CacheSyncTimeout = 10 * time.Second
// channel to be closed when event is processed
Expand Down
5 changes: 2 additions & 3 deletions pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,14 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
return
}

_, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
handlerRegistration, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
Logger: &logKind,
})
if err != nil {
ks.startedErr <- err
return
}
if !ks.Cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
if !toolscache.WaitForCacheSync(ctx.Done(), handlerRegistration.HasSynced) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we check both? Isn't the Cache.WaitForCacheSync implicit in the registration having synced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, I removed it and some unit tests fail sine they are moking the informers WaitForCacheSync return value and I cannot make it work. I'm not sure if the registration sync is equivalent to the cache sync though. I'm not sure if the cache can have more sources

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I guess there is little harm in having both so lets re-add it

ks.startedErr <- errors.New("cache did not sync")
}
close(ks.startedErr)
Expand Down
Loading