Skip to content

Commit 43b0e35

Browse files
✨ Delay reconciliation until handlers sync (kubernetes-sigs#3406)
* Delay reconciliation until handlers sync * Address comments * Revert some changes so that the unit tests pass
1 parent 137b9c0 commit 43b0e35

File tree

4 files changed

+115
-6
lines changed

4 files changed

+115
-6
lines changed

pkg/cache/informertest/fake_cache.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
116116
return informer, nil
117117
}
118118

119-
c.InformersByGVK[gvk] = &controllertest.FakeInformer{}
119+
// Set Synced to true by default so that WaitForCacheSync returns immediately
120+
c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: true}
120121
return c.InformersByGVK[gvk], nil
121122
}
122123

pkg/controller/controllertest/util.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ type FakeInformer struct {
3737
handlers []cache.ResourceEventHandler
3838
}
3939

40+
// fakeHandlerRegistration implements cache.ResourceEventHandlerRegistration for testing.
41+
type fakeHandlerRegistration struct {
42+
informer *FakeInformer
43+
}
44+
45+
// HasSynced implements cache.ResourceEventHandlerRegistration.
46+
func (f *fakeHandlerRegistration) HasSynced() bool {
47+
return f.informer.Synced
48+
}
49+
4050
// AddIndexers does nothing. TODO(community): Implement this.
4151
func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error {
4252
return nil
@@ -60,19 +70,19 @@ func (f *FakeInformer) HasSynced() bool {
6070
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
6171
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
6272
f.handlers = append(f.handlers, handler)
63-
return nil, nil
73+
return &fakeHandlerRegistration{informer: f}, nil
6474
}
6575

6676
// AddEventHandlerWithResyncPeriod implements the Informer interface. Adds an EventHandler to the fake Informers (ignores resyncPeriod). TODO(community): Implement Registration.
6777
func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) {
6878
f.handlers = append(f.handlers, handler)
69-
return nil, nil
79+
return &fakeHandlerRegistration{informer: f}, nil
7080
}
7181

7282
// AddEventHandlerWithOptions implements the Informer interface. Adds an EventHandler to the fake Informers (ignores options). TODO(community): Implement Registration.
7383
func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) {
7484
f.handlers = append(f.handlers, handler)
75-
return nil, nil
85+
return &fakeHandlerRegistration{informer: f}, nil
7686
}
7787

7888
// Run implements the Informer interface. Increments f.RunCount.

pkg/internal/controller/controller_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"strconv"
2324
"sync"
2425
"sync/atomic"
2526
"time"
@@ -386,6 +387,96 @@ var _ = Describe("controller", func() {
386387
<-sourceSynced
387388
})
388389

390+
It("should not call Reconcile until all event handlers have processed initial objects", func(specCtx SpecContext) {
391+
nPods := 20
392+
pods := make([]*corev1.Pod, nPods)
393+
for i := range nPods {
394+
pods[i] = &corev1.Pod{
395+
ObjectMeta: metav1.ObjectMeta{
396+
Name: strconv.Itoa(i),
397+
Namespace: "default",
398+
},
399+
Spec: corev1.PodSpec{
400+
Containers: []corev1.Container{{Name: "test", Image: "test"}},
401+
},
402+
}
403+
_, err := clientset.CoreV1().Pods("default").Create(specCtx, pods[i], metav1.CreateOptions{})
404+
Expect(err).NotTo(HaveOccurred())
405+
}
406+
defer func() {
407+
for _, pod := range pods {
408+
_ = clientset.CoreV1().Pods("default").Delete(specCtx, pod.Name, metav1.DeleteOptions{})
409+
}
410+
}()
411+
412+
testCache, err := cache.New(cfg, cache.Options{})
413+
Expect(err).NotTo(HaveOccurred())
414+
415+
ctx, cancel := context.WithCancel(specCtx)
416+
defer cancel()
417+
go func() {
418+
defer GinkgoRecover()
419+
_ = testCache.Start(ctx)
420+
}()
421+
422+
// Tracks how many objects have been processed by the event handler.
423+
var handlerProcessedCount atomic.Int32
424+
425+
// Channel to block one of the event handlers to simulate slow event handler processing.
426+
blockHandler := make(chan struct{})
427+
428+
// Tracks whether Reconcile was called.
429+
var reconcileCalled atomic.Bool
430+
431+
// Create the controller.
432+
testCtrl := New(Options[reconcile.Request]{
433+
MaxConcurrentReconciles: 1,
434+
CacheSyncTimeout: 10 * time.Second,
435+
NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
436+
return &controllertest.Queue{
437+
TypedInterface: workqueue.NewTyped[reconcile.Request](),
438+
}
439+
},
440+
Name: "test-reconcile-order",
441+
LogConstructor: func(_ *reconcile.Request) logr.Logger {
442+
return log.RuntimeLog.WithName("test-reconcile-order")
443+
},
444+
Do: reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
445+
// handlerProcessedCount should be equal to the number of pods created since we are waiting
446+
// for the handlers to finish processing before reconciling.
447+
Expect(handlerProcessedCount.Load()).To(Equal(int32(nPods)))
448+
reconcileCalled.Store(true)
449+
return reconcile.Result{}, nil
450+
})},
451+
)
452+
453+
err = testCtrl.Watch(source.Kind(testCache, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{
454+
CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
455+
<-blockHandler
456+
handlerProcessedCount.Add(1)
457+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace()}})
458+
},
459+
}))
460+
Expect(err).NotTo(HaveOccurred())
461+
462+
controllerDone := make(chan error)
463+
go func() {
464+
defer GinkgoRecover()
465+
controllerDone <- testCtrl.Start(ctx)
466+
}()
467+
468+
// Give the controller time to start the reconciler. We asserts
469+
// in there that all events have been processed, so if we start it
470+
// prematurely, that assertion will fail. We can not get rid of the
471+
// sleep unless we stop using envtest for this test.
472+
time.Sleep(1 * time.Second)
473+
close(blockHandler)
474+
Eventually(reconcileCalled.Load).Should(BeTrue())
475+
476+
cancel()
477+
Eventually(controllerDone, 5*time.Second).Should(Receive())
478+
})
479+
389480
It("should process events from source.Channel", func(ctx SpecContext) {
390481
ctrl.CacheSyncTimeout = 10 * time.Second
391482
// channel to be closed when event is processed

pkg/internal/source/kind.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,23 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
9191
return
9292
}
9393

94-
_, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
94+
handlerRegistration, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
9595
Logger: &logKind,
9696
})
9797
if err != nil {
9898
ks.startedErr <- err
9999
return
100100
}
101+
// First, wait for the cache to sync. For real caches this waits for startup.
102+
// For fakes with Synced=false, this returns immediately allowing fast failure.
101103
if !ks.Cache.WaitForCacheSync(ctx) {
102-
// Would be great to return something more informative here
103104
ks.startedErr <- errors.New("cache did not sync")
105+
close(ks.startedErr)
106+
return
107+
}
108+
// Then wait for this specific handler to receive all initial events.
109+
if !toolscache.WaitForCacheSync(ctx.Done(), handlerRegistration.HasSynced) {
110+
ks.startedErr <- errors.New("handler did not sync")
104111
}
105112
close(ks.startedErr)
106113
}()

0 commit comments

Comments
 (0)