diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 8f9adf2629..72c5e49428 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -195,35 +195,41 @@ func (w *priorityqueue[T]) spin() { w.lockedLock.Lock() defer w.lockedLock.Unlock() - w.queue.Ascend(func(item *item[T]) bool { - if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways - return false - } + for continueLoop := true; continueLoop; { + continueLoop = false + w.queue.Ascend(func(item *item[T]) bool { + if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways + return false + } + + // No next element we can process + if item.readyAt != nil && item.readyAt.After(w.now()) { + readyAt := item.readyAt.Sub(w.now()) + if readyAt <= 0 { // Toctou race with the above check + readyAt = 1 + } + nextReady = w.tick(readyAt) + return false + } - // No next element we can process - if item.readyAt != nil && item.readyAt.After(w.now()) { - readyAt := item.readyAt.Sub(w.now()) - if readyAt <= 0 { // Toctou race with the above check - readyAt = 1 + // Item is locked, we can not hand it out + if w.locked.Has(item.key) { + return true } - nextReady = w.tick(readyAt) + + w.metrics.get(item.key) + w.locked.Insert(item.key) + w.waiters.Add(-1) + delete(w.items, item.key) + w.queue.Delete(item) + w.get <- *item + + // Return false because continuing with Ascend after deleting an item + // can lead to panics within Ascend. 
+ continueLoop = true return false - } - - // Item is locked, we can not hand it out - if w.locked.Has(item.key) { - return true - } - - w.metrics.get(item.key) - w.locked.Insert(item.key) - w.waiters.Add(-1) - delete(w.items, item.key) - w.queue.Delete(item) - w.get <- *item - - return true - }) + }) + } }() } } diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 13bd5fc8d3..e5b9ea893b 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -2,6 +2,7 @@ package priorityqueue import ( "fmt" + "math/rand/v2" "sync" "testing" "time" @@ -283,6 +284,41 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.depth["test"]).To(Equal(0)) Expect(metrics.adds["test"]).To(Equal(2)) }) + + It("returns many items", func() { + // This test ensures the queue is able to drain a large queue without panicking. + // In a previous version of the code we were calling queue.Delete within q.Ascend + // which led to a panic in queue.Ascend > iterate: + // "panic: runtime error: index out of range [0] with length 0" + + q, _ := newQueue() + defer q.ShutDown() + + for range 20 { + for i := range 1000 { + rn := rand.N(100) + if rn < 10 { + q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i)) + } else { + q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i)) + } + } + + wg := sync.WaitGroup{} + for range 100 { // The panic only occurred relatively frequently with a high number of goroutines. + wg.Add(1) + go func() { + defer wg.Done() + for range 10 { + obj, _, _ := q.GetWithPriority() + q.Done(obj) + } + }() + } + + wg.Wait() + } + }) }) func BenchmarkAddGetDone(b *testing.B) {