diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index d4047b7a09..8bef660003 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -60,7 +61,7 @@ type Controller[request comparable] struct { // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing - Queue workqueue.TypedRateLimitingInterface[request] + Queue priorityqueue.PriorityQueue[request] // mu is used to synchronize Controller setup mu sync.Mutex @@ -157,7 +158,12 @@ func (c *Controller[request]) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx - c.Queue = c.NewQueue(c.Name, c.RateLimiter) + queue := c.NewQueue(c.Name, c.RateLimiter) + if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue { + c.Queue = priorityQueue + } else { + c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue} + } go func() { <-ctx.Done() c.Queue.ShutDown() @@ -268,7 +274,7 @@ func (c *Controller[request]) Start(ctx context.Context) error { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.Queue.Get() + obj, priority, shutdown := c.Queue.GetWithPriority() if shutdown { // Stop working return false @@ -285,7 +291,7 @@ func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool { ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) - c.reconcileHandler(ctx, obj) + c.reconcileHandler(ctx, obj, priority) return true } @@ -308,7 +314,7 @@ func (c *Controller[request]) initMetrics() { ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0) } -func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) { +func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, priority int) { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { @@ -331,7 +337,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) if errors.Is(err, reconcile.TerminalError(nil)) { ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc() } else { - c.Queue.AddRateLimited(req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) } ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() @@ -346,11 +352,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter c.Queue.Forget(req) - c.Queue.AddAfter(req, result.RequeueAfter) + c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: //nolint: staticcheck // We have to handle it until it is removed log.V(5).Info("Reconcile done, requeueing") - c.Queue.AddRateLimited(req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() default: log.V(5).Info("Reconcile successful") @@ -388,3 +394,25 @@ type reconcileIDKey struct{} func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context { return context.WithValue(ctx, reconcileIDKey{}, reconcileID) } + +type priorityQueueWrapper[request comparable] struct { + workqueue.TypedRateLimitingInterface[request] +} + +func (p *priorityQueueWrapper[request]) AddWithOpts(opts priorityqueue.AddOpts, items ...request) { + for _, item := range items { + switch { + case opts.RateLimited: + p.TypedRateLimitingInterface.AddRateLimited(item) + case opts.After > 0: + p.TypedRateLimitingInterface.AddAfter(item, opts.After) + default: + p.TypedRateLimitingInterface.Add(item) + } + } +} + +func (p *priorityQueueWrapper[request]) GetWithPriority() (request, int, bool) { + item, shutdown := p.TypedRateLimitingInterface.Get() + return item, 0, shutdown +} diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 3a23156a9c..bf334d22e8 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" @@ -345,9 +346,10 @@ var _ = Describe("controller", func() { }) It("should check for correct TypedSyncingSource if custom types are used", func() { - queue := &controllertest.TypedQueue[TestRequest]{ - TypedInterface: workqueue.NewTyped[TestRequest](), - } + queue := &priorityQueueWrapper[TestRequest]{ + TypedRateLimitingInterface: &controllertest.TypedQueue[TestRequest]{ + TypedInterface: workqueue.NewTyped[TestRequest](), + }} ctrl := &Controller[TestRequest]{ NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] { return queue @@ -400,10 +402,6 @@ var _ = Describe("controller", func() { Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) }) - PIt("should forget an item if it is not a Request and continue processing items", func() { - // TODO(community): write this test - }) - It("should requeue a Request if there is an error and continue processing items", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -523,6 +521,37 @@ var _ = Describe("controller", func() { Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) + It("should retain the priority when the reconciler requests a requeue", func() { + q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return q + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) + }() + + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + + By("Invoking Reconciler which will request a requeue") + fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil) + Expect(<-reconciled).To(Equal(request)) + Eventually(func() []priorityQueueAddition { + q.lock.Lock() + defer q.lock.Unlock() + return q.added + }).Should(Equal([]priorityQueueAddition{{ + AddOpts: priorityqueue.AddOpts{ + RateLimited: true, + Priority: 10, + }, + items: []reconcile.Request{request}, + }})) + }) + It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() { dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)} ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { @@ -555,6 +584,37 @@ var _ = Describe("controller", func() { Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) + It("should retain the priority with RequeAfter", func() { + q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return q + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) + }() + + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + + By("Invoking Reconciler which will ask for RequeueAfter") + fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil) + Expect(<-reconciled).To(Equal(request)) + Eventually(func() []priorityQueueAddition { + q.lock.Lock() + defer q.lock.Unlock() + return q.added + }).Should(Equal([]priorityQueueAddition{{ + AddOpts: priorityqueue.AddOpts{ + After: time.Millisecond * 100, + Priority: 10, + }, + items: []reconcile.Request{request}, + }})) + }) + It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)} ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { @@ -586,6 +646,37 @@ var _ = Describe("controller", func() { Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) + It("should retain the priority when there was an error", func() { + q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return q + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) + }() + + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + + By("Invoking Reconciler which will return an error") + fakeReconcile.AddResult(reconcile.Result{}, errors.New("oups, I did it again")) + Expect(<-reconciled).To(Equal(request)) + Eventually(func() []priorityQueueAddition { + q.lock.Lock() + defer q.lock.Unlock() + return q.added + }).Should(Equal([]priorityQueueAddition{{ + AddOpts: priorityqueue.AddOpts{ + RateLimited: true, + Priority: 10, + }, + items: []reconcile.Request{request}, + }})) + }) + PIt("should return if the queue is shutdown", func() { // TODO(community): write this test }) @@ -977,3 +1068,21 @@ func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error { return ctx.Err() } } + +type priorityQueueAddition struct { + priorityqueue.AddOpts + items []reconcile.Request +} + +type fakePriorityQueue struct { + priorityqueue.PriorityQueue[reconcile.Request] + + lock sync.Mutex + added []priorityQueueAddition +} + +func (f *fakePriorityQueue) AddWithOpts(o priorityqueue.AddOpts, items ...reconcile.Request) { + f.lock.Lock() + defer f.lock.Unlock() + f.added = append(f.added, priorityQueueAddition{AddOpts: o, items: items}) +}