Skip to content

Commit ee91144

Browse files
moritzmoekstiehl
andcommitted
fix: adjust priority queue order and spin
Co-authored-by: kstiehl <[email protected]>
1 parent b0b0b0f commit ee91144

File tree

1 file changed

+58
-32
lines changed

1 file changed

+58
-32
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package priorityqueue
22

33
import (
4+
"math"
45
"sync"
56
"sync/atomic"
67
"time"
@@ -206,17 +207,18 @@ func (w *priorityqueue[T]) spin() {
206207
blockForever := make(chan time.Time)
207208
var nextReady <-chan time.Time
208209
nextReady = blockForever
210+
var nextItemReadyAt time.Time
209211

210212
for {
211213
select {
212214
case <-w.done:
213215
return
214216
case <-w.itemOrWaiterAdded:
215217
case <-nextReady:
218+
nextReady = blockForever
219+
nextItemReadyAt = time.Time{}
216220
}
217221

218-
nextReady = blockForever
219-
220222
func() {
221223
w.lock.Lock()
222224
defer w.lock.Unlock()
@@ -227,39 +229,62 @@ func (w *priorityqueue[T]) spin() {
227229
// manipulating the tree from within Ascend might lead to panics, so
228230
// track what we want to delete and do it after we are done ascending.
229231
var toDelete []*item[T]
230-
w.queue.Ascend(func(item *item[T]) bool {
231-
if item.ReadyAt != nil {
232-
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
233-
nextReady = w.tick(readyAt)
234-
return false
232+
233+
var key T
234+
pivot := item[T]{
235+
Key: key,
236+
AddedCounter: 0,
237+
Priority: math.MaxInt,
238+
ReadyAt: nil,
239+
}
240+
241+
for {
242+
pivotChange := false
243+
244+
w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool {
245+
// Item is locked, we can not hand it out
246+
if w.locked.Has(item.Key) {
247+
return true
235248
}
236-
if !w.becameReady.Has(item.Key) {
237-
w.metrics.add(item.Key, item.Priority)
238-
w.becameReady.Insert(item.Key)
249+
250+
if item.ReadyAt != nil {
251+
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
252+
if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() {
253+
nextReady = w.tick(readyAt)
254+
nextItemReadyAt = *item.ReadyAt
255+
}
256+
257+
pivot.Priority = item.Priority - 1
258+
pivotChange = true
259+
return false
260+
}
261+
if !w.becameReady.Has(item.Key) {
262+
w.metrics.add(item.Key, item.Priority)
263+
w.becameReady.Insert(item.Key)
264+
}
239265
}
240-
}
241266

242-
if w.waiters.Load() == 0 {
243-
// Have to keep iterating here to ensure we update metrics
244-
// for further items that became ready and set nextReady.
245-
return true
246-
}
267+
if w.waiters.Load() == 0 {
268+
// Have to keep iterating here to ensure we update metrics
269+
// for further items that became ready and set nextReady.
270+
return true
271+
}
247272

248-
// Item is locked, we can not hand it out
249-
if w.locked.Has(item.Key) {
250-
return true
251-
}
273+
w.metrics.get(item.Key, item.Priority)
274+
w.locked.Insert(item.Key)
275+
w.waiters.Add(-1)
276+
delete(w.items, item.Key)
277+
toDelete = append(toDelete, item)
278+
w.becameReady.Delete(item.Key)
279+
w.get <- *item
252280

253-
w.metrics.get(item.Key, item.Priority)
254-
w.locked.Insert(item.Key)
255-
w.waiters.Add(-1)
256-
delete(w.items, item.Key)
257-
toDelete = append(toDelete, item)
258-
w.becameReady.Delete(item.Key)
259-
w.get <- *item
281+
return true
282+
})
260283

261-
return true
262-
})
284+
if !pivotChange {
285+
break
286+
}
287+
}
263288

264289
for _, item := range toDelete {
265290
w.queue.Delete(item)
@@ -387,6 +412,9 @@ func (w *priorityqueue[T]) logState() {
387412
}
388413

389414
func less[T comparable](a, b *item[T]) bool {
415+
if a.Priority != b.Priority {
416+
return a.Priority > b.Priority
417+
}
390418
if a.ReadyAt == nil && b.ReadyAt != nil {
391419
return true
392420
}
@@ -396,9 +424,6 @@ func less[T comparable](a, b *item[T]) bool {
396424
if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) {
397425
return a.ReadyAt.Before(*b.ReadyAt)
398426
}
399-
if a.Priority != b.Priority {
400-
return a.Priority > b.Priority
401-
}
402427

403428
return a.AddedCounter < b.AddedCounter
404429
}
@@ -426,4 +451,5 @@ type bTree[T any] interface {
426451
ReplaceOrInsert(item T) (_ T, _ bool)
427452
Delete(item T) (T, bool)
428453
Ascend(iterator btree.ItemIteratorG[T])
454+
AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T])
429455
}

0 commit comments

Comments
 (0)