diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 1f1a245849..49942186c0 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -19,7 +19,10 @@ import ( type AddOpts struct { After time.Duration RateLimited bool - Priority int + // Priority is the priority of the item. Higher values + // indicate higher priority. + // Defaults to zero if unset. + Priority *int } // PriorityQueue is a priority queue for a controller. It @@ -154,7 +157,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { item := &item[T]{ Key: key, AddedCounter: w.addedCounter, - Priority: o.Priority, + Priority: ptr.Deref(o.Priority, 0), ReadyAt: readyAt, } w.items[key] = item @@ -169,12 +172,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { // The b-tree de-duplicates based on ordering and any change here // will affect the order - Just delete and re-add. item, _ := w.queue.Delete(w.items[key]) - if o.Priority > item.Priority { + if newPriority := ptr.Deref(o.Priority, 0); newPriority > item.Priority { // Update depth metric only if the item in the queue was already added to the depth metric. if item.ReadyAt == nil || w.becameReady.Has(key) { - w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority) + w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority) } - item.Priority = o.Priority + item.Priority = newPriority } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 7e5d3ba3ed..13cf59b7e8 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -12,6 +12,7 @@ import ( . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" ) var _ = Describe("Controllerworkqueue", func() { @@ -106,8 +107,8 @@ var _ = Describe("Controllerworkqueue", func() { q, metrics := newQueue() defer q.ShutDown() - q.AddWithOpts(AddOpts{Priority: 1}, "foo") - q.AddWithOpts(AddOpts{Priority: 2}, "foo") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo") + q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, "foo") item, priority, _ := q.GetWithPriority() Expect(item).To(Equal("foo")) @@ -126,7 +127,7 @@ var _ = Describe("Controllerworkqueue", func() { q.AddWithOpts(AddOpts{}, "foo") q.AddWithOpts(AddOpts{}, "bar") q.AddWithOpts(AddOpts{}, "baz") - q.AddWithOpts(AddOpts{Priority: 1}, "baz") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "baz") item, priority, _ := q.GetWithPriority() Expect(item).To(Equal("baz")) @@ -381,7 +382,7 @@ var _ = Describe("Controllerworkqueue", func() { if rn < 10 { q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i)) } else { - q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i)) + q.AddWithOpts(AddOpts{Priority: &rn}, fmt.Sprintf("foo%d", i)) } } @@ -623,8 +624,8 @@ func TestFuzzPriorityQueue(t *testing.T) { defer inQueueLock.Unlock() q.AddWithOpts(opts, item) - if existingPriority, exists := inQueue[item]; !exists || existingPriority < opts.Priority { - inQueue[item] = opts.Priority + if existingPriority, exists := inQueue[item]; !exists || existingPriority < ptr.Deref(opts.Priority, 0) { + inQueue[item] = ptr.Deref(opts.Priority, 0) } }() } diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index fe78f21a2c..62d6728151 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -20,6 +20,7 @@ import ( "context" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" @@ -141,7 +142,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue( if !ok { if lowPriority { q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{ - Priority: LowPriority, + Priority: ptr.To(LowPriority), }, req) } else { q.Add(req) diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 29e755cbfa..9a05a73d2a 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -136,7 +136,7 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr priority = LowPriority } q.(priorityqueue.PriorityQueue[request]).AddWithOpts( - priorityqueue.AddOpts{Priority: priority}, + priorityqueue.AddOpts{Priority: &priority}, item, ) }, @@ -170,7 +170,7 @@ func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUp priority = LowPriority } q.(priorityqueue.PriorityQueue[request]).AddWithOpts( - priorityqueue.AddOpts{Priority: priority}, + priorityqueue.AddOpts{Priority: &priority}, item, ) }, @@ -223,7 +223,7 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate if evt.IsInInitialList { priority = LowPriority } - priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item) } // addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler @@ -239,5 +239,5 @@ func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRate if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() { priority = LowPriority } - priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item) } diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index 5e9bf9974b..3b0417ad8d 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -862,7 +862,7 @@ var _ = Describe("Eventhandler", func() { IsInInitialList: true, }, wq) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)})) Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) @@ -888,7 +888,10 @@ var _ = Describe("Eventhandler", func() { IsInInitialList: false, }, wq) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) + Expect(actualOpts).To(Or( + Equal(priorityqueue.AddOpts{}), + Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}), + )) Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) @@ -919,7 +922,7 @@ var _ = Describe("Eventhandler", func() { }}, }, wq) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)})) Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) @@ -951,7 +954,10 @@ var _ = Describe("Eventhandler", func() { }}, }, wq) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) + Expect(actualOpts).To(Or( + Equal(priorityqueue.AddOpts{}), + Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}), + )) Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 18042e41f6..d0f5be44e6 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -30,6 +30,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" @@ -449,7 +450,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, if errors.Is(err, reconcile.TerminalError(nil)) { ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc() } else { - c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req) } ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() @@ -464,11 +465,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter c.Queue.Forget(req) - c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: ptr.To(priority)}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: //nolint: staticcheck // We have to handle it until it is removed log.V(5).Info("Reconcile done, requeueing") - c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() default: log.V(5).Info("Reconcile successful") diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 71d778c041..74e5f3f2a2 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -702,7 +702,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() - q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request) By("Invoking Reconciler which will request a requeue") fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil) @@ -714,7 +714,7 @@ var _ = Describe("controller", func() { }).Should(Equal([]priorityQueueAddition{{ AddOpts: priorityqueue.AddOpts{ RateLimited: true, - Priority: 10, + Priority: ptr.To(10), }, items: []reconcile.Request{request}, }})) @@ -761,7 +761,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() - q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request) By("Invoking Reconciler which will ask for RequeueAfter") fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil) @@ -773,7 +773,7 @@ var _ = Describe("controller", func() { }).Should(Equal([]priorityQueueAddition{{ AddOpts: priorityqueue.AddOpts{ After: time.Millisecond * 100, - Priority: 10, + Priority: ptr.To(10), }, items: []reconcile.Request{request}, }})) @@ -819,7 +819,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() - q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request) By("Invoking Reconciler which will return an error") fakeReconcile.AddResult(reconcile.Result{}, errors.New("oups, I did it again")) @@ -831,7 +831,7 @@ var _ = Describe("controller", func() { }).Should(Equal([]priorityQueueAddition{{ AddOpts: priorityqueue.AddOpts{ RateLimited: true, - Priority: 10, + Priority: ptr.To(10), }, items: []reconcile.Request{request}, }}))