@@ -56,9 +56,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5656 opts .MetricProvider = metrics.WorkqueueMetricsProvider {}
5757 }
5858
59- cwq := & priorityqueue [T ]{
60- items : map [T ]* item [T ]{},
61- queue : btree .NewG (32 , less [T ]),
59+ pq := & priorityqueue [T ]{
60+ items : map [T ]* item [T ]{},
61+ queue : btree .NewG (32 , less [T ]),
62+ metrics : newQueueMetrics [T ](opts .MetricProvider , name , clock.RealClock {}),
6263 // itemOrWaiterAdded indicates that an item or
6364 // waiter was added. It must be buffered, because
6465 // if we currently process items we can't tell
@@ -72,25 +73,30 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
7273 tick : time .Tick ,
7374 }
7475
75- go cwq .spin ()
76+ go pq .spin ()
77+ if _ , ok := pq .metrics .(noMetrics [T ]); ! ok {
78+ go pq .updateUnfinishedWorkLoop ()
79+ }
7680
77- return wrapWithMetrics ( cwq , name , opts . MetricProvider )
81+ return pq
7882}
7983
8084type priorityqueue [T comparable ] struct {
81- // lock has to be acquired for any access to either items or queue
82- lock sync.Mutex
83- items map [T ]* item [T ]
84- queue * btree.BTreeG [* item [T ]]
85-
86- itemOrWaiterAdded chan struct {}
87-
88- rateLimiter workqueue.TypedRateLimiter [T ]
85+ // lock has to be acquired for any access any of items, queue, addedCounter
86+ // or metrics.
87+ lock sync.Mutex
88+ items map [T ]* item [T ]
89+ queue * btree.BTreeG [* item [T ]]
90+ metrics queueMetrics [T ]
8991
9092 // addedCounter is a counter of elements added, we need it
9193 // because unixNano is not guaranteed to be unique.
9294 addedCounter uint64
9395
96+ itemOrWaiterAdded chan struct {}
97+
98+ rateLimiter workqueue.TypedRateLimiter [T ]
99+
94100 // locked contains the keys we handed out through Get() and that haven't
95101 // yet been returned through Done().
96102 locked sets.Set [T ]
@@ -136,6 +142,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
136142 }
137143 w .items [key ] = item
138144 w .queue .ReplaceOrInsert (item )
145+ w .metrics .add (key )
139146 w .addedCounter ++
140147 continue
141148 }
@@ -204,11 +211,12 @@ func (w *priorityqueue[T]) spin() {
204211 return true
205212 }
206213
207- w .get <- * item
214+ w .metrics . get ( item . key )
208215 w .locked .Insert (item .key )
209216 w .waiters .Add (- 1 )
210217 delete (w .items , item .key )
211218 w .queue .Delete (item )
219+ w .get <- * item
212220
213221 return true
214222 })
@@ -258,6 +266,7 @@ func (w *priorityqueue[T]) Done(item T) {
258266 w .lockedLock .Lock ()
259267 defer w .lockedLock .Unlock ()
260268 w .locked .Delete (item )
269+ w .metrics .done (item )
261270 w .notifyItemOrWaiterAdded ()
262271}
263272
@@ -306,52 +315,13 @@ type item[T comparable] struct {
306315 readyAt * time.Time
307316}
308317
309- func wrapWithMetrics [T comparable ](q * priorityqueue [T ], name string , provider workqueue.MetricsProvider ) PriorityQueue [T ] {
310- mwq := & metricWrappedQueue [T ]{
311- priorityqueue : q ,
312- metrics : newQueueMetrics [T ](provider , name , clock.RealClock {}),
313- }
314-
315- go mwq .updateUnfinishedWorkLoop ()
316-
317- return mwq
318- }
319-
320- type metricWrappedQueue [T comparable ] struct {
321- * priorityqueue [T ]
322- metrics queueMetrics [T ]
323- }
324-
325- func (m * metricWrappedQueue [T ]) AddWithOpts (o AddOpts , items ... T ) {
326- for _ , item := range items {
327- m .metrics .add (item )
328- }
329- m .priorityqueue .AddWithOpts (o , items ... )
330- }
331-
332- func (m * metricWrappedQueue [T ]) GetWithPriority () (T , int , bool ) {
333- item , priority , shutdown := m .priorityqueue .GetWithPriority ()
334- m .metrics .get (item )
335- return item , priority , shutdown
336- }
337-
338- func (m * metricWrappedQueue [T ]) Get () (T , bool ) {
339- item , _ , shutdown := m .GetWithPriority ()
340- return item , shutdown
341- }
342-
343- func (m * metricWrappedQueue [T ]) Done (item T ) {
344- m .metrics .done (item )
345- m .priorityqueue .Done (item )
346- }
347-
348- func (m * metricWrappedQueue [T ]) updateUnfinishedWorkLoop () {
349- t := time .NewTicker (time .Millisecond )
318+ func (w * priorityqueue [T ]) updateUnfinishedWorkLoop () {
319+ t := time .NewTicker (500 * time .Millisecond ) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
350320 defer t .Stop ()
351321 for range t .C {
352- if m . priorityqueue . ShuttingDown () {
322+ if w . shutdown . Load () {
353323 return
354324 }
355- m .metrics .updateUnfinishedWork ()
325+ w .metrics .updateUnfinishedWork ()
356326 }
357327}
0 commit comments