From ec504bf5b5eeb4081559264228d17a042d9d4dc8 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Fri, 26 Sep 2025 18:07:24 +0200 Subject: [PATCH 1/9] fix: adjust priority queue order and spin Co-authored-by: kstiehl --- pkg/controller/priorityqueue/priorityqueue.go | 90 ++++++++++++------- 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index f702600fc9..5fb6a1222a 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -1,6 +1,7 @@ package priorityqueue import ( + "math" "sync" "sync/atomic" "time" @@ -206,6 +207,7 @@ func (w *priorityqueue[T]) spin() { blockForever := make(chan time.Time) var nextReady <-chan time.Time nextReady = blockForever + var nextItemReadyAt time.Time for { select { @@ -213,10 +215,10 @@ func (w *priorityqueue[T]) spin() { return case <-w.itemOrWaiterAdded: case <-nextReady: + nextReady = blockForever + nextItemReadyAt = time.Time{} } - nextReady = blockForever - func() { w.lock.Lock() defer w.lock.Unlock() @@ -227,39 +229,62 @@ func (w *priorityqueue[T]) spin() { // manipulating the tree from within Ascend might lead to panics, so // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] - w.queue.Ascend(func(item *item[T]) bool { - if item.ReadyAt != nil { - if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { - nextReady = w.tick(readyAt) - return false + + var key T + pivot := item[T]{ + Key: key, + AddedCounter: 0, + Priority: math.MaxInt, + ReadyAt: nil, + } + + for { + pivotChange := false + + w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { + // Item is locked, we can not hand it out + if w.locked.Has(item.Key) { + return true } - if !w.becameReady.Has(item.Key) { - w.metrics.add(item.Key, item.Priority) - w.becameReady.Insert(item.Key) + + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { + if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() { + nextReady = w.tick(readyAt) + nextItemReadyAt = *item.ReadyAt + } + + pivot.Priority = item.Priority - 1 + pivotChange = true + return false + } + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key, item.Priority) + w.becameReady.Insert(item.Key) + } } - } - if w.waiters.Load() == 0 { - // Have to keep iterating here to ensure we update metrics - // for further items that became ready and set nextReady. - return true - } + if w.waiters.Load() == 0 { + // Have to keep iterating here to ensure we update metrics + // for further items that became ready and set nextReady. + return true + } - // Item is locked, we can not hand it out - if w.locked.Has(item.Key) { - return true - } + w.metrics.get(item.Key, item.Priority) + w.locked.Insert(item.Key) + w.waiters.Add(-1) + delete(w.items, item.Key) + toDelete = append(toDelete, item) + w.becameReady.Delete(item.Key) + w.get <- *item - w.metrics.get(item.Key, item.Priority) - w.locked.Insert(item.Key) - w.waiters.Add(-1) - delete(w.items, item.Key) - toDelete = append(toDelete, item) - w.becameReady.Delete(item.Key) - w.get <- *item + return true + }) - return true - }) + if !pivotChange { + break + } + } for _, item := range toDelete { w.queue.Delete(item) @@ -387,6 +412,9 @@ func (w *priorityqueue[T]) logState() { } func less[T comparable](a, b *item[T]) bool { + if a.Priority != b.Priority { + return a.Priority > b.Priority + } if a.ReadyAt == nil && b.ReadyAt != nil { return true } @@ -396,9 +424,6 @@ func less[T comparable](a, b *item[T]) bool { if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { return a.ReadyAt.Before(*b.ReadyAt) } - if a.Priority != b.Priority { - return a.Priority > b.Priority - } return a.AddedCounter < b.AddedCounter } @@ -426,4 +451,5 @@ type bTree[T any] interface { ReplaceOrInsert(item T) (_ T, _ bool) Delete(item T) (T, bool) Ascend(iterator btree.ItemIteratorG[T]) + AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T]) } From deda43ea89aed022866ac4bd097784cf8a118ec1 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Fri, 26 Sep 2025 18:07:24 +0200 Subject: [PATCH 2/9] fix: do not hand out item during metrics ascend Co-authored-by: kstiehl --- pkg/controller/priorityqueue/priorityqueue.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 5fb6a1222a..c615de3e79 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -239,6 +239,7 @@ func (w *priorityqueue[T]) spin() { } for { + metricsAscend := false pivotChange := false w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { @@ -264,7 +265,8 @@ func (w *priorityqueue[T]) spin() { } } - if w.waiters.Load() == 0 { + if w.waiters.Load() == 0 || metricsAscend { + metricsAscend = true // Have to keep iterating here to ensure we update metrics // for further items that became ready and set nextReady. return true From cc98601b0fed8a634060418d9ab446f2840ab4a1 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Fri, 26 Sep 2025 18:07:24 +0200 Subject: [PATCH 3/9] test: add test case Co-authored-by: kstiehl --- .../priorityqueue/priorityqueue_test.go | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 6127cd99ba..4ee17ddca6 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -197,6 +197,66 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.retries["test"]).To(Equal(1)) }) + It("returns high priority item that became ready before low priority items", func() { + q, metrics := newQueue() + defer q.ShutDown() + + now := time.Now().Round(time.Second) + nowLock := sync.Mutex{} + tick := make(chan time.Time) + + cwq := q.(*priorityqueue[string]) + cwq.now = func() time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } + cwq.tick = func(d time.Duration) <-chan time.Time { + Expect(d).To(Equal(time.Second)) + return tick + } + + retrievedItem := make(chan any) + getNext := make(chan any) + + go func() { + defer GinkgoRecover() + defer close(retrievedItem) + + key, prio, _ := q.GetWithPriority() + Expect(key).To(Equal("foo")) + Expect(prio).To(Equal(-100)) + + retrievedItem <- nil + <-getNext + + key, prio, _ = q.GetWithPriority() + Expect(key).To(Equal("prio")) + Expect(prio).To(Equal(0)) + }() + + lowPriority := -100 + highPriority := 0 + q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo") + q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "bar") + q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio") + + <-retrievedItem + + nowLock.Lock() + now = now.Add(time.Second) + nowLock.Unlock() + tick <- now + getNext <- nil + + Eventually(retrievedItem).Should(BeClosed()) + close(getNext) + + Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0})) + Expect(metrics.adds["test"]).To(Equal(3)) + Expect(metrics.retries["test"]).To(Equal(1)) + }) + It("returns an item to a waiter as soon as it has one", func() { q, metrics := newQueue() defer q.ShutDown() From 3a6a206d412668219c0d92b68ca6ef58d58e157c Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Thu, 2 Oct 2025 17:31:41 +0200 Subject: [PATCH 4/9] rm async from test --- .../priorityqueue/priorityqueue_test.go | 34 +++++-------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 4ee17ddca6..3769252580 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -197,13 +197,14 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.retries["test"]).To(Equal(1)) }) - It("returns high priority item that became ready before low priority items", func() { + It("returns high priority item that became ready before low priority item", func() { q, metrics := newQueue() defer q.ShutDown() now := time.Now().Round(time.Second) nowLock := sync.Mutex{} tick := make(chan time.Time) + tickSetup := make(chan any) cwq := q.(*priorityqueue[string]) cwq.now = func() time.Time { @@ -213,47 +214,28 @@ var _ = Describe("Controllerworkqueue", func() { } cwq.tick = func(d time.Duration) <-chan time.Time { Expect(d).To(Equal(time.Second)) + close(tickSetup) return tick } - retrievedItem := make(chan any) - getNext := make(chan any) - - go func() { - defer GinkgoRecover() - defer close(retrievedItem) - - key, prio, _ := q.GetWithPriority() - Expect(key).To(Equal("foo")) - Expect(prio).To(Equal(-100)) - - retrievedItem <- nil - <-getNext - - key, prio, _ = q.GetWithPriority() - Expect(key).To(Equal("prio")) - Expect(prio).To(Equal(0)) - }() - lowPriority := -100 highPriority := 0 q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo") - q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "bar") q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio") - <-retrievedItem + Eventually(tickSetup).Should(BeClosed()) nowLock.Lock() now = now.Add(time.Second) nowLock.Unlock() tick <- now - getNext <- nil - Eventually(retrievedItem).Should(BeClosed()) - close(getNext) + key, prio, _ := q.GetWithPriority() + Expect(key).To(Equal("prio")) + Expect(prio).To(Equal(0)) Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0})) - Expect(metrics.adds["test"]).To(Equal(3)) + Expect(metrics.adds["test"]).To(Equal(2)) Expect(metrics.retries["test"]).To(Equal(1)) }) From a22adbefdfdf0476690be8bdaaca70cdca561f39 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Thu, 2 Oct 2025 17:32:11 +0200 Subject: [PATCH 5/9] rm metricsAscend flag --- pkg/controller/priorityqueue/priorityqueue.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index c615de3e79..5fb6a1222a 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -239,7 +239,6 @@ func (w *priorityqueue[T]) spin() { } for { - metricsAscend := false pivotChange := false w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { @@ -265,8 +264,7 @@ func (w *priorityqueue[T]) spin() { } } - if w.waiters.Load() == 0 || metricsAscend { - metricsAscend = true + if w.waiters.Load() == 0 { // Have to keep iterating here to ensure we update metrics // for further items that became ready and set nextReady. return true From 5f2c4ea2e753d654a7b8ff859f2fc95aec3ff733 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:24:03 +0200 Subject: [PATCH 6/9] fix test Co-authored-by: kstiehl --- .../priorityqueue/priorityqueue_test.go | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 3769252580..11b7b7ceb7 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -198,24 +198,15 @@ var _ = Describe("Controllerworkqueue", func() { }) It("returns high priority item that became ready before low priority item", func() { - q, metrics := newQueue() + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() defer q.ShutDown() - now := time.Now().Round(time.Second) - nowLock := sync.Mutex{} - tick := make(chan time.Time) tickSetup := make(chan any) - - cwq := q.(*priorityqueue[string]) - cwq.now = func() time.Time { - nowLock.Lock() - defer nowLock.Unlock() - return now - } - cwq.tick = func(d time.Duration) <-chan time.Time { + originalTick := q.tick + q.tick = func(d time.Duration) <-chan time.Time { Expect(d).To(Equal(time.Second)) close(tickSetup) - return tick + return originalTick(d) } lowPriority := -100 @@ -225,11 +216,7 @@ var _ = Describe("Controllerworkqueue", func() { Eventually(tickSetup).Should(BeClosed()) - nowLock.Lock() - now = now.Add(time.Second) - nowLock.Unlock() - tick <- now - + forwardQueueTimeBy(1 * time.Second) key, prio, _ := q.GetWithPriority() Expect(key).To(Equal("prio")) From fb1dd4f213e55de4b00919075b257a86df1f81f4 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:25:27 +0200 Subject: [PATCH 7/9] add comments Co-authored-by: kstiehl --- pkg/controller/priorityqueue/priorityqueue.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 5fb6a1222a..1c0f42de22 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -231,6 +231,10 @@ func (w *priorityqueue[T]) spin() { var toDelete []*item[T] var key T + + // Items in the queue tree are sorted first by priority and second by readiness, so + // items with a lower priority might be ready further down in the queue. + // In search for ready items we use the pivot item to skip through priorities without ascending the whole tree. pivot := item[T]{ Key: key, AddedCounter: 0, @@ -254,6 +258,7 @@ func (w *priorityqueue[T]) spin() { nextItemReadyAt = *item.ReadyAt } + // Adjusting the pivot item moves the ascend to the next lower priority pivot.Priority = item.Priority - 1 pivotChange = true return false From 55e3426e5f381d424fe7cceaab5599db85a01b66 Mon Sep 17 00:00:00 2001 From: Moritz <51033452+moritzmoe@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:57:40 +0200 Subject: [PATCH 8/9] Update pkg/controller/priorityqueue/priorityqueue.go Co-authored-by: Alvaro Aleman --- pkg/controller/priorityqueue/priorityqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 1c0f42de22..98df84c56b 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -234,7 +234,7 @@ func (w *priorityqueue[T]) spin() { // Items in the queue tree are sorted first by priority and second by readiness, so // items with a lower priority might be ready further down in the queue. - // In search for ready items we use the pivot item to skip through priorities without ascending the whole tree. + // We iterate through the priorities high to low until we find a ready item pivot := item[T]{ Key: key, AddedCounter: 0, From 7bb8ed28f4abe15e1640343cca9fd52c4a9b2c28 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Mon, 6 Oct 2025 11:48:24 -0400 Subject: [PATCH 9/9] Rebase for releasebranch without newQueueWithTimeForwarder --- .../priorityqueue/priorityqueue_test.go | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 11b7b7ceb7..d0cc51f7c5 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -198,15 +198,24 @@ var _ = Describe("Controllerworkqueue", func() { }) It("returns high priority item that became ready before low priority item", func() { - q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() + q, metrics := newQueue() defer q.ShutDown() + now := time.Now().Round(time.Second) + nowLock := sync.Mutex{} + tick := make(chan time.Time) + + cwq := q.(*priorityqueue[string]) + cwq.now = func() time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } tickSetup := make(chan any) - originalTick := q.tick - q.tick = func(d time.Duration) <-chan time.Time { + cwq.tick = func(d time.Duration) <-chan time.Time { Expect(d).To(Equal(time.Second)) close(tickSetup) - return originalTick(d) + return tick } lowPriority := -100 @@ -216,7 +225,10 @@ var _ = Describe("Controllerworkqueue", func() { Eventually(tickSetup).Should(BeClosed()) - forwardQueueTimeBy(1 * time.Second) + nowLock.Lock() + now = now.Add(time.Second) + nowLock.Unlock() + tick <- now key, prio, _ := q.GetWithPriority() Expect(key).To(Equal("prio"))