Skip to content

Commit 50cb8c3

Browse files
authored
Merge pull request #3336 from alvaroaleman/deduplicate
🌱 Prioriyqueue tests: Add and use newQueueWithTimeForwarder
2 parents c8a5a35 + c5d90e3 commit 50cb8c3

File tree

1 file changed

+47
-63
lines changed

1 file changed

+47
-63
lines changed

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 47 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,10 @@ var _ = Describe("Controllerworkqueue", func() {
9393
q.AddWithOpts(AddOpts{}, "foo")
9494
q.AddWithOpts(AddOpts{}, "foo")
9595

96-
Consistently(q.Len).Should(Equal(1))
96+
Expect(q.Len()).To(Equal(1))
9797

98-
cwq := q.(*priorityqueue[string])
99-
cwq.lockedLock.Lock()
100-
Expect(cwq.locked.Len()).To(Equal(0))
98+
q.lockedLock.Lock()
99+
Expect(q.locked.Len()).To(Equal(0))
101100

102101
Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1}))
103102
Expect(metrics.adds["test"]).To(Equal(1))
@@ -156,22 +155,13 @@ var _ = Describe("Controllerworkqueue", func() {
156155
})
157156

158157
It("returns an item only after after has passed", func() {
159-
q, metrics := newQueue()
158+
q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
160159
defer q.ShutDown()
161160

162-
now := time.Now().Round(time.Second)
163-
nowLock := sync.Mutex{}
164-
tick := make(chan time.Time)
165-
166-
cwq := q.(*priorityqueue[string])
167-
cwq.now = func() time.Time {
168-
nowLock.Lock()
169-
defer nowLock.Unlock()
170-
return now
171-
}
172-
cwq.tick = func(d time.Duration) <-chan time.Time {
161+
originalTick := q.tick
162+
q.tick = func(d time.Duration) <-chan time.Time {
173163
Expect(d).To(Equal(time.Second))
174-
return tick
164+
return originalTick(d)
175165
}
176166

177167
retrievedItem := make(chan struct{})
@@ -186,10 +176,7 @@ var _ = Describe("Controllerworkqueue", func() {
186176

187177
Consistently(retrievedItem).ShouldNot(BeClosed())
188178

189-
nowLock.Lock()
190-
now = now.Add(time.Second)
191-
nowLock.Unlock()
192-
tick <- now
179+
forwardQueueTimeBy(time.Second)
193180
Eventually(retrievedItem).Should(BeClosed())
194181

195182
Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
@@ -223,20 +210,11 @@ var _ = Describe("Controllerworkqueue", func() {
223210
})
224211

225212
It("returns multiple items with after in correct order", func() {
226-
q, metrics := newQueue()
213+
q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
227214
defer q.ShutDown()
228215

229-
now := time.Now().Round(time.Second)
230-
nowLock := sync.Mutex{}
231-
tick := make(chan time.Time)
232-
233-
cwq := q.(*priorityqueue[string])
234-
cwq.now = func() time.Time {
235-
nowLock.Lock()
236-
defer nowLock.Unlock()
237-
return now
238-
}
239-
cwq.tick = func(d time.Duration) <-chan time.Time {
216+
originalTick := q.tick
217+
q.tick = func(d time.Duration) <-chan time.Time {
240218
// What a bunch of bs. Deferring in here causes
241219
// ginkgo to deadlock, presumably because it
242220
// never returns after the defer. Not deferring
@@ -254,7 +232,7 @@ var _ = Describe("Controllerworkqueue", func() {
254232
Expect(d).To(Or(Equal(200*time.Millisecond), Equal(time.Second)))
255233
}()
256234
<-done
257-
return tick
235+
return originalTick(d)
258236
}
259237

260238
retrievedItem := make(chan struct{})
@@ -276,10 +254,7 @@ var _ = Describe("Controllerworkqueue", func() {
276254

277255
Consistently(retrievedItem).ShouldNot(BeClosed())
278256

279-
nowLock.Lock()
280-
now = now.Add(time.Second)
281-
nowLock.Unlock()
282-
tick <- now
257+
forwardQueueTimeBy(time.Second)
283258
Eventually(retrievedItem).Should(BeClosed())
284259
Eventually(retrievedSecondItem).Should(BeClosed())
285260

@@ -462,21 +437,12 @@ var _ = Describe("Controllerworkqueue", func() {
462437
})
463438

464439
It("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items", func() {
465-
q, metrics := newQueue()
440+
q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
466441
defer q.ShutDown()
467442

468-
now := time.Now().Round(time.Second)
469-
nowLock := sync.Mutex{}
470-
tick := make(chan time.Time)
471-
472-
cwq := q.(*priorityqueue[string])
473-
cwq.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second)
474-
cwq.now = func() time.Time {
475-
nowLock.Lock()
476-
defer nowLock.Unlock()
477-
return now
478-
}
479-
cwq.tick = func(d time.Duration) <-chan time.Time {
443+
q.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second)
444+
originalTick := q.tick
445+
q.tick = func(d time.Duration) <-chan time.Time {
480446
done := make(chan struct{})
481447
go func() {
482448
defer GinkgoRecover()
@@ -485,7 +451,7 @@ var _ = Describe("Controllerworkqueue", func() {
485451
Expect(d).To(Or(Equal(5*time.Millisecond), Equal(635*time.Millisecond)))
486452
}()
487453
<-done
488-
return tick
454+
return originalTick(d)
489455
}
490456

491457
retrievedItem := make(chan struct{})
@@ -504,22 +470,16 @@ var _ = Describe("Controllerworkqueue", func() {
504470

505471
// after 7 calls, the next When("bar") call will return 640ms.
506472
for range 7 {
507-
cwq.rateLimiter.When("bar")
473+
q.rateLimiter.When("bar")
508474
}
509475
q.AddWithOpts(AddOpts{RateLimited: true}, "foo", "bar")
510476

511477
Consistently(retrievedItem).ShouldNot(BeClosed())
512-
nowLock.Lock()
513-
now = now.Add(5 * time.Millisecond)
514-
nowLock.Unlock()
515-
tick <- now
478+
forwardQueueTimeBy(5 * time.Millisecond)
516479
Eventually(retrievedItem).Should(BeClosed())
517480

518481
Consistently(retrievedSecondItem).ShouldNot(BeClosed())
519-
nowLock.Lock()
520-
now = now.Add(635 * time.Millisecond)
521-
nowLock.Unlock()
522-
tick <- now
482+
forwardQueueTimeBy(635 * time.Millisecond)
523483
Eventually(retrievedSecondItem).Should(BeClosed())
524484

525485
Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
@@ -692,7 +652,31 @@ func TestFuzzPriorityQueue(t *testing.T) {
692652
wg.Wait()
693653
}
694654

695-
func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
655+
func newQueueWithTimeForwarder() (_ *priorityqueue[string], _ *fakeMetricsProvider, forwardQueueTime func(time.Duration)) {
656+
q, m := newQueue()
657+
658+
now := time.Now().Round(time.Second)
659+
nowLock := sync.Mutex{}
660+
tick := make(chan time.Time)
661+
662+
q.now = func() time.Time {
663+
nowLock.Lock()
664+
defer nowLock.Unlock()
665+
return now
666+
}
667+
q.tick = func(d time.Duration) <-chan time.Time {
668+
return tick
669+
}
670+
671+
return q, m, func(d time.Duration) {
672+
nowLock.Lock()
673+
now = now.Add(d)
674+
nowLock.Unlock()
675+
tick <- now
676+
}
677+
}
678+
679+
func newQueue() (*priorityqueue[string], *fakeMetricsProvider) {
696680
metrics := newFakeMetricsProvider()
697681
q := New("test", func(o *Opts[string]) {
698682
o.MetricProvider = metrics
@@ -710,7 +694,7 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
710694
}
711695
return upstreamTick(d)
712696
}
713-
return q, metrics
697+
return q.(*priorityqueue[string]), metrics
714698
}
715699

716700
type btreeInteractionValidator struct {

0 commit comments

Comments
 (0)