Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,35 +195,41 @@ func (w *priorityqueue[T]) spin() {
w.lockedLock.Lock()
defer w.lockedLock.Unlock()

w.queue.Ascend(func(item *item[T]) bool {
if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways
return false
}
for continueLoop := true; continueLoop; {
continueLoop = false
w.queue.Ascend(func(item *item[T]) bool {
if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways
return false
}

// No next element we can process
if item.readyAt != nil && item.readyAt.After(w.now()) {
readyAt := item.readyAt.Sub(w.now())
if readyAt <= 0 { // Toctou race with the above check
readyAt = 1
}
nextReady = w.tick(readyAt)
return false
}

// No next element we can process
if item.readyAt != nil && item.readyAt.After(w.now()) {
readyAt := item.readyAt.Sub(w.now())
if readyAt <= 0 { // Toctou race with the above check
readyAt = 1
// Item is locked, we can not hand it out
if w.locked.Has(item.key) {
return true
}
nextReady = w.tick(readyAt)

w.metrics.get(item.key)
w.locked.Insert(item.key)
w.waiters.Add(-1)
delete(w.items, item.key)
w.queue.Delete(item)
w.get <- *item

// Return false because continuing with Ascend after deleting an item
// can lead to panics within Ascend.
continueLoop = true
return false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a super strong opinion but how do you feel about instead appending to a toDelete slice in Ascend and then calling Delete after being done with Ascend? It should be safe because we are holding the lock so a concurrent routine seeing the item when it is supposed to be deleted shouldn't be possible.

The reason is that I don't think manipulating the tree while iterating is an expected usage. Even if it seems to work now, there could be more bugs, or it could stop working in a future version of the lib.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up using that approach plus your test case in #3060, as this issue made CI fail there. I hope that is okay.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. You're right, seems safer to just not delete within Ascend. I'll take a look at your PR. Thx for taking this over

}

// Item is locked, we can not hand it out
if w.locked.Has(item.key) {
return true
}

w.metrics.get(item.key)
w.locked.Insert(item.key)
w.waiters.Add(-1)
delete(w.items, item.key)
w.queue.Delete(item)
w.get <- *item

return true
})
})
}
}()
}
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"fmt"
"math/rand/v2"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -283,6 +284,41 @@
Expect(metrics.depth["test"]).To(Equal(0))
Expect(metrics.adds["test"]).To(Equal(2))
})

It("returns many items", func() {
Copy link
Member Author

@sbueringer sbueringer Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how to verify that the spin goroutine didn't panic.

Locally with IntelliJ, I hit a panic breakpoint, and after continuing, the test was shown as successful (even with the panic). I'm not sure if the same happens in CI.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the fact that there was a breakpoint caused go test to not recognize the panic properly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, maybe.

// This test ensures that a large queue can be drained without panicking.
// In a previous version of the code we were calling queue.Delete within q.Ascend
// which led to a panic in queue.Ascend > iterate:
// "panic: runtime error: index out of range [0] with length 0"

q, _ := newQueue()
defer q.ShutDown()

for range 20 {
for i := range 1000 {
rn := rand.N(100)

Check failure on line 299 in pkg/controller/priorityqueue/priorityqueue_test.go

View workflow job for this annotation

GitHub Actions / lint

G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand) (gosec)
if rn < 10 {
q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i))
} else {
q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i))
}
}

wg := sync.WaitGroup{}
for range 100 { // The panic only occurred relatively frequently with a high number of go routines.
wg.Add(1)
go func() {
defer wg.Done()
for range 10 {
obj, _, _ := q.GetWithPriority()
q.Done(obj)
}
}()
}

wg.Wait()
}
})
})

func BenchmarkAddGetDone(b *testing.B) {
Expand Down
Loading