@@ -184,6 +184,66 @@ var _ = Describe("Controllerworkqueue", func() {
184
184
Expect (metrics .retries ["test" ]).To (Equal (1 ))
185
185
})
186
186
187
+ It ("returns high priority item that became ready before low priority items" , func () {
188
+ q , metrics := newQueue ()
189
+ defer q .ShutDown ()
190
+
191
+ now := time .Now ().Round (time .Second )
192
+ nowLock := sync.Mutex {}
193
+ tick := make (chan time.Time )
194
+
195
+ cwq := q .(* priorityqueue [string ])
196
+ cwq .now = func () time.Time {
197
+ nowLock .Lock ()
198
+ defer nowLock .Unlock ()
199
+ return now
200
+ }
201
+ cwq .tick = func (d time.Duration ) <- chan time.Time {
202
+ Expect (d ).To (Equal (time .Second ))
203
+ return tick
204
+ }
205
+
206
+ retrievedItem := make (chan any )
207
+ getNext := make (chan any )
208
+
209
+ go func () {
210
+ defer GinkgoRecover ()
211
+ defer close (retrievedItem )
212
+
213
+ key , prio , _ := q .GetWithPriority ()
214
+ Expect (key ).To (Equal ("foo" ))
215
+ Expect (prio ).To (Equal (- 100 ))
216
+
217
+ retrievedItem <- nil
218
+ <- getNext
219
+
220
+ key , prio , _ = q .GetWithPriority ()
221
+ Expect (key ).To (Equal ("prio" ))
222
+ Expect (prio ).To (Equal (0 ))
223
+ }()
224
+
225
+ lowPriority := - 100
226
+ highPriority := 0
227
+ q .AddWithOpts (AddOpts {After : 0 , Priority : & lowPriority }, "foo" )
228
+ q .AddWithOpts (AddOpts {After : 0 , Priority : & lowPriority }, "bar" )
229
+ q .AddWithOpts (AddOpts {After : time .Second , Priority : & highPriority }, "prio" )
230
+
231
+ <- retrievedItem
232
+
233
+ nowLock .Lock ()
234
+ now = now .Add (time .Second )
235
+ nowLock .Unlock ()
236
+ tick <- now
237
+ getNext <- nil
238
+
239
+ Eventually (retrievedItem ).Should (BeClosed ())
240
+ close (getNext )
241
+
242
+ Expect (metrics .depth ["test" ]).To (Equal (map [int ]int {- 100 : 1 , 0 : 0 }))
243
+ Expect (metrics .adds ["test" ]).To (Equal (3 ))
244
+ Expect (metrics .retries ["test" ]).To (Equal (1 ))
245
+ })
246
+
187
247
It ("returns an item to a waiter as soon as it has one" , func () {
188
248
q , metrics := newQueue ()
189
249
defer q .ShutDown ()
0 commit comments