Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 63 additions & 32 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package priorityqueue

import (
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -206,17 +207,18 @@ func (w *priorityqueue[T]) spin() {
blockForever := make(chan time.Time)
var nextReady <-chan time.Time
nextReady = blockForever
var nextItemReadyAt time.Time

for {
select {
case <-w.done:
return
case <-w.itemOrWaiterAdded:
case <-nextReady:
nextReady = blockForever
nextItemReadyAt = time.Time{}
}

nextReady = blockForever

func() {
w.lock.Lock()
defer w.lock.Unlock()
Expand All @@ -227,39 +229,67 @@ func (w *priorityqueue[T]) spin() {
// manipulating the tree from within Ascend might lead to panics, so
// track what we want to delete and do it after we are done ascending.
var toDelete []*item[T]
w.queue.Ascend(func(item *item[T]) bool {
if item.ReadyAt != nil {
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
nextReady = w.tick(readyAt)
return false

var key T

// Items in the queue tree are sorted first by priority and second by readiness, so
// items with a lower priority might be ready further down in the queue.
// We iterate through the priorities high to low until we find a ready item
pivot := item[T]{
Key: key,
AddedCounter: 0,
Priority: math.MaxInt,
ReadyAt: nil,
}

for {
pivotChange := false

w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool {
// Item is locked, we can not hand it out
if w.locked.Has(item.Key) {
return true
}
if !w.becameReady.Has(item.Key) {
w.metrics.add(item.Key, item.Priority)
w.becameReady.Insert(item.Key)

if item.ReadyAt != nil {
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() {
nextReady = w.tick(readyAt)
nextItemReadyAt = *item.ReadyAt
}

// Adjusting the pivot item moves the ascend to the next lower priority
pivot.Priority = item.Priority - 1
pivotChange = true
return false
}
if !w.becameReady.Has(item.Key) {
w.metrics.add(item.Key, item.Priority)
w.becameReady.Insert(item.Key)
}
}
}

if w.waiters.Load() == 0 {
// Have to keep iterating here to ensure we update metrics
// for further items that became ready and set nextReady.
return true
}
if w.waiters.Load() == 0 {
// Have to keep iterating here to ensure we update metrics
// for further items that became ready and set nextReady.
return true
}

// Item is locked, we can not hand it out
if w.locked.Has(item.Key) {
return true
}
w.metrics.get(item.Key, item.Priority)
w.locked.Insert(item.Key)
w.waiters.Add(-1)
delete(w.items, item.Key)
toDelete = append(toDelete, item)
w.becameReady.Delete(item.Key)
w.get <- *item

w.metrics.get(item.Key, item.Priority)
w.locked.Insert(item.Key)
w.waiters.Add(-1)
delete(w.items, item.Key)
toDelete = append(toDelete, item)
w.becameReady.Delete(item.Key)
w.get <- *item
return true
})

return true
})
if !pivotChange {
break
}
}

for _, item := range toDelete {
w.queue.Delete(item)
Expand Down Expand Up @@ -387,6 +417,9 @@ func (w *priorityqueue[T]) logState() {
}

func less[T comparable](a, b *item[T]) bool {
if a.Priority != b.Priority {
return a.Priority > b.Priority
}
if a.ReadyAt == nil && b.ReadyAt != nil {
return true
}
Expand All @@ -396,9 +429,6 @@ func less[T comparable](a, b *item[T]) bool {
if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) {
return a.ReadyAt.Before(*b.ReadyAt)
}
if a.Priority != b.Priority {
return a.Priority > b.Priority
}

return a.AddedCounter < b.AddedCounter
}
Expand Down Expand Up @@ -426,4 +456,5 @@ type bTree[T any] interface {
ReplaceOrInsert(item T) (_ T, _ bool)
Delete(item T) (T, bool)
Ascend(iterator btree.ItemIteratorG[T])
AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T])
}
29 changes: 29 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,35 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(metrics.retries["test"]).To(Equal(1))
})

It("returns high priority item that became ready before low priority item", func() {
q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
defer q.ShutDown()

tickSetup := make(chan any)
originalTick := q.tick
q.tick = func(d time.Duration) <-chan time.Time {
Expect(d).To(Equal(time.Second))
close(tickSetup)
return originalTick(d)
}

lowPriority := -100
highPriority := 0
q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo")
q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio")

Eventually(tickSetup).Should(BeClosed())

forwardQueueTimeBy(1 * time.Second)
key, prio, _ := q.GetWithPriority()

Expect(key).To(Equal("prio"))
Expect(prio).To(Equal(0))
Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0}))
Expect(metrics.adds["test"]).To(Equal(2))
Expect(metrics.retries["test"]).To(Equal(1))
})

It("returns an item to a waiter as soon as it has one", func() {
q, metrics := newQueue()
defer q.ShutDown()
Expand Down