Skip to content

⚠️ Priorityqueue: Make priority opt a pointer #3289

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
type AddOpts struct {
After time.Duration
RateLimited bool
Priority int
// Priority is the priority of the item. Higher values
// indicate higher priority.
// Defaults to zero if unset.
Priority *int
}

// PriorityQueue is a priority queue for a controller. It
Expand Down Expand Up @@ -154,7 +157,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
item := &item[T]{
Key: key,
AddedCounter: w.addedCounter,
Priority: o.Priority,
Priority: ptr.Deref(o.Priority, 0),
ReadyAt: readyAt,
}
w.items[key] = item
Expand All @@ -169,12 +172,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
// The b-tree de-duplicates based on ordering and any change here
// will affect the order - Just delete and re-add.
item, _ := w.queue.Delete(w.items[key])
if o.Priority > item.Priority {
if newPriority := ptr.Deref(o.Priority, 0); newPriority > item.Priority {
// Update depth metric only if the item in the queue was already added to the depth metric.
if item.ReadyAt == nil || w.becameReady.Has(key) {
w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority)
w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority)
}
item.Priority = o.Priority
item.Priority = newPriority
}

if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
Expand Down
13 changes: 7 additions & 6 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
)

var _ = Describe("Controllerworkqueue", func() {
Expand Down Expand Up @@ -106,8 +107,8 @@ var _ = Describe("Controllerworkqueue", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{Priority: 1}, "foo")
q.AddWithOpts(AddOpts{Priority: 2}, "foo")
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo")
q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, "foo")

item, priority, _ := q.GetWithPriority()
Expect(item).To(Equal("foo"))
Expand All @@ -126,7 +127,7 @@ var _ = Describe("Controllerworkqueue", func() {
q.AddWithOpts(AddOpts{}, "foo")
q.AddWithOpts(AddOpts{}, "bar")
q.AddWithOpts(AddOpts{}, "baz")
q.AddWithOpts(AddOpts{Priority: 1}, "baz")
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "baz")

item, priority, _ := q.GetWithPriority()
Expect(item).To(Equal("baz"))
Expand Down Expand Up @@ -381,7 +382,7 @@ var _ = Describe("Controllerworkqueue", func() {
if rn < 10 {
q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i))
} else {
q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i))
q.AddWithOpts(AddOpts{Priority: &rn}, fmt.Sprintf("foo%d", i))
}
}

Expand Down Expand Up @@ -623,8 +624,8 @@ func TestFuzzPriorityQueue(t *testing.T) {
defer inQueueLock.Unlock()

q.AddWithOpts(opts, item)
if existingPriority, exists := inQueue[item]; !exists || existingPriority < opts.Priority {
inQueue[item] = opts.Priority
if existingPriority, exists := inQueue[item]; !exists || existingPriority < ptr.Deref(opts.Priority, 0) {
inQueue[item] = ptr.Deref(opts.Priority, 0)
}
}()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -141,7 +142,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
if !ok {
if lowPriority {
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
Priority: LowPriority,
Priority: ptr.To(LowPriority),
}, req)
} else {
q.Add(req)
Expand Down
8 changes: 4 additions & 4 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
priorityqueue.AddOpts{Priority: priority},
priorityqueue.AddOpts{Priority: &priority},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this only be set if IsInInitialList is true?

(currently we otherwise set it to &0)

(same below)

item,
)
},
Expand Down Expand Up @@ -170,7 +170,7 @@ func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUp
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
priorityqueue.AddOpts{Priority: priority},
priorityqueue.AddOpts{Priority: &priority},
item,
)
},
Expand Down Expand Up @@ -223,7 +223,7 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate
if evt.IsInInitialList {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item)
}

// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
Expand All @@ -239,5 +239,5 @@ func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRate
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item)
}
14 changes: 10 additions & 4 deletions pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ var _ = Describe("Eventhandler", func() {
IsInInitialList: true,
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

Expand All @@ -888,7 +888,10 @@ var _ = Describe("Eventhandler", func() {
IsInInitialList: false,
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualOpts).To(Or(
Equal(priorityqueue.AddOpts{}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if &0 is correct here

(similar below)

Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}),
))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

Expand Down Expand Up @@ -919,7 +922,7 @@ var _ = Describe("Eventhandler", func() {
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

Expand Down Expand Up @@ -951,7 +954,10 @@ var _ = Describe("Eventhandler", func() {
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualOpts).To(Or(
Equal(priorityqueue.AddOpts{}),
Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}),
))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

Expand Down
7 changes: 4 additions & 3 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand Down Expand Up @@ -449,7 +450,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request,
if errors.Is(err, reconcile.TerminalError(nil)) {
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
} else {
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req)
}
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
Expand All @@ -464,11 +465,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request,
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.Queue.Forget(req)
c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req)
c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: ptr.To(priority)}, req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue: //nolint: staticcheck // We have to handle it until it is removed
log.V(5).Info("Reconcile done, requeueing")
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:
log.V(5).Info("Reconcile successful")
Expand Down
12 changes: 6 additions & 6 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ var _ = Describe("controller", func() {
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)

By("Invoking Reconciler which will request a requeue")
fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil)
Expand All @@ -714,7 +714,7 @@ var _ = Describe("controller", func() {
}).Should(Equal([]priorityQueueAddition{{
AddOpts: priorityqueue.AddOpts{
RateLimited: true,
Priority: 10,
Priority: ptr.To(10),
},
items: []reconcile.Request{request},
}}))
Expand Down Expand Up @@ -761,7 +761,7 @@ var _ = Describe("controller", func() {
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)

By("Invoking Reconciler which will ask for RequeueAfter")
fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil)
Expand All @@ -773,7 +773,7 @@ var _ = Describe("controller", func() {
}).Should(Equal([]priorityQueueAddition{{
AddOpts: priorityqueue.AddOpts{
After: time.Millisecond * 100,
Priority: 10,
Priority: ptr.To(10),
},
items: []reconcile.Request{request},
}}))
Expand Down Expand Up @@ -819,7 +819,7 @@ var _ = Describe("controller", func() {
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)

By("Invoking Reconciler which will return an error")
fakeReconcile.AddResult(reconcile.Result{}, errors.New("oups, I did it again"))
Expand All @@ -831,7 +831,7 @@ var _ = Describe("controller", func() {
}).Should(Equal([]priorityQueueAddition{{
AddOpts: priorityqueue.AddOpts{
RateLimited: true,
Priority: 10,
Priority: ptr.To(10),
},
items: []reconcile.Request{request},
}}))
Expand Down