Skip to content

Commit 975716b

Browse files
moritzmoekstiehlalvaroaleman
authored
🐛 Fix a bug where the priorityqueue would sometimes not return high-priority items first (#3330)
* fix: adjust priority queue order and spin Co-authored-by: kstiehl <[email protected]> * fix: do not hand out item during metrics ascend Co-authored-by: kstiehl <[email protected]> * test: add test case Co-authored-by: kstiehl <[email protected]> * rm async from test * rm metricsAscend flag * fix test Co-authored-by: kstiehl <[email protected]> * add comments Co-authored-by: kstiehl <[email protected]> * Update pkg/controller/priorityqueue/priorityqueue.go Co-authored-by: Alvaro Aleman <[email protected]> --------- Co-authored-by: kstiehl <[email protected]> Co-authored-by: Alvaro Aleman <[email protected]>
1 parent b0b0b0f commit 975716b

File tree

2 files changed

+92
-32
lines changed

2 files changed

+92
-32
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package priorityqueue
22

33
import (
4+
"math"
45
"sync"
56
"sync/atomic"
67
"time"
@@ -206,17 +207,18 @@ func (w *priorityqueue[T]) spin() {
206207
blockForever := make(chan time.Time)
207208
var nextReady <-chan time.Time
208209
nextReady = blockForever
210+
var nextItemReadyAt time.Time
209211

210212
for {
211213
select {
212214
case <-w.done:
213215
return
214216
case <-w.itemOrWaiterAdded:
215217
case <-nextReady:
218+
nextReady = blockForever
219+
nextItemReadyAt = time.Time{}
216220
}
217221

218-
nextReady = blockForever
219-
220222
func() {
221223
w.lock.Lock()
222224
defer w.lock.Unlock()
@@ -227,39 +229,67 @@ func (w *priorityqueue[T]) spin() {
227229
// manipulating the tree from within Ascend might lead to panics, so
228230
// track what we want to delete and do it after we are done ascending.
229231
var toDelete []*item[T]
230-
w.queue.Ascend(func(item *item[T]) bool {
231-
if item.ReadyAt != nil {
232-
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
233-
nextReady = w.tick(readyAt)
234-
return false
232+
233+
var key T
234+
235+
// Items in the queue tree are sorted first by priority and second by readiness, so
236+
// items with a lower priority might be ready further down in the queue.
237+
// We iterate through the priorities high to low until we find a ready item
238+
pivot := item[T]{
239+
Key: key,
240+
AddedCounter: 0,
241+
Priority: math.MaxInt,
242+
ReadyAt: nil,
243+
}
244+
245+
for {
246+
pivotChange := false
247+
248+
w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool {
249+
// Item is locked, we can not hand it out
250+
if w.locked.Has(item.Key) {
251+
return true
235252
}
236-
if !w.becameReady.Has(item.Key) {
237-
w.metrics.add(item.Key, item.Priority)
238-
w.becameReady.Insert(item.Key)
253+
254+
if item.ReadyAt != nil {
255+
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
256+
if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() {
257+
nextReady = w.tick(readyAt)
258+
nextItemReadyAt = *item.ReadyAt
259+
}
260+
261+
// Adjusting the pivot item moves the ascend to the next lower priority
262+
pivot.Priority = item.Priority - 1
263+
pivotChange = true
264+
return false
265+
}
266+
if !w.becameReady.Has(item.Key) {
267+
w.metrics.add(item.Key, item.Priority)
268+
w.becameReady.Insert(item.Key)
269+
}
239270
}
240-
}
241271

242-
if w.waiters.Load() == 0 {
243-
// Have to keep iterating here to ensure we update metrics
244-
// for further items that became ready and set nextReady.
245-
return true
246-
}
272+
if w.waiters.Load() == 0 {
273+
// Have to keep iterating here to ensure we update metrics
274+
// for further items that became ready and set nextReady.
275+
return true
276+
}
247277

248-
// Item is locked, we can not hand it out
249-
if w.locked.Has(item.Key) {
250-
return true
251-
}
278+
w.metrics.get(item.Key, item.Priority)
279+
w.locked.Insert(item.Key)
280+
w.waiters.Add(-1)
281+
delete(w.items, item.Key)
282+
toDelete = append(toDelete, item)
283+
w.becameReady.Delete(item.Key)
284+
w.get <- *item
252285

253-
w.metrics.get(item.Key, item.Priority)
254-
w.locked.Insert(item.Key)
255-
w.waiters.Add(-1)
256-
delete(w.items, item.Key)
257-
toDelete = append(toDelete, item)
258-
w.becameReady.Delete(item.Key)
259-
w.get <- *item
286+
return true
287+
})
260288

261-
return true
262-
})
289+
if !pivotChange {
290+
break
291+
}
292+
}
263293

264294
for _, item := range toDelete {
265295
w.queue.Delete(item)
@@ -387,6 +417,9 @@ func (w *priorityqueue[T]) logState() {
387417
}
388418

389419
func less[T comparable](a, b *item[T]) bool {
420+
if a.Priority != b.Priority {
421+
return a.Priority > b.Priority
422+
}
390423
if a.ReadyAt == nil && b.ReadyAt != nil {
391424
return true
392425
}
@@ -396,9 +429,6 @@ func less[T comparable](a, b *item[T]) bool {
396429
if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) {
397430
return a.ReadyAt.Before(*b.ReadyAt)
398431
}
399-
if a.Priority != b.Priority {
400-
return a.Priority > b.Priority
401-
}
402432

403433
return a.AddedCounter < b.AddedCounter
404434
}
@@ -426,4 +456,5 @@ type bTree[T any] interface {
426456
ReplaceOrInsert(item T) (_ T, _ bool)
427457
Delete(item T) (T, bool)
428458
Ascend(iterator btree.ItemIteratorG[T])
459+
AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T])
429460
}

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,35 @@ var _ = Describe("Controllerworkqueue", func() {
184184
Expect(metrics.retries["test"]).To(Equal(1))
185185
})
186186

187+
It("returns high priority item that became ready before low priority item", func() {
188+
q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
189+
defer q.ShutDown()
190+
191+
tickSetup := make(chan any)
192+
originalTick := q.tick
193+
q.tick = func(d time.Duration) <-chan time.Time {
194+
Expect(d).To(Equal(time.Second))
195+
close(tickSetup)
196+
return originalTick(d)
197+
}
198+
199+
lowPriority := -100
200+
highPriority := 0
201+
q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo")
202+
q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio")
203+
204+
Eventually(tickSetup).Should(BeClosed())
205+
206+
forwardQueueTimeBy(1 * time.Second)
207+
key, prio, _ := q.GetWithPriority()
208+
209+
Expect(key).To(Equal("prio"))
210+
Expect(prio).To(Equal(0))
211+
Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0}))
212+
Expect(metrics.adds["test"]).To(Equal(2))
213+
Expect(metrics.retries["test"]).To(Equal(1))
214+
})
215+
187216
It("returns an item to a waiter as soon as it has one", func() {
188217
q, metrics := newQueue()
189218
defer q.ShutDown()

0 commit comments

Comments
 (0)