diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 71363f0d17..c0e291c2a1 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -30,6 +30,13 @@ type AddOpts struct { // internally de-duplicates all items that are added to // it. It will use the max of the passed priorities and the // min of possible durations. +// +// When an item that is already enqueued at a lower priority +// is re-enqueued with a higher priority, it will be placed at +// the end among items of the new priority, in order to +// preserve FIFO semantics within each priority level. +// The effective duration (i.e. the ready time) is still +// computed as the minimum across all enqueues. type PriorityQueue[T comparable] interface { workqueue.TypedRateLimitingInterface[T] AddWithOpts(o AddOpts, Items ...T) @@ -161,12 +168,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { Priority: ptr.Deref(o.Priority, 0), ReadyAt: readyAt, } + w.addedCounter++ w.items[key] = item w.queue.ReplaceOrInsert(item) if item.ReadyAt == nil { w.metrics.add(key, item.Priority) } - w.addedCounter++ continue } @@ -179,6 +186,8 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority) } item.Priority = newPriority + item.AddedCounter = w.addedCounter + w.addedCounter++ } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 8260126e12..989febdbcf 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -320,6 +320,26 @@ var _ = Describe("Controllerworkqueue", func() { Expect(depth).To(Equal(0)) } }) + + It("follows FIFO order in the new priority queue when item priority changes", func() { + q, _ := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{Priority: ptr.To(0)}, "foo") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "bar") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo") + Expect(q.Len()).To(Equal(2)) + + item, priority, _ := q.GetWithPriority() + Expect(item).To(Equal("bar")) + Expect(priority).To(Equal(1)) + Expect(q.Len()).To(Equal(1)) + + item, priority, _ = q.GetWithPriority() + Expect(item).To(Equal("foo")) + Expect(priority).To(Equal(1)) + Expect(q.Len()).To(Equal(0)) + }) }) func BenchmarkAddGetDone(b *testing.B) {