@@ -57,9 +57,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5757 }
5858
5959 pq := & priorityqueue [T ]{
60- items : map [T ]* item [T ]{},
61- queue : btree .NewG (32 , less [T ]),
62- metrics : newQueueMetrics [T ](opts .MetricProvider , name , clock.RealClock {}),
60+ items : map [T ]* item [T ]{},
61+ queue : btree .NewG (32 , less [T ]),
62+ becameReady : sets.Set [T ]{},
63+ metrics : newQueueMetrics [T ](opts .MetricProvider , name , clock.RealClock {}),
6364 // itemOrWaiterAdded indicates that an item or
6465 // waiter was added. It must be buffered, because
6566 // if we currently process items we can't tell
@@ -83,16 +84,21 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
8384
8485type priorityqueue [T comparable ] struct {
8586 // lock has to be acquired for any access any of items, queue, addedCounter
86- // or metrics.
87- lock sync.Mutex
88- items map [T ]* item [T ]
89- queue bTree [* item [T ]]
90- metrics queueMetrics [T ]
87+ // or becameReady
88+ lock sync.Mutex
89+ items map [T ]* item [T ]
90+ queue bTree [* item [T ]]
9191
9292 // addedCounter is a counter of elements added, we need it
9393 // because unixNano is not guaranteed to be unique.
9494 addedCounter uint64
9595
96+ // becameReady holds items that that are in the queue, were added
97+ // with non-zero after and became ready. We need it to call the
98+ // metrics add exactly once for them.
99+ becameReady sets.Set [T ]
100+ metrics queueMetrics [T ]
101+
96102 itemOrWaiterAdded chan struct {}
97103
98104 rateLimiter workqueue.TypedRateLimiter [T ]
@@ -142,7 +148,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
142148 }
143149 w .items [key ] = item
144150 w .queue .ReplaceOrInsert (item )
145- w .metrics .add (key )
151+ if item .readyAt == nil {
152+ w .metrics .add (key )
153+ }
146154 w .addedCounter ++
147155 continue
148156 }
@@ -196,18 +204,21 @@ func (w *priorityqueue[T]) spin() {
196204 defer w .lockedLock .Unlock ()
197205
198206 w .queue .Ascend (func (item * item [T ]) bool {
199- if w .waiters .Load () == 0 { // no waiters, return as we can not hand anything out anyways
200- return false
207+ if item .readyAt != nil {
208+ if readyAt := item .readyAt .Sub (w .now ()); readyAt > 0 {
209+ nextReady = w .tick (readyAt )
210+ return false
211+ }
212+ if ! w .becameReady .Has (item .key ) {
213+ w .metrics .add (item .key )
214+ w .becameReady .Insert (item .key )
215+ }
201216 }
202217
203- // No next element we can process
204- if item .readyAt != nil && item .readyAt .After (w .now ()) {
205- readyAt := item .readyAt .Sub (w .now ())
206- if readyAt <= 0 { // Toctou race with the above check
207- readyAt = 1
208- }
209- nextReady = w .tick (readyAt )
210- return false
218+ if w .waiters .Load () == 0 {
219+ // Have to keep iterating here to ensure we update metrics
220+ // for further items that became ready and set nextReady.
221+ return true
211222 }
212223
213224 // Item is locked, we can not hand it out
@@ -220,6 +231,7 @@ func (w *priorityqueue[T]) spin() {
220231 w .waiters .Add (- 1 )
221232 delete (w .items , item .key )
222233 w .queue .Delete (item )
234+ w .becameReady .Delete (item .key )
223235 w .get <- * item
224236
225237 return true
@@ -279,22 +291,36 @@ func (w *priorityqueue[T]) ShutDown() {
279291 close (w .done )
280292}
281293
294+ // ShutDownWithDrain just calls ShutDown, as the draining
295+ // functionality is not used by controller-runtime.
282296func (w * priorityqueue [T ]) ShutDownWithDrain () {
283297 w .ShutDown ()
284298}
285299
300+ // Len returns the number of items that are ready to be
301+ // picked up. It does not include items that are not yet
302+ // ready.
286303func (w * priorityqueue [T ]) Len () int {
287304 w .lock .Lock ()
288305 defer w .lock .Unlock ()
289306
290- return w .queue .Len ()
307+ var result int
308+ w .queue .Ascend (func (item * item [T ]) bool {
309+ if item .readyAt == nil || item .readyAt .Compare (w .now ()) <= 0 {
310+ result ++
311+ return true
312+ }
313+ return false
314+ })
315+
316+ return result
291317}
292318
293319func less [T comparable ](a , b * item [T ]) bool {
294320 if a .readyAt == nil && b .readyAt != nil {
295321 return true
296322 }
297- if a .readyAt != nil && b .readyAt = = nil {
323+ if b .readyAt == nil && a .readyAt ! = nil {
298324 return false
299325 }
300326 if a .readyAt != nil && b .readyAt != nil && ! a .readyAt .Equal (* b .readyAt ) {
@@ -329,5 +355,4 @@ type bTree[T any] interface {
329355 ReplaceOrInsert (item T ) (_ T , _ bool )
330356 Delete (item T ) (T , bool )
331357 Ascend (iterator btree.ItemIteratorG [T ])
332- Len () int
333358}
0 commit comments