From ee91144a923307a87431106205978e4cca76b953 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Fri, 26 Sep 2025 18:07:24 +0200 Subject: [PATCH 1/8] fix: adjust priority queue order and spin Co-authored-by: kstiehl --- pkg/controller/priorityqueue/priorityqueue.go | 90 ++++++++++++------- 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index f702600fc9..5fb6a1222a 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -1,6 +1,7 @@ package priorityqueue import ( + "math" "sync" "sync/atomic" "time" @@ -206,6 +207,7 @@ func (w *priorityqueue[T]) spin() { blockForever := make(chan time.Time) var nextReady <-chan time.Time nextReady = blockForever + var nextItemReadyAt time.Time for { select { @@ -213,10 +215,10 @@ func (w *priorityqueue[T]) spin() { return case <-w.itemOrWaiterAdded: case <-nextReady: + nextReady = blockForever + nextItemReadyAt = time.Time{} } - nextReady = blockForever - func() { w.lock.Lock() defer w.lock.Unlock() @@ -227,39 +229,62 @@ func (w *priorityqueue[T]) spin() { // manipulating the tree from within Ascend might lead to panics, so // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] - w.queue.Ascend(func(item *item[T]) bool { - if item.ReadyAt != nil { - if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { - nextReady = w.tick(readyAt) - return false + + var key T + pivot := item[T]{ + Key: key, + AddedCounter: 0, + Priority: math.MaxInt, + ReadyAt: nil, + } + + for { + pivotChange := false + + w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { + // Item is locked, we can not hand it out + if w.locked.Has(item.Key) { + return true } - if !w.becameReady.Has(item.Key) { - w.metrics.add(item.Key, item.Priority) - w.becameReady.Insert(item.Key) + + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { + if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() { + nextReady = w.tick(readyAt) + nextItemReadyAt = *item.ReadyAt + } + + pivot.Priority = item.Priority - 1 + pivotChange = true + return false + } + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key, item.Priority) + w.becameReady.Insert(item.Key) + } } - } - if w.waiters.Load() == 0 { - // Have to keep iterating here to ensure we update metrics - // for further items that became ready and set nextReady. - return true - } + if w.waiters.Load() == 0 { + // Have to keep iterating here to ensure we update metrics + // for further items that became ready and set nextReady. + return true + } - // Item is locked, we can not hand it out - if w.locked.Has(item.Key) { - return true - } + w.metrics.get(item.Key, item.Priority) + w.locked.Insert(item.Key) + w.waiters.Add(-1) + delete(w.items, item.Key) + toDelete = append(toDelete, item) + w.becameReady.Delete(item.Key) + w.get <- *item - w.metrics.get(item.Key, item.Priority) - w.locked.Insert(item.Key) - w.waiters.Add(-1) - delete(w.items, item.Key) - toDelete = append(toDelete, item) - w.becameReady.Delete(item.Key) - w.get <- *item + return true + }) - return true - }) + if !pivotChange { + break + } + } for _, item := range toDelete { w.queue.Delete(item) @@ -387,6 +412,9 @@ func (w *priorityqueue[T]) logState() { } func less[T comparable](a, b *item[T]) bool { + if a.Priority != b.Priority { + return a.Priority > b.Priority + } if a.ReadyAt == nil && b.ReadyAt != nil { return true } @@ -396,9 +424,6 @@ func less[T comparable](a, b *item[T]) bool { if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { return a.ReadyAt.Before(*b.ReadyAt) } - if a.Priority != b.Priority { - return a.Priority > b.Priority - } return a.AddedCounter < b.AddedCounter } @@ -426,4 +451,5 @@ type bTree[T any] interface { ReplaceOrInsert(item T) (_ T, _ bool) Delete(item T) (T, bool) Ascend(iterator btree.ItemIteratorG[T]) + AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T]) } From 5318c5dd2299506f21d57dd06ddaa6ef3b916942 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Fri, 26 Sep 2025 18:07:24 +0200 Subject: [PATCH 2/8] fix: do not hand out item during metrics ascend Co-authored-by: kstiehl --- pkg/controller/priorityqueue/priorityqueue.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 5fb6a1222a..c615de3e79 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -239,6 +239,7 @@ func (w *priorityqueue[T]) spin() { } for { + metricsAscend := false pivotChange := false w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { @@ -264,7 +265,8 @@ func (w *priorityqueue[T]) spin() { } } - if w.waiters.Load() == 0 { + if w.waiters.Load() == 0 || metricsAscend { + metricsAscend = true // Have to keep iterating here to ensure we update metrics // for further items that became ready and set nextReady. return true From 2b0bb57d1bb92d4e53b6f9cfa34e9c02f76de7f1 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Fri, 26 Sep 2025 18:07:24 +0200 Subject: [PATCH 3/8] test: add test case Co-authored-by: kstiehl --- .../priorityqueue/priorityqueue_test.go | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 653f770043..341df84a55 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -184,6 +184,66 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.retries["test"]).To(Equal(1)) }) + It("returns high priority item that became ready before low priority items", func() { + q, metrics := newQueue() + defer q.ShutDown() + + now := time.Now().Round(time.Second) + nowLock := sync.Mutex{} + tick := make(chan time.Time) + + cwq := q.(*priorityqueue[string]) + cwq.now = func() time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } + cwq.tick = func(d time.Duration) <-chan time.Time { + Expect(d).To(Equal(time.Second)) + return tick + } + + retrievedItem := make(chan any) + getNext := make(chan any) + + go func() { + defer GinkgoRecover() + defer close(retrievedItem) + + key, prio, _ := q.GetWithPriority() + Expect(key).To(Equal("foo")) + Expect(prio).To(Equal(-100)) + + retrievedItem <- nil + <-getNext + + key, prio, _ = q.GetWithPriority() + Expect(key).To(Equal("prio")) + Expect(prio).To(Equal(0)) + }() + + lowPriority := -100 + highPriority := 0 + q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo") + q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "bar") + q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio") + + <-retrievedItem + + nowLock.Lock() + now = now.Add(time.Second) + nowLock.Unlock() + tick <- now + getNext <- nil + + Eventually(retrievedItem).Should(BeClosed()) + close(getNext) + + Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0})) + Expect(metrics.adds["test"]).To(Equal(3)) + Expect(metrics.retries["test"]).To(Equal(1)) + }) + It("returns an item to a waiter as soon as it has one", func() { q, metrics := newQueue() defer q.ShutDown() From 571109fcb936faa5982b37b7432cdef1a2333b3f Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Thu, 2 Oct 2025 17:31:41 +0200 Subject: [PATCH 4/8] rm async from test --- .../priorityqueue/priorityqueue_test.go | 34 +++++-------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 341df84a55..419e319e1c 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -184,13 +184,14 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.retries["test"]).To(Equal(1)) }) - It("returns high priority item that became ready before low priority items", func() { + It("returns high priority item that became ready before low priority item", func() { q, metrics := newQueue() defer q.ShutDown() now := time.Now().Round(time.Second) nowLock := sync.Mutex{} tick := make(chan time.Time) + tickSetup := make(chan any) cwq := q.(*priorityqueue[string]) cwq.now = func() time.Time { @@ -200,47 +201,28 @@ var _ = Describe("Controllerworkqueue", func() { } cwq.tick = func(d time.Duration) <-chan time.Time { Expect(d).To(Equal(time.Second)) + close(tickSetup) return tick } - retrievedItem := make(chan any) - getNext := make(chan any) - - go func() { - defer GinkgoRecover() - defer close(retrievedItem) - - key, prio, _ := q.GetWithPriority() - Expect(key).To(Equal("foo")) - Expect(prio).To(Equal(-100)) - - retrievedItem <- nil - <-getNext - - key, prio, _ = q.GetWithPriority() - Expect(key).To(Equal("prio")) - Expect(prio).To(Equal(0)) - }() - lowPriority := -100 highPriority := 0 q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo") - q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "bar") q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio") - <-retrievedItem + Eventually(tickSetup).Should(BeClosed()) nowLock.Lock() now = now.Add(time.Second) nowLock.Unlock() tick <- now - getNext <- nil - Eventually(retrievedItem).Should(BeClosed()) - close(getNext) + key, prio, _ := q.GetWithPriority() + Expect(key).To(Equal("prio")) + Expect(prio).To(Equal(0)) Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0})) - Expect(metrics.adds["test"]).To(Equal(3)) + Expect(metrics.adds["test"]).To(Equal(2)) Expect(metrics.retries["test"]).To(Equal(1)) }) From d131e29503cc726620308428a2e5b3a3dfbd21ee Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Thu, 2 Oct 2025 17:32:11 +0200 Subject: [PATCH 5/8] rm metricsAscend flag --- pkg/controller/priorityqueue/priorityqueue.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index c615de3e79..5fb6a1222a 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -239,7 +239,6 @@ func (w *priorityqueue[T]) spin() { } for { - metricsAscend := false pivotChange := false w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { @@ -265,8 +264,7 @@ func (w *priorityqueue[T]) spin() { } } - if w.waiters.Load() == 0 || metricsAscend { - metricsAscend = true + if w.waiters.Load() == 0 { // Have to keep iterating here to ensure we update metrics // for further items that became ready and set nextReady. return true From bd24bb38eeab2ad79516f97d4e1e11cc84ad9187 Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:24:03 +0200 Subject: [PATCH 6/8] fix test Co-authored-by: kstiehl --- .../priorityqueue/priorityqueue_test.go | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 419e319e1c..e18a6393eb 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -185,24 +185,15 @@ var _ = Describe("Controllerworkqueue", func() { }) It("returns high priority item that became ready before low priority item", func() { - q, metrics := newQueue() + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() defer q.ShutDown() - now := time.Now().Round(time.Second) - nowLock := sync.Mutex{} - tick := make(chan time.Time) tickSetup := make(chan any) - - cwq := q.(*priorityqueue[string]) - cwq.now = func() time.Time { - nowLock.Lock() - defer nowLock.Unlock() - return now - } - cwq.tick = func(d time.Duration) <-chan time.Time { + originalTick := q.tick + q.tick = func(d time.Duration) <-chan time.Time { Expect(d).To(Equal(time.Second)) close(tickSetup) - return tick + return originalTick(d) } lowPriority := -100 @@ -212,11 +203,7 @@ var _ = Describe("Controllerworkqueue", func() { Eventually(tickSetup).Should(BeClosed()) - nowLock.Lock() - now = now.Add(time.Second) - nowLock.Unlock() - tick <- now - + forwardQueueTimeBy(1 * time.Second) key, prio, _ := q.GetWithPriority() Expect(key).To(Equal("prio")) From d244c7c9fca6d86d76ff1a83641ee050c87d9c8b Mon Sep 17 00:00:00 2001 From: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:25:27 +0200 Subject: [PATCH 7/8] add comments Co-authored-by: kstiehl --- pkg/controller/priorityqueue/priorityqueue.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 5fb6a1222a..1c0f42de22 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -231,6 +231,10 @@ func (w *priorityqueue[T]) spin() { var toDelete []*item[T] var key T + + // Items in the queue tree are sorted first by priority and second by readiness, so + // items with a lower priority might be ready further down in the queue. + // In search for ready items we use the pivot item to skip through priorities without ascending the whole tree. pivot := item[T]{ Key: key, AddedCounter: 0, @@ -254,6 +258,7 @@ func (w *priorityqueue[T]) spin() { nextItemReadyAt = *item.ReadyAt } + // Adjusting the pivot item moves the ascend to the next lower priority pivot.Priority = item.Priority - 1 pivotChange = true return false From 8c5382f4ffba2f71f17a8b11cd0ebccb61ea9c74 Mon Sep 17 00:00:00 2001 From: Moritz <51033452+moritzmoe@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:57:40 +0200 Subject: [PATCH 8/8] Update pkg/controller/priorityqueue/priorityqueue.go Co-authored-by: Alvaro Aleman --- pkg/controller/priorityqueue/priorityqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 1c0f42de22..98df84c56b 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -234,7 +234,7 @@ func (w *priorityqueue[T]) spin() { // Items in the queue tree are sorted first by priority and second by readiness, so // items with a lower priority might be ready further down in the queue. - // In search for ready items we use the pivot item to skip through priorities without ascending the whole tree. + // We iterate through the priorities high to low until we find a ready item pivot := item[T]{ Key: key, AddedCounter: 0,