1- package controllerworkqueue
1+ package priorityqueue
22
33import (
44 "sync"
@@ -56,7 +56,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5656 opts .MetricProvider = metrics.WorkqueueMetricsProvider {}
5757 }
5858
59- cwq := & controllerworkqueue [T ]{
59+ cwq := & priorityqueue [T ]{
6060 items : map [T ]* item [T ]{},
6161 queue : btree .NewG (32 , less [T ]),
6262 // itemOrWaiterAdded indicates that an item or
@@ -77,7 +77,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
7777 return wrapWithMetrics (cwq , name , opts .MetricProvider )
7878}
7979
80- type controllerworkqueue [T comparable ] struct {
80+ type priorityqueue [T comparable ] struct {
8181 // lock has to be acquired for any access to either items or queue
8282 lock sync.Mutex
8383 items map [T ]* item [T ]
@@ -110,7 +110,7 @@ type controllerworkqueue[T comparable] struct {
110110 tick func (time.Duration ) <- chan time.Time
111111}
112112
113- func (w * controllerworkqueue [T ]) AddWithOpts (o AddOpts , items ... T ) {
113+ func (w * priorityqueue [T ]) AddWithOpts (o AddOpts , items ... T ) {
114114 w .lock .Lock ()
115115 defer w .lock .Unlock ()
116116
@@ -155,14 +155,14 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
155155 }
156156}
157157
158- func (w * controllerworkqueue [T ]) notifyItemOrWaiterAdded () {
158+ func (w * priorityqueue [T ]) notifyItemOrWaiterAdded () {
159159 select {
160160 case w .itemOrWaiterAdded <- struct {}{}:
161161 default :
162162 }
163163}
164164
165- func (w * controllerworkqueue [T ]) spin () {
165+ func (w * priorityqueue [T ]) spin () {
166166 blockForever := make (chan time.Time )
167167 var nextReady <- chan time.Time
168168 nextReady = blockForever
@@ -216,19 +216,19 @@ func (w *controllerworkqueue[T]) spin() {
216216 }
217217}
218218
219- func (w * controllerworkqueue [T ]) Add (item T ) {
219+ func (w * priorityqueue [T ]) Add (item T ) {
220220 w .AddWithOpts (AddOpts {}, item )
221221}
222222
223- func (w * controllerworkqueue [T ]) AddAfter (item T , after time.Duration ) {
223+ func (w * priorityqueue [T ]) AddAfter (item T , after time.Duration ) {
224224 w .AddWithOpts (AddOpts {After : after }, item )
225225}
226226
227- func (w * controllerworkqueue [T ]) AddRateLimited (item T ) {
227+ func (w * priorityqueue [T ]) AddRateLimited (item T ) {
228228 w .AddWithOpts (AddOpts {RateLimited : true }, item )
229229}
230230
231- func (w * controllerworkqueue [T ]) GetWithPriority () (_ T , priority int , shutdown bool ) {
231+ func (w * priorityqueue [T ]) GetWithPriority () (_ T , priority int , shutdown bool ) {
232232 w .waiters .Add (1 )
233233
234234 w .notifyItemOrWaiterAdded ()
@@ -237,40 +237,40 @@ func (w *controllerworkqueue[T]) GetWithPriority() (_ T, priority int, shutdown
237237 return item .key , item .priority , w .shutdown .Load ()
238238}
239239
240- func (w * controllerworkqueue [T ]) Get () (item T , shutdown bool ) {
240+ func (w * priorityqueue [T ]) Get () (item T , shutdown bool ) {
241241 key , _ , shutdown := w .GetWithPriority ()
242242 return key , shutdown
243243}
244244
245- func (w * controllerworkqueue [T ]) Forget (item T ) {
245+ func (w * priorityqueue [T ]) Forget (item T ) {
246246 w .rateLimiter .Forget (item )
247247}
248248
249- func (w * controllerworkqueue [T ]) NumRequeues (item T ) int {
249+ func (w * priorityqueue [T ]) NumRequeues (item T ) int {
250250 return w .rateLimiter .NumRequeues (item )
251251}
252252
253- func (w * controllerworkqueue [T ]) ShuttingDown () bool {
253+ func (w * priorityqueue [T ]) ShuttingDown () bool {
254254 return w .shutdown .Load ()
255255}
256256
257- func (w * controllerworkqueue [T ]) Done (item T ) {
257+ func (w * priorityqueue [T ]) Done (item T ) {
258258 w .lockedLock .Lock ()
259259 defer w .lockedLock .Unlock ()
260260 w .locked .Delete (item )
261261 w .notifyItemOrWaiterAdded ()
262262}
263263
264- func (w * controllerworkqueue [T ]) ShutDown () {
264+ func (w * priorityqueue [T ]) ShutDown () {
265265 w .shutdown .Store (true )
266266 close (w .done )
267267}
268268
269- func (w * controllerworkqueue [T ]) ShutDownWithDrain () {
269+ func (w * priorityqueue [T ]) ShutDownWithDrain () {
270270 w .ShutDown ()
271271}
272272
273- func (w * controllerworkqueue [T ]) Len () int {
273+ func (w * priorityqueue [T ]) Len () int {
274274 w .lock .Lock ()
275275 defer w .lock .Unlock ()
276276
@@ -306,10 +306,10 @@ type item[T comparable] struct {
306306 readyAt * time.Time
307307}
308308
309- func wrapWithMetrics [T comparable ](q * controllerworkqueue [T ], name string , provider workqueue.MetricsProvider ) PriorityQueue [T ] {
309+ func wrapWithMetrics [T comparable ](q * priorityqueue [T ], name string , provider workqueue.MetricsProvider ) PriorityQueue [T ] {
310310 mwq := & metricWrappedQueue [T ]{
311- controllerworkqueue : q ,
312- metrics : newQueueMetrics [T ](provider , name , clock.RealClock {}),
311+ priorityqueue : q ,
312+ metrics : newQueueMetrics [T ](provider , name , clock.RealClock {}),
313313 }
314314
315315 go mwq .updateUnfinishedWorkLoop ()
@@ -318,19 +318,19 @@ func wrapWithMetrics[T comparable](q *controllerworkqueue[T], name string, provi
318318}
319319
320320type metricWrappedQueue [T comparable ] struct {
321- * controllerworkqueue [T ]
321+ * priorityqueue [T ]
322322 metrics queueMetrics [T ]
323323}
324324
325325func (m * metricWrappedQueue [T ]) AddWithOpts (o AddOpts , items ... T ) {
326326 for _ , item := range items {
327327 m .metrics .add (item )
328328 }
329- m .controllerworkqueue .AddWithOpts (o , items ... )
329+ m .priorityqueue .AddWithOpts (o , items ... )
330330}
331331
332332func (m * metricWrappedQueue [T ]) GetWithPriority () (T , int , bool ) {
333- item , priority , shutdown := m .controllerworkqueue .GetWithPriority ()
333+ item , priority , shutdown := m .priorityqueue .GetWithPriority ()
334334 m .metrics .get (item )
335335 return item , priority , shutdown
336336}
@@ -342,14 +342,14 @@ func (m *metricWrappedQueue[T]) Get() (T, bool) {
342342
343343func (m * metricWrappedQueue [T ]) Done (item T ) {
344344 m .metrics .done (item )
345- m .controllerworkqueue .Done (item )
345+ m .priorityqueue .Done (item )
346346}
347347
348348func (m * metricWrappedQueue [T ]) updateUnfinishedWorkLoop () {
349349 t := time .NewTicker (time .Millisecond )
350350 defer t .Stop ()
351351 for range t .C {
352- if m .controllerworkqueue .ShuttingDown () {
352+ if m .priorityqueue .ShuttingDown () {
353353 return
354354 }
355355 m .metrics .updateUnfinishedWork ()
0 commit comments