@@ -57,9 +57,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5757 }
5858
5959 pq := & priorityqueue [T ]{
60- items : map [T ]* item [T ]{},
61- queue : btree .NewG (32 , less [T ]),
62- metrics : newQueueMetrics [T ](opts .MetricProvider , name , clock.RealClock {}),
60+ items : map [T ]* item [T ]{},
61+ queue : btree .NewG (32 , less [T ]),
62+ becameReady : sets.Set [T ]{},
63+ metrics : newQueueMetrics [T ](opts .MetricProvider , name , clock.RealClock {}),
6364 // itemOrWaiterAdded indicates that an item or
6465 // waiter was added. It must be buffered, because
6566 // if we currently process items we can't tell
@@ -83,16 +84,21 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
8384
8485type priorityqueue [T comparable ] struct {
8586 // lock has to be acquired for any access any of items, queue, addedCounter
86- // or metrics.
87- lock sync.Mutex
88- items map [T ]* item [T ]
89- queue bTree [* item [T ]]
90- metrics queueMetrics [T ]
87+ // or becameReady
88+ lock sync.Mutex
89+ items map [T ]* item [T ]
90+ queue bTree [* item [T ]]
9191
9292 // addedCounter is a counter of elements added, we need it
9393 // because unixNano is not guaranteed to be unique.
9494 addedCounter uint64
9595
96+ // becameReady holds items that are in the queue, were added
97+ // with non-zero after and became ready. We need it to call the
98+ // metrics add exactly once for them.
99+ becameReady sets.Set [T ]
100+ metrics queueMetrics [T ]
101+
96102 itemOrWaiterAdded chan struct {}
97103
98104 rateLimiter workqueue.TypedRateLimiter [T ]
@@ -142,7 +148,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
142148 }
143149 w .items [key ] = item
144150 w .queue .ReplaceOrInsert (item )
145- w .metrics .add (key )
151+ if item .readyAt == nil {
152+ w .metrics .add (key )
153+ }
146154 w .addedCounter ++
147155 continue
148156 }
@@ -195,19 +203,25 @@ func (w *priorityqueue[T]) spin() {
195203 w .lockedLock .Lock ()
196204 defer w .lockedLock .Unlock ()
197205
206+ // manipulating the tree from within Ascend might lead to panics, so
207+ // track what we want to delete and do it after we are done ascending.
208+ var toDelete []* item [T ]
198209 w .queue .Ascend (func (item * item [T ]) bool {
199- if w .waiters .Load () == 0 { // no waiters, return as we can not hand anything out anyways
200- return false
210+ if item .readyAt != nil {
211+ if readyAt := item .readyAt .Sub (w .now ()); readyAt > 0 {
212+ nextReady = w .tick (readyAt )
213+ return false
214+ }
215+ if ! w .becameReady .Has (item .key ) {
216+ w .metrics .add (item .key )
217+ w .becameReady .Insert (item .key )
218+ }
201219 }
202220
203- // No next element we can process
204- if item .readyAt != nil && item .readyAt .After (w .now ()) {
205- readyAt := item .readyAt .Sub (w .now ())
206- if readyAt <= 0 { // Toctou race with the above check
207- readyAt = 1
208- }
209- nextReady = w .tick (readyAt )
210- return false
221+ if w .waiters .Load () == 0 {
222+ // Have to keep iterating here to ensure we update metrics
223+ // for further items that became ready and set nextReady.
224+ return true
211225 }
212226
213227 // Item is locked, we can not hand it out
@@ -219,11 +233,16 @@ func (w *priorityqueue[T]) spin() {
219233 w .locked .Insert (item .key )
220234 w .waiters .Add (- 1 )
221235 delete (w .items , item .key )
222- w .queue .Delete (item )
236+ toDelete = append (toDelete , item )
237+ w .becameReady .Delete (item .key )
223238 w .get <- * item
224239
225240 return true
226241 })
242+
243+ for _ , item := range toDelete {
244+ w .queue .Delete (item )
245+ }
227246 }()
228247 }
229248}
@@ -279,22 +298,36 @@ func (w *priorityqueue[T]) ShutDown() {
279298 close (w .done )
280299}
281300
301+ // ShutDownWithDrain just calls ShutDown, as the draining
302+ // functionality is not used by controller-runtime.
282303func (w * priorityqueue [T ]) ShutDownWithDrain () {
283304 w .ShutDown ()
284305}
285306
307+ // Len returns the number of items that are ready to be
308+ // picked up. It does not include items that are not yet
309+ // ready.
286310func (w * priorityqueue [T ]) Len () int {
287311 w .lock .Lock ()
288312 defer w .lock .Unlock ()
289313
290- return w .queue .Len ()
314+ var result int
315+ w .queue .Ascend (func (item * item [T ]) bool {
316+ if item .readyAt == nil || item .readyAt .Compare (w .now ()) <= 0 {
317+ result ++
318+ return true
319+ }
320+ return false
321+ })
322+
323+ return result
291324}
292325
293326func less [T comparable ](a , b * item [T ]) bool {
294327 if a .readyAt == nil && b .readyAt != nil {
295328 return true
296329 }
297- if a .readyAt != nil && b .readyAt = = nil {
330+ if b .readyAt == nil && a .readyAt ! = nil {
298331 return false
299332 }
300333 if a .readyAt != nil && b .readyAt != nil && ! a .readyAt .Equal (* b .readyAt ) {
@@ -329,5 +362,4 @@ type bTree[T any] interface {
329362 ReplaceOrInsert (item T ) (_ T , _ bool )
330363 Delete (item T ) (T , bool )
331364 Ascend (iterator btree.ItemIteratorG [T ])
332- Len () int
333365}
0 commit comments