@@ -93,11 +93,10 @@ var _ = Describe("Controllerworkqueue", func() {
93
93
q .AddWithOpts (AddOpts {}, "foo" )
94
94
q .AddWithOpts (AddOpts {}, "foo" )
95
95
96
- Consistently (q .Len ). Should (Equal (1 ))
96
+ Expect (q .Len ()). To (Equal (1 ))
97
97
98
- cwq := q .(* priorityqueue [string ])
99
- cwq .lockedLock .Lock ()
100
- Expect (cwq .locked .Len ()).To (Equal (0 ))
98
+ q .lockedLock .Lock ()
99
+ Expect (q .locked .Len ()).To (Equal (0 ))
101
100
102
101
Expect (metrics .depth ["test" ]).To (Equal (map [int ]int {0 : 1 }))
103
102
Expect (metrics .adds ["test" ]).To (Equal (1 ))
@@ -156,22 +155,13 @@ var _ = Describe("Controllerworkqueue", func() {
156
155
})
157
156
158
157
It ("returns an item only after after has passed" , func () {
159
- q , metrics := newQueue ()
158
+ q , metrics , forwardQueueTimeBy := newQueueWithTimeForwarder ()
160
159
defer q .ShutDown ()
161
160
162
- now := time .Now ().Round (time .Second )
163
- nowLock := sync.Mutex {}
164
- tick := make (chan time.Time )
165
-
166
- cwq := q .(* priorityqueue [string ])
167
- cwq .now = func () time.Time {
168
- nowLock .Lock ()
169
- defer nowLock .Unlock ()
170
- return now
171
- }
172
- cwq .tick = func (d time.Duration ) <- chan time.Time {
161
+ originalTick := q .tick
162
+ q .tick = func (d time.Duration ) <- chan time.Time {
173
163
Expect (d ).To (Equal (time .Second ))
174
- return tick
164
+ return originalTick ( d )
175
165
}
176
166
177
167
retrievedItem := make (chan struct {})
@@ -186,10 +176,7 @@ var _ = Describe("Controllerworkqueue", func() {
186
176
187
177
Consistently (retrievedItem ).ShouldNot (BeClosed ())
188
178
189
- nowLock .Lock ()
190
- now = now .Add (time .Second )
191
- nowLock .Unlock ()
192
- tick <- now
179
+ forwardQueueTimeBy (time .Second )
193
180
Eventually (retrievedItem ).Should (BeClosed ())
194
181
195
182
Expect (metrics .depth ["test" ]).To (Equal (map [int ]int {0 : 0 }))
@@ -223,20 +210,11 @@ var _ = Describe("Controllerworkqueue", func() {
223
210
})
224
211
225
212
It ("returns multiple items with after in correct order" , func () {
226
- q , metrics := newQueue ()
213
+ q , metrics , forwardQueueTimeBy := newQueueWithTimeForwarder ()
227
214
defer q .ShutDown ()
228
215
229
- now := time .Now ().Round (time .Second )
230
- nowLock := sync.Mutex {}
231
- tick := make (chan time.Time )
232
-
233
- cwq := q .(* priorityqueue [string ])
234
- cwq .now = func () time.Time {
235
- nowLock .Lock ()
236
- defer nowLock .Unlock ()
237
- return now
238
- }
239
- cwq .tick = func (d time.Duration ) <- chan time.Time {
216
+ originalTick := q .tick
217
+ q .tick = func (d time.Duration ) <- chan time.Time {
240
218
// What a bunch of bs. Deferring in here causes
241
219
// ginkgo to deadlock, presumably because it
242
220
// never returns after the defer. Not deferring
@@ -254,7 +232,7 @@ var _ = Describe("Controllerworkqueue", func() {
254
232
Expect (d ).To (Or (Equal (200 * time .Millisecond ), Equal (time .Second )))
255
233
}()
256
234
<- done
257
- return tick
235
+ return originalTick ( d )
258
236
}
259
237
260
238
retrievedItem := make (chan struct {})
@@ -276,10 +254,7 @@ var _ = Describe("Controllerworkqueue", func() {
276
254
277
255
Consistently (retrievedItem ).ShouldNot (BeClosed ())
278
256
279
- nowLock .Lock ()
280
- now = now .Add (time .Second )
281
- nowLock .Unlock ()
282
- tick <- now
257
+ forwardQueueTimeBy (time .Second )
283
258
Eventually (retrievedItem ).Should (BeClosed ())
284
259
Eventually (retrievedSecondItem ).Should (BeClosed ())
285
260
@@ -462,21 +437,12 @@ var _ = Describe("Controllerworkqueue", func() {
462
437
})
463
438
464
439
It ("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items" , func () {
465
- q , metrics := newQueue ()
440
+ q , metrics , forwardQueueTimeBy := newQueueWithTimeForwarder ()
466
441
defer q .ShutDown ()
467
442
468
- now := time .Now ().Round (time .Second )
469
- nowLock := sync.Mutex {}
470
- tick := make (chan time.Time )
471
-
472
- cwq := q .(* priorityqueue [string ])
473
- cwq .rateLimiter = workqueue .NewTypedItemExponentialFailureRateLimiter [string ](5 * time .Millisecond , 1000 * time .Second )
474
- cwq .now = func () time.Time {
475
- nowLock .Lock ()
476
- defer nowLock .Unlock ()
477
- return now
478
- }
479
- cwq .tick = func (d time.Duration ) <- chan time.Time {
443
+ q .rateLimiter = workqueue .NewTypedItemExponentialFailureRateLimiter [string ](5 * time .Millisecond , 1000 * time .Second )
444
+ originalTick := q .tick
445
+ q .tick = func (d time.Duration ) <- chan time.Time {
480
446
done := make (chan struct {})
481
447
go func () {
482
448
defer GinkgoRecover ()
@@ -485,7 +451,7 @@ var _ = Describe("Controllerworkqueue", func() {
485
451
Expect (d ).To (Or (Equal (5 * time .Millisecond ), Equal (635 * time .Millisecond )))
486
452
}()
487
453
<- done
488
- return tick
454
+ return originalTick ( d )
489
455
}
490
456
491
457
retrievedItem := make (chan struct {})
@@ -504,22 +470,16 @@ var _ = Describe("Controllerworkqueue", func() {
504
470
505
471
// after 7 calls, the next When("bar") call will return 640ms.
506
472
for range 7 {
507
- cwq .rateLimiter .When ("bar" )
473
+ q .rateLimiter .When ("bar" )
508
474
}
509
475
q .AddWithOpts (AddOpts {RateLimited : true }, "foo" , "bar" )
510
476
511
477
Consistently (retrievedItem ).ShouldNot (BeClosed ())
512
- nowLock .Lock ()
513
- now = now .Add (5 * time .Millisecond )
514
- nowLock .Unlock ()
515
- tick <- now
478
+ forwardQueueTimeBy (5 * time .Millisecond )
516
479
Eventually (retrievedItem ).Should (BeClosed ())
517
480
518
481
Consistently (retrievedSecondItem ).ShouldNot (BeClosed ())
519
- nowLock .Lock ()
520
- now = now .Add (635 * time .Millisecond )
521
- nowLock .Unlock ()
522
- tick <- now
482
+ forwardQueueTimeBy (635 * time .Millisecond )
523
483
Eventually (retrievedSecondItem ).Should (BeClosed ())
524
484
525
485
Expect (metrics .depth ["test" ]).To (Equal (map [int ]int {0 : 0 }))
@@ -692,7 +652,31 @@ func TestFuzzPriorityQueue(t *testing.T) {
692
652
wg .Wait ()
693
653
}
694
654
695
- func newQueue () (PriorityQueue [string ], * fakeMetricsProvider ) {
655
+ func newQueueWithTimeForwarder () (_ * priorityqueue [string ], _ * fakeMetricsProvider , forwardQueueTime func (time.Duration )) {
656
+ q , m := newQueue ()
657
+
658
+ now := time .Now ().Round (time .Second )
659
+ nowLock := sync.Mutex {}
660
+ tick := make (chan time.Time )
661
+
662
+ q .now = func () time.Time {
663
+ nowLock .Lock ()
664
+ defer nowLock .Unlock ()
665
+ return now
666
+ }
667
+ q .tick = func (d time.Duration ) <- chan time.Time {
668
+ return tick
669
+ }
670
+
671
+ return q , m , func (d time.Duration ) {
672
+ nowLock .Lock ()
673
+ now = now .Add (d )
674
+ nowLock .Unlock ()
675
+ tick <- now
676
+ }
677
+ }
678
+
679
+ func newQueue () (* priorityqueue [string ], * fakeMetricsProvider ) {
696
680
metrics := newFakeMetricsProvider ()
697
681
q := New ("test" , func (o * Opts [string ]) {
698
682
o .MetricProvider = metrics
@@ -710,7 +694,7 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
710
694
}
711
695
return upstreamTick (d )
712
696
}
713
- return q , metrics
697
+ return q .( * priorityqueue [ string ]) , metrics
714
698
}
715
699
716
700
type btreeInteractionValidator struct {
0 commit comments