Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
return informer, nil
}

c.InformersByGVK[gvk] = &controllertest.FakeInformer{}
// Set Synced to true by default so that WaitForCacheSync returns immediately
c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: true}
return c.InformersByGVK[gvk], nil
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ type FakeInformer struct {
handlers []cache.ResourceEventHandler
}

// fakeHandlerRegistration implements cache.ResourceEventHandlerRegistration for testing.
type fakeHandlerRegistration struct {
informer *FakeInformer
}

// HasSynced implements cache.ResourceEventHandlerRegistration.
func (f *fakeHandlerRegistration) HasSynced() bool {
return f.informer.Synced
}

// AddIndexers does nothing. TODO(community): Implement this.
func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error {
return nil
Expand All @@ -60,19 +70,19 @@ func (f *FakeInformer) HasSynced() bool {
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// AddEventHandlerWithResyncPeriod implements the Informer interface. Adds an EventHandler to the fake Informers (ignores resyncPeriod). TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// AddEventHandlerWithOptions implements the Informer interface. Adds an EventHandler to the fake Informers (ignores options). TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// Run implements the Informer interface. Increments f.RunCount.
Expand Down
119 changes: 119 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,125 @@ var _ = Describe("controller", func() {
<-sourceSynced
})

It("should not call Reconcile until all event handlers have processed initial objects with WaitForHandlerSync", func(specCtx SpecContext) {
genPodName := func(i int) string {
return fmt.Sprintf("test-reconcile-order-pod-%d", i)
}
// Create some pods in the API server before starting the controller.
nPods := 20
pods := make([]*corev1.Pod, nPods)
for i := range nPods {
pods[i] = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: genPodName(i),
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "test", Image: "test"}},
},
}
_, err := clientset.CoreV1().Pods("default").Create(specCtx, pods[i], metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
}
defer func() {
for _, pod := range pods {
_ = clientset.CoreV1().Pods("default").Delete(specCtx, pod.Name, metav1.DeleteOptions{})
}
}()

testCache, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

// Tracks how many objects have been processed by the event handler.
var handlerProcessedCount atomic.Int32

// Channel to block one of the event handlers to simulate slow event handler processing.
blockHandler := make(chan struct{})

// Tracks whether Reconcile was called.
var reconcileCalled atomic.Bool
reconcileCalledChan := make(chan struct{})

// Create the controller.
testQueue := &controllertest.Queue{
TypedInterface: workqueue.NewTyped[reconcile.Request](),
}
var timeFirstReconcile time.Time
testCtrl := &Controller[reconcile.Request]{
MaxConcurrentReconciles: 1,
CacheSyncTimeout: 10 * time.Second,
Name: "test-reconcile-order",
NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
return testQueue
},
LogConstructor: func(_ *reconcile.Request) logr.Logger {
return log.RuntimeLog.WithName("test-reconcile-order")
},
Do: reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
if timeFirstReconcile.IsZero() {
timeFirstReconcile = time.Now()
}
// handlerProcessedCount should be equal to the number of pods created since we are waiting
// for the handlers to finish processing before reconciling.
Expect(handlerProcessedCount.Load()).To(BeNumerically("==", nPods))
reconcileCalled.Store(true)
close(reconcileCalledChan)
return reconcile.Result{}, nil
}),
}

var timeLastCreate time.Time
// Watch pods with an event handler that blocks all pods but the first one in the list.
// Use KindWithOptions with WithWaitForHandlerSync to ensure that Reconcile is not called until all
// initial objects have been processed by the event handlers.
firstPod := true
testCtrl.startWatches = []source.TypedSource[reconcile.Request]{
source.KindWithOptions(testCache, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{
CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
if !firstPod {
<-blockHandler
}
firstPod = false
if val := handlerProcessedCount.Add(1); val == int32(nPods) {
timeLastCreate = time.Now()
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace()}})
},
}, []source.KindOption[*corev1.Pod, reconcile.Request]{source.WithWaitForHandlerSync[*corev1.Pod, reconcile.Request]()}),
}

ctx, cancel := context.WithCancel(specCtx)
defer cancel()
go func() {
defer GinkgoRecover()
_ = testCache.Start(ctx)
}()
controllerDone := make(chan error)
go func() {
defer GinkgoRecover()
controllerDone <- testCtrl.Start(ctx)
}()

// Verify the first pod was processed but the rest are still blocked.
time.Sleep(2 * time.Second)
Eventually(handlerProcessedCount.Load, 10*time.Second, 1*time.Second).
Should(Equal(int32(1)), "Only the first pod should have been processed")

// Unblock the handler which should result in Reconcile being called at some point.
close(blockHandler)
Eventually(reconcileCalledChan, 5*time.Second).Should(BeClosed())

// All handlers should have been eventually processed.
Eventually(handlerProcessedCount.Load, 5*time.Second, 1*time.Second).
Should(BeNumerically("==", nPods))

// Reconcile should not have been called until all handlers have processed the initial objects.
Expect(timeFirstReconcile.After(timeLastCreate)).To(BeTrue())

cancel()
Eventually(controllerDone, 5*time.Second).Should(Receive())
})

It("should process events from source.Channel", func(ctx SpecContext) {
ctrl.CacheSyncTimeout = 10 * time.Second
// channel to be closed when event is processed
Expand Down
16 changes: 14 additions & 2 deletions pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Kind[object client.Object, request comparable] struct {

Predicates []predicate.TypedPredicate[object]

// WaitForHandlerSync when set to true, waits for the handler registration's HasSynced
// before starting reconciliation.
WaitForHandlerSync bool

// startedErr may contain an error if one was encountered during startup. If its closed and does not
// contain an error, startup and syncing finished.
startedErr chan error
Expand Down Expand Up @@ -91,16 +95,24 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
return
}

_, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
handlerRegistration, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
Logger: &logKind,
})
if err != nil {
ks.startedErr <- err
return
}
sentError := false
if ks.WaitForHandlerSync && !toolscache.WaitForCacheSync(ctx.Done(), handlerRegistration.HasSynced) {
sentError = true
ks.startedErr <- errors.New("cache did not sync")
}
if !ks.Cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.startedErr <- errors.New("cache did not sync")
if !sentError {
sentError = true
ks.startedErr <- errors.New("cache did not sync")
}
}
close(ks.startedErr)
}()
Expand Down
40 changes: 39 additions & 1 deletion pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ type TypedSyncingSource[request comparable] interface {
WaitForSync(ctx context.Context) error
}

// KindOption is a functional option for Kind sources.
type KindOption[object client.Object, request comparable] func(*internal.Kind[object, request])

// WithWaitForHandlerSync configures the Kind source to wait for the handler registration's
// HasSynced before starting reconciliation. This ensures all initial objects from the first
// List have been delivered to the event handlers before reconciliation starts.
func WithWaitForHandlerSync[object client.Object, request comparable]() KindOption[object, request] {
return func(k *internal.Kind[object, request]) {
k.WaitForHandlerSync = true
}
}

// Kind creates a KindSource with the given cache provider.
func Kind[object client.Object](
cache cache.Cache,
Expand All @@ -83,19 +95,45 @@ func Kind[object client.Object](
return TypedKind(cache, obj, handler, predicates...)
}

// KindWithOptions creates a KindSource with the given cache provider and options.
func KindWithOptions[object client.Object](
cache cache.Cache,
obj object,
handler handler.TypedEventHandler[object, reconcile.Request],
opts []KindOption[object, reconcile.Request],
predicates ...predicate.TypedPredicate[object],
) SyncingSource {
return TypedKindWithOptions(cache, obj, handler, opts, predicates...)
}

// TypedKind creates a KindSource with the given cache provider.
func TypedKind[object client.Object, request comparable](
cache cache.Cache,
obj object,
handler handler.TypedEventHandler[object, request],
predicates ...predicate.TypedPredicate[object],
) TypedSyncingSource[request] {
return &internal.Kind[object, request]{
return TypedKindWithOptions(cache, obj, handler, nil, predicates...)
}

// TypedKindWithOptions creates a KindSource with the given cache provider and options.
func TypedKindWithOptions[object client.Object, request comparable](
cache cache.Cache,
obj object,
handler handler.TypedEventHandler[object, request],
opts []KindOption[object, request],
predicates ...predicate.TypedPredicate[object],
) TypedSyncingSource[request] {
k := &internal.Kind[object, request]{
Type: obj,
Cache: cache,
Handler: handler,
Predicates: predicates,
}
for _, opt := range opts {
opt(k)
}
return k
}

var _ Source = &channel[string, reconcile.Request]{}
Expand Down