Skip to content

Commit 8a89248

Browse files
committed
Fix missing notification when item is added
1 parent 506d130 commit 8a89248

File tree

2 files changed

+46
-1
lines changed

2 files changed

+46
-1
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
161161

162162
w.queue.ReplaceOrInsert(item)
163163
}
164+
165+
if len(items) > 0 {
166+
w.notifyItemOrWaiterAdded()
167+
}
164168
}
165169

166170
func (w *priorityqueue[T]) notifyItemOrWaiterAdded() {

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,31 @@ var _ = Describe("Controllerworkqueue", func() {
194194
Expect(metrics.retries["test"]).To(Equal(1))
195195
})
196196

197+
It("returns an item to a waiter as soon as it has one", func() {
198+
q, metrics := newQueue()
199+
defer q.ShutDown()
200+
201+
retrieved := make(chan struct{})
202+
go func() {
203+
defer GinkgoRecover()
204+
item, _, _ := q.GetWithPriority()
205+
Expect(item).To(Equal("foo"))
206+
close(retrieved)
207+
}()
208+
209+
// We are waiting for the GetWithPriority() call to be blocked
210+
// on retrieving an item. As golang doesn't provide a way to
211+
// check if something is listening on a channel without
212+
// sending them a message, I can't think of a way to do this
213+
// without sleeping.
214+
time.Sleep(time.Second)
215+
q.AddWithOpts(AddOpts{}, "foo")
216+
Eventually(retrieved).Should(BeClosed())
217+
218+
Expect(metrics.depth["test"]).To(Equal(0))
219+
Expect(metrics.adds["test"]).To(Equal(1))
220+
})
221+
197222
It("returns multiple items with after in correct order", func() {
198223
q, metrics := newQueue()
199224
defer q.ShutDown()
@@ -209,7 +234,23 @@ var _ = Describe("Controllerworkqueue", func() {
209234
return now
210235
}
211236
cwq.tick = func(d time.Duration) <-chan time.Time {
212-
Expect(d).To(Equal(200 * time.Millisecond))
237+
// What a bunch of bs. Deferring in here causes
238+
// ginkgo to deadlock, presumably because it
239+
// never returns after the defer. Not deferring
240+
// hides the actual assertion result and makes
241+
// it complain that there should be a defer.
242+
// Move the assertion into a goroutine just to
243+
// get around that mess.
244+
done := make(chan struct{})
245+
go func() {
246+
defer GinkgoRecover()
247+
defer close(done)
248+
249+
// This is not deterministic and depends on which of
250+
// Add() or Spin() gets the lock first.
251+
Expect(d).To(Or(Equal(200*time.Millisecond), Equal(time.Second)))
252+
}()
253+
<-done
213254
return tick
214255
}
215256

0 commit comments

Comments
 (0)