diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 9c708e982b..f9544e4cb9 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -3,6 +3,7 @@ package priorityqueue import ( "fmt" "math/rand/v2" + "strconv" "sync" "testing" "testing/synctest" @@ -103,6 +104,30 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.adds["test"]).To(Equal(1)) }) + It("enqueues a locked item", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{}, "foo") + Expect(q.Len()).To(Equal(1)) + item, priority, shutdown := q.GetWithPriority() + Expect(item).To(Equal("foo")) + Expect(priority).To(Equal(0)) + Expect(shutdown).To(BeFalse()) + + q.AddWithOpts(AddOpts{}, "foo") + Expect(q.Len()).To(Equal(1)) + q.Done("foo") + + item, priority, shutdown = q.GetWithPriority() + Expect(item).To(Equal("foo")) + Expect(priority).To(Equal(0)) + Expect(shutdown).To(BeFalse()) + + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) + Expect(metrics.adds["test"]).To(Equal(2)) + }) + It("retains the highest priority", func() { q, metrics := newQueue() defer q.ShutDown() @@ -120,6 +145,23 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.adds["test"]).To(Equal(1)) }) + It("will not decrease the priority", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, "foo") + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo") + + item, priority, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + Expect(priority).To(Equal(2)) + + Expect(q.Len()).To(Equal(0)) + + Expect(metrics.depth["test"]).To(Equal(map[int]int{2: 0})) + Expect(metrics.adds["test"]).To(Equal(1)) + }) + It("gets pushed to the front if the priority increases", func() { q, metrics := newQueue() defer q.ShutDown() @@ -251,6 +293,35 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) metrics.mu.Unlock() }) + + It("updates metrics correctly when an item with after is re-added with higher priority and no after", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{After: time.Hour, Priority: ptr.To(0)}, "foo") + Expect(q.Len()).To(Equal(0)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(map[int]int{})) + metrics.mu.Unlock() + + q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo") + + Expect(q.Len()).To(Equal(1)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(map[int]int{1: 1})) + metrics.mu.Unlock() + + item, priority, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + Expect(priority).To(Equal(1)) + Expect(q.Len()).To(Equal(0)) + + metrics.mu.Lock() + Expect(metrics.depth["test"][1]).To(Equal(0)) + for _, depth := range metrics.depth["test"] { + Expect(depth).To(Equal(0)) + } + }) }) func BenchmarkAddGetDone(b *testing.B) { @@ -443,56 +514,69 @@ func newQueueWithTimeForwarder() (_ *priorityqueue[string], _ *fakeMetricsProvid func TestHighPriorityItemsAreReturnedBeforeLowPriorityItemMultipleTimes(t *testing.T) { t.Parallel() - synctest.Test(t, func(t *testing.T) { - g := NewWithT(t) - - q, metrics := newQueue() - defer q.ShutDown() - - const itemsPerPriority = 1000 - lowPriority := 0 - lowMiddlePriority := 5 - middlePriority := 10 - upperMiddlePriority := 15 - highPriority := 20 - for i := range itemsPerPriority { - q.AddWithOpts(AddOpts{Priority: &highPriority}, fmt.Sprintf("high-%d", i)) - q.AddWithOpts(AddOpts{Priority: &upperMiddlePriority}, fmt.Sprintf("upperMiddle-%d", i)) - q.AddWithOpts(AddOpts{Priority: &middlePriority}, fmt.Sprintf("middle-%d", i)) - q.AddWithOpts(AddOpts{Priority: &lowMiddlePriority}, fmt.Sprintf("lowMiddle-%d", i)) - q.AddWithOpts(AddOpts{Priority: &lowPriority}, fmt.Sprintf("low-%d", i)) - } - synctest.Wait() + for _, after := range []time.Duration{-time.Second, 0, time.Second} { + t.Run(fmt.Sprintf("after=%v", after), func(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + g := NewWithT(t) + + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() + defer q.ShutDown() + + const itemsPerPriority = 1000 + lowPriority := -10 + lowMiddlePriority := -5 + middlePriority := 0 + upperMiddlePriority := 5 + highPriority := 10 + for i := range itemsPerPriority { + q.AddWithOpts(AddOpts{Priority: &highPriority, After: after}, fmt.Sprintf("high-%d", i)) + q.AddWithOpts(AddOpts{Priority: &upperMiddlePriority, After: after}, fmt.Sprintf("upperMiddle-%d", i)) + q.AddWithOpts(AddOpts{Priority: &middlePriority, After: after}, fmt.Sprintf("middle-%d", i)) + q.AddWithOpts(AddOpts{Priority: &lowMiddlePriority, After: after}, fmt.Sprintf("lowMiddle-%d", i)) + q.AddWithOpts(AddOpts{Priority: &lowPriority, After: after}, fmt.Sprintf("low-%d", i)) + } + synctest.Wait() + if after > 0 { + forwardQueueTimeBy(after) + synctest.Wait() + } - for range itemsPerPriority { - key, prio, _ := q.GetWithPriority() - g.Expect(prio).To(Equal(highPriority)) - g.Expect(key).To(HavePrefix("high-")) - } - for range itemsPerPriority { - key, prio, _ := q.GetWithPriority() - g.Expect(prio).To(Equal(upperMiddlePriority)) - g.Expect(key).To(HavePrefix("upperMiddle-")) - } - for range itemsPerPriority { - key, prio, _ := q.GetWithPriority() - g.Expect(prio).To(Equal(middlePriority)) - g.Expect(key).To(HavePrefix("middle-")) - } - for range itemsPerPriority { - key, prio, _ := q.GetWithPriority() - g.Expect(prio).To(Equal(lowMiddlePriority)) - g.Expect(key).To(HavePrefix("lowMiddle-")) - } - for range itemsPerPriority { - key, prio, _ := q.GetWithPriority() - g.Expect(prio).To(Equal(lowPriority)) - g.Expect(key).To(HavePrefix("low-")) - } - g.Expect(metrics.depth["test"]).To(Equal(map[int]int{10: 0, 5: 0, 0: 0, 20: 0, 15: 0})) - g.Expect(metrics.adds["test"]).To(Equal(itemsPerPriority * 5)) - g.Expect(metrics.retries["test"]).To(Equal(0)) - }) + for i := range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(highPriority)) + g.Expect(key).To(Equal("high-" + strconv.Itoa(i))) + } + for i := range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(upperMiddlePriority)) + g.Expect(key).To(Equal("upperMiddle-" + strconv.Itoa(i))) + } + for i := range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(middlePriority)) + g.Expect(key).To(Equal("middle-" + strconv.Itoa(i))) + } + for i := range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(lowMiddlePriority)) + g.Expect(key).To(Equal("lowMiddle-" + strconv.Itoa(i))) + } + for i := range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(lowPriority)) + g.Expect(key).To(Equal("low-" + strconv.Itoa(i))) + } + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{-10: 0, -5: 0, 0: 0, 5: 0, 10: 0})) + g.Expect(metrics.adds["test"]).To(Equal(itemsPerPriority * 5)) + expectedRetries := 0 + if after > 0 { + expectedRetries = itemsPerPriority * 5 + } + g.Expect(metrics.retries["test"]).To(Equal(expectedRetries)) + }) + }) + } } func newQueue() (*priorityqueue[string], *fakeMetricsProvider) {