diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 1f1a245849..49942186c0 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -19,7 +19,10 @@ import ( type AddOpts struct { After time.Duration RateLimited bool - Priority int + // Priority is the priority of the item. Higher values + // indicate higher priority. + // Defaults to zero if unset. + Priority *int } // PriorityQueue is a priority queue for a controller. It @@ -154,7 +157,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { item := &item[T]{ Key: key, AddedCounter: w.addedCounter, - Priority: o.Priority, + Priority: ptr.Deref(o.Priority, 0), ReadyAt: readyAt, } w.items[key] = item @@ -169,12 +172,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { // The b-tree de-duplicates based on ordering and any change here // will affect the order - Just delete and re-add. item, _ := w.queue.Delete(w.items[key]) - if o.Priority > item.Priority { + if newPriority := ptr.Deref(o.Priority, 0); newPriority > item.Priority { // Update depth metric only if the item in the queue was already added to the depth metric. if item.ReadyAt == nil || w.becameReady.Has(key) { - w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority) + w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority) } - item.Priority = o.Priority + item.Priority = newPriority } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 7e5d3ba3ed..13cf59b7e8 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -12,6 +12,7 @@ import ( . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" ) var _ = Describe("Controllerworkqueue", func() { @@ -106,8 +107,8 @@ var _ = Describe("Controllerworkqueue", func() { q, metrics := newQueue() defer q.ShutDown() - q.AddWithOpts(AddOpts{Priority: 1}, "foo") - q.AddWithOpts(AddOpts{Priority: 2}, "foo") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo") + q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, "foo") item, priority, _ := q.GetWithPriority() Expect(item).To(Equal("foo")) @@ -126,7 +127,7 @@ var _ = Describe("Controllerworkqueue", func() { q.AddWithOpts(AddOpts{}, "foo") q.AddWithOpts(AddOpts{}, "bar") q.AddWithOpts(AddOpts{}, "baz") - q.AddWithOpts(AddOpts{Priority: 1}, "baz") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "baz") item, priority, _ := q.GetWithPriority() Expect(item).To(Equal("baz")) @@ -381,7 +382,7 @@ var _ = Describe("Controllerworkqueue", func() { if rn < 10 { q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i)) } else { - q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i)) + q.AddWithOpts(AddOpts{Priority: &rn}, fmt.Sprintf("foo%d", i)) } } @@ -623,8 +624,8 @@ func TestFuzzPriorityQueue(t *testing.T) { defer inQueueLock.Unlock() q.AddWithOpts(opts, item) - if existingPriority, exists := inQueue[item]; !exists || existingPriority < opts.Priority { - inQueue[item] = opts.Priority + if existingPriority, exists := inQueue[item]; !exists || existingPriority < ptr.Deref(opts.Priority, 0) { + inQueue[item] = ptr.Deref(opts.Priority, 0) } }() } diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index fe78f21a2c..62d6728151 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -20,6 +20,7 @@ import ( "context" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" @@ -141,7 +142,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue( if !ok { if lowPriority { q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{ - Priority: LowPriority, + Priority: ptr.To(LowPriority), }, req) } else { q.Add(req) diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 29e755cbfa..bae955d6fb 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -19,6 +19,7 @@ package handler import ( "context" "reflect" + "time" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" @@ -126,20 +127,14 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr h.CreateFunc(ctx, e, q) return } - wq := workqueueWithCustomAddFunc[request]{ - TypedRateLimitingInterface: q, + + wq := workqueueWithDefaultPriority[request]{ // We already know that we have a priority queue, that event.Object implements // client.Object and that its not nil - addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - var priority int - if e.IsInInitialList { - priority = LowPriority - } - q.(priorityqueue.PriorityQueue[request]).AddWithOpts( - priorityqueue.AddOpts{Priority: priority}, - item, - ) - }, + PriorityQueue: q.(priorityqueue.PriorityQueue[request]), + } + if e.IsInInitialList { + wq.priority = LowPriority } h.CreateFunc(ctx, e, wq) } @@ -160,20 +155,13 @@ func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUp return } - wq := workqueueWithCustomAddFunc[request]{ - TypedRateLimitingInterface: q, + wq := workqueueWithDefaultPriority[request]{ // We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement // client.Object and that they are not nil - addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - var priority int - if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() { - priority = LowPriority - } - q.(priorityqueue.PriorityQueue[request]).AddWithOpts( - priorityqueue.AddOpts{Priority: priority}, - item, - ) - }, + PriorityQueue: q.(priorityqueue.PriorityQueue[request]), + } + if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() { + wq.priority = LowPriority } h.UpdateFunc(ctx, e, wq) } @@ -201,13 +189,28 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty } } -type workqueueWithCustomAddFunc[request comparable] struct { - workqueue.TypedRateLimitingInterface[request] - addFunc func(item request, q workqueue.TypedRateLimitingInterface[request]) +type workqueueWithDefaultPriority[request comparable] struct { + priorityqueue.PriorityQueue[request] + priority int +} + +func (w workqueueWithDefaultPriority[request]) Add(item request) { + w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority}, item) } -func (w workqueueWithCustomAddFunc[request]) Add(item request) { - w.addFunc(item, w.TypedRateLimitingInterface) +func (w workqueueWithDefaultPriority[request]) AddAfter(item request, after time.Duration) { + w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority, After: after}, item) +} + +func (w workqueueWithDefaultPriority[request]) AddRateLimited(item request) { + w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority, RateLimited: true}, item) +} + +func (w workqueueWithDefaultPriority[request]) AddWithOpts(o priorityqueue.AddOpts, items ...request) { + if o.Priority == nil { + o.Priority = &w.priority + } + w.PriorityQueue.AddWithOpts(o, items...) } // addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler @@ -223,7 +226,7 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate if evt.IsInInitialList { priority = LowPriority } - priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item) } // addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler @@ -239,5 +242,5 @@ func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRate if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() { priority = LowPriority } - priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item) } diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index 5e9bf9974b..a4ee74723d 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -18,6 +18,7 @@ package handler_test import ( "context" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -776,8 +777,11 @@ var _ = Describe("Eventhandler", func() { Describe("WithLowPriorityWhenUnchanged", func() { handlerPriorityTests := []struct { - name string - handler func() handler.EventHandler + name string + handler func() handler.EventHandler + after time.Duration + ratelimited bool + overridePriority int }{ { name: "WithLowPriorityWhenUnchanged wrapper", @@ -837,6 +841,103 @@ var _ = Describe("Eventhandler", func() { }) }, }, + { + name: "WithLowPriorityWhenUnchanged - Add", + handler: func() handler.EventHandler { + return handler.WithLowPriorityWhenUnchanged( + handler.TypedFuncs[client.Object, reconcile.Request]{ + CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tce.Object.GetNamespace(), + Name: tce.Object.GetName(), + }}) + }, + UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tue.ObjectNew.GetNamespace(), + Name: tue.ObjectNew.GetName(), + }}) + }, + }) + }, + }, + { + name: "WithLowPriorityWhenUnchanged - AddAfter", + handler: func() handler.EventHandler { + return handler.WithLowPriorityWhenUnchanged( + handler.TypedFuncs[client.Object, reconcile.Request]{ + CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + wq.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tce.Object.GetNamespace(), + Name: tce.Object.GetName(), + }}, time.Second) + }, + UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + wq.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tue.ObjectNew.GetNamespace(), + Name: tue.ObjectNew.GetName(), + }}, time.Second) + }, + }) + }, + after: time.Second, + }, + { + name: "WithLowPriorityWhenUnchanged - AddRateLimited", + handler: func() handler.EventHandler { + return handler.WithLowPriorityWhenUnchanged( + handler.TypedFuncs[client.Object, reconcile.Request]{ + CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + wq.AddRateLimited(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tce.Object.GetNamespace(), + Name: tce.Object.GetName(), + }}) + }, + UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + wq.AddRateLimited(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tue.ObjectNew.GetNamespace(), + Name: tue.ObjectNew.GetName(), + }}) + }, + }) + }, + ratelimited: true, + }, + { + name: "WithLowPriorityWhenUnchanged - AddWithOpts priority is retained", + handler: func() handler.EventHandler { + return handler.WithLowPriorityWhenUnchanged( + handler.TypedFuncs[client.Object, reconcile.Request]{ + CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + if pq, isPQ := wq.(priorityqueue.PriorityQueue[reconcile.Request]); isPQ { + pq.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(100)}, reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tce.Object.GetNamespace(), + Name: tce.Object.GetName(), + }}) + return + } + wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tce.Object.GetNamespace(), + Name: tce.Object.GetName(), + }}) + }, + UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + if pq, isPQ := wq.(priorityqueue.PriorityQueue[reconcile.Request]); isPQ { + pq.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(100)}, reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tue.ObjectNew.GetNamespace(), + Name: tue.ObjectNew.GetName(), + }}) + return + } + wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tue.ObjectNew.GetNamespace(), + Name: tue.ObjectNew.GetName(), + }}) + }, + }) + }, + overridePriority: 100, + }, } for _, test := range handlerPriorityTests { When("handler is "+test.name, func() { @@ -862,7 +963,16 @@ var _ = Describe("Eventhandler", func() { IsInInitialList: true, }, wq) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) + expected := handler.LowPriority + if test.overridePriority != 0 { + expected = test.overridePriority + } + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{ + Priority: ptr.To(expected), + After: test.after, + RateLimited: test.ratelimited, + })) Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) @@ -888,7 +998,14 @@ var _ = Describe("Eventhandler", func() { IsInInitialList: false, }, wq) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) + if test.overridePriority != 0 { + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(test.overridePriority)})) + } else { + Expect(actualOpts).To(Or( + Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited}), + Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(0)}), + )) + } Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) @@ -919,7 +1036,12 @@ var _ = Describe("Eventhandler", func() { }}, }, wq) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) + expectedPriority := handler.LowPriority + if test.overridePriority != 0 { + expectedPriority = test.overridePriority + } + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(expectedPriority)})) Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) @@ -951,7 +1073,14 @@ var _ = Describe("Eventhandler", func() { }}, }, wq) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) + if test.overridePriority != 0 { + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(test.overridePriority)})) + } else { + Expect(actualOpts).To(Or( + Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited}), + Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(0)}), + )) + } Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 18042e41f6..d0f5be44e6 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -30,6 +30,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" @@ -449,7 +450,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, if errors.Is(err, reconcile.TerminalError(nil)) { ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc() } else { - c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req) } ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() @@ -464,11 +465,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter c.Queue.Forget(req) - c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: ptr.To(priority)}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: //nolint: staticcheck // We have to handle it until it is removed log.V(5).Info("Reconcile done, requeueing") - c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() default: log.V(5).Info("Reconcile successful") diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 71d778c041..74e5f3f2a2 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -702,7 +702,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() - q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request) By("Invoking Reconciler which will request a requeue") fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil) @@ -714,7 +714,7 @@ var _ = Describe("controller", func() { }).Should(Equal([]priorityQueueAddition{{ AddOpts: priorityqueue.AddOpts{ RateLimited: true, - Priority: 10, + Priority: ptr.To(10), }, items: []reconcile.Request{request}, }})) @@ -761,7 +761,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() - q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request) By("Invoking Reconciler which will ask for RequeueAfter") fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil) @@ -773,7 +773,7 @@ var _ = Describe("controller", func() { }).Should(Equal([]priorityQueueAddition{{ AddOpts: priorityqueue.AddOpts{ After: time.Millisecond * 100, - Priority: 10, + Priority: ptr.To(10), }, items: []reconcile.Request{request}, }})) @@ -819,7 +819,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() - q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request) By("Invoking Reconciler which will return an error") fakeReconcile.AddResult(reconcile.Result{}, errors.New("oups, I did it again")) @@ -831,7 +831,7 @@ var _ = Describe("controller", func() { }).Should(Equal([]priorityQueueAddition{{ AddOpts: priorityqueue.AddOpts{ RateLimited: true, - Priority: 10, + Priority: ptr.To(10), }, items: []reconcile.Request{request}, }}))