File tree Expand file tree Collapse file tree 2 files changed +43
-1
lines changed
pkg/controller/priorityqueue Expand file tree Collapse file tree 2 files changed +43
-1
lines changed Original file line number Diff line number Diff line change @@ -203,6 +203,9 @@ func (w *priorityqueue[T]) spin() {
203203 w .lockedLock .Lock ()
204204 defer w .lockedLock .Unlock ()
205205
206+ // manipulating the tree from within Ascend might lead to panics, so
207+ // track what we want to delete and do it after we are done ascending.
208+ var toDelete []* item [T ]
206209 w .queue .Ascend (func (item * item [T ]) bool {
207210 if item .readyAt != nil {
208211 if readyAt := item .readyAt .Sub (w .now ()); readyAt > 0 {
@@ -230,12 +233,16 @@ func (w *priorityqueue[T]) spin() {
230233 w .locked .Insert (item .key )
231234 w .waiters .Add (- 1 )
232235 delete (w .items , item .key )
233- w . queue . Delete ( item )
236+ toDelete = append ( toDelete , item )
234237 w .becameReady .Delete (item .key )
235238 w .get <- * item
236239
237240 return true
238241 })
242+
243+ for _ , item := range toDelete {
244+ w .queue .Delete (item )
245+ }
239246 }()
240247 }
241248}
Original file line number Diff line number Diff line change @@ -2,6 +2,7 @@ package priorityqueue
22
33import (
44 "fmt"
5+ "math/rand/v2"
56 "sync"
67 "testing"
78 "time"
@@ -344,6 +345,40 @@ var _ = Describe("Controllerworkqueue", func() {
344345 Expect (metrics .depth ["test" ]).To (Equal (4 ))
345346 metrics .mu .Unlock ()
346347 })
348+
349+ It ("returns many items" , func () {
350+ // This test ensures the queue is able to drain a large queue without panic'ing.
351+ // In a previous version of the code we were calling queue.Delete within q.Ascend
352+ // which led to a panic in queue.Ascend > iterate:
353+ // "panic: runtime error: index out of range [0] with length 0"
354+ q , _ := newQueue ()
355+ defer q .ShutDown ()
356+
357+ for range 20 {
358+ for i := range 1000 {
359+ rn := rand .N (100 ) //nolint:gosec // We don't need cryptographically secure entropy here
360+ if rn < 10 {
361+ q .AddWithOpts (AddOpts {After : time .Duration (rn ) * time .Millisecond }, fmt .Sprintf ("foo%d" , i ))
362+ } else {
363+ q .AddWithOpts (AddOpts {Priority : rn }, fmt .Sprintf ("foo%d" , i ))
364+ }
365+ }
366+
367+ wg := sync.WaitGroup {}
368+ for range 100 { // The panic only occurred relatively frequently with a high number of go routines.
369+ wg .Add (1 )
370+ go func () {
371+ defer wg .Done ()
372+ for range 10 {
373+ obj , _ , _ := q .GetWithPriority ()
374+ q .Done (obj )
375+ }
376+ }()
377+ }
378+
379+ wg .Wait ()
380+ }
381+ })
347382})
348383
349384func BenchmarkAddGetDone (b * testing.B ) {
You can’t perform that action at this time.
0 commit comments