From 31d30b6696c75a8b6fc29a0b85b7686eefecca72 Mon Sep 17 00:00:00 2001 From: zach593 Date: Fri, 2 Jan 2026 11:02:03 +0800 Subject: [PATCH] fix priority queue ordering when item priority changes Signed-off-by: zach593 --- pkg/controller/priorityqueue/priorityqueue.go | 11 +++++++++- .../priorityqueue/priorityqueue_test.go | 20 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 71363f0d17..c0e291c2a1 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -30,6 +30,13 @@ type AddOpts struct { // internally de-duplicates all items that are added to // it. It will use the max of the passed priorities and the // min of possible durations. +// +// When an item that is already enqueued at a lower priority +// is re-enqueued with a higher priority, it will be placed at +// the end among items of the new priority, in order to +// preserve FIFO semantics within each priority level. +// The effective duration (i.e. the ready time) is still +// computed as the minimum across all enqueues. type PriorityQueue[T comparable] interface { workqueue.TypedRateLimitingInterface[T] AddWithOpts(o AddOpts, Items ...T) @@ -161,12 +168,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { Priority: ptr.Deref(o.Priority, 0), ReadyAt: readyAt, } + w.addedCounter++ w.items[key] = item w.queue.ReplaceOrInsert(item) if item.ReadyAt == nil { w.metrics.add(key, item.Priority) } - w.addedCounter++ continue } @@ -179,6 +186,8 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority) } item.Priority = newPriority + item.AddedCounter = w.addedCounter + w.addedCounter++ } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 8260126e12..989febdbcf 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -320,6 +320,26 @@ var _ = Describe("Controllerworkqueue", func() { Expect(depth).To(Equal(0)) } }) + + It("follows FIFO order in the new priority queue when item priority changes", func() { + q, _ := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{Priority: ptr.To(0)}, "foo") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "bar") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo") + Expect(q.Len()).To(Equal(2)) + + item, priority, _ := q.GetWithPriority() + Expect(item).To(Equal("bar")) + Expect(priority).To(Equal(1)) + Expect(q.Len()).To(Equal(1)) + + item, priority, _ = q.GetWithPriority() + Expect(item).To(Equal("foo")) + Expect(priority).To(Equal(1)) + Expect(q.Len()).To(Equal(0)) + }) }) func BenchmarkAddGetDone(b *testing.B) {