11package controllerworkqueue
22
33import (
4- "sort"
54 "sync"
65 "sync/atomic"
76 "time"
87
8+ "github.com/google/btree"
99 "k8s.io/apimachinery/pkg/util/sets"
1010 "k8s.io/client-go/util/workqueue"
1111 "k8s.io/utils/clock"
@@ -58,7 +58,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5858
5959 cwq := & controllerworkqueue [T ]{
6060 items : map [T ]* item [T ]{},
61- queue : queue [T ]{} ,
61+ queue : btree . NewG ( 32 , less [T ]) ,
6262 tryPush : make (chan struct {}, 1 ),
6363 rateLimiter : opts .RateLimiter ,
6464 locked : sets.Set [T ]{},
@@ -77,12 +77,16 @@ type controllerworkqueue[T comparable] struct {
7777 // lock has to be acquired for any access to either items or queue
7878 lock sync.Mutex
7979 items map [T ]* item [T ]
80- queue queue [ T ]
80+ queue * btree. BTreeG [ * item [ T ] ]
8181
8282 tryPush chan struct {}
8383
8484 rateLimiter workqueue.TypedRateLimiter [T ]
8585
86+ // addedCounter is a counter of elements added, we need it
87+ // because unixNano is not guaranteed to be unique.
88+ addedCounter uint64
89+
8690 // locked contains the keys we handed out through Get() and that haven't
8791 // yet been returned through Done().
8892 locked sets.Set [T ]
@@ -106,7 +110,6 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
106110 w .lock .Lock ()
107111 defer w .lock .Unlock ()
108112
109- var hadChanges bool
110113 for _ , key := range items {
111114 if o .RateLimited {
112115 after := w .rateLimiter .When (key )
@@ -121,30 +124,30 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
121124 }
122125 if _ , ok := w .items [key ]; ! ok {
123126 item := & item [T ]{
124- key : key ,
125- priority : o .Priority ,
126- readyAt : readyAt ,
127+ key : key ,
128+ addedAtUnixNano : w .now ().UnixNano (),
129+ addedCounter : w .addedCounter ,
130+ priority : o .Priority ,
131+ readyAt : readyAt ,
127132 }
128133 w .items [key ] = item
129- w .queue = append ( w . queue , item )
130- hadChanges = true
134+ w .queue . ReplaceOrInsert ( item )
135+ w . addedCounter ++
131136 continue
132137 }
133138
134- if o .Priority > w .items [key ].priority {
135- w .items [key ].priority = o .Priority
136- hadChanges = true
139+ // The b-tree de-duplicates based on ordering and any change here
140+ // will affect the order - Just delete and re-add.
141+ item , _ := w .queue .Delete (w .items [key ])
142+ if o .Priority > item .priority {
143+ item .priority = o .Priority
137144 }
138145
139- if w .items [key ].readyAt != nil && (readyAt == nil || readyAt .Before (* w .items [key ].readyAt )) {
140- w .items [key ].readyAt = readyAt
141- hadChanges = true
146+ if item .readyAt != nil && (readyAt == nil || readyAt .Before (* item .readyAt )) {
147+ item .readyAt = readyAt
142148 }
143- }
144149
145- if hadChanges {
146- sort .Stable (w .queue )
147- w .doTryPush ()
150+ w .queue .ReplaceOrInsert (item )
148151 }
149152}
150153
@@ -176,42 +179,30 @@ func (w *controllerworkqueue[T]) spin() {
176179 w .lockedLock .Lock ()
177180 defer w .lockedLock .Unlock ()
178181
179- // toRemove is a list of indexes to remove from the queue.
180- // We can not do it in-place as we would be manipulating the
181- // slice we are iterating over. We have to do it backwards, as
182- // otherwise the indexes become invalid.
183- var toRemove []int
184- defer func () {
185- for i := len (toRemove ) - 1 ; i >= 0 ; i -- {
186- idxToRemove := toRemove [i ]
187- if idxToRemove == len (w .queue )- 1 {
188- w .queue = w .queue [:idxToRemove ]
189- } else {
190- w .queue = append (w .queue [:idxToRemove ], w .queue [idxToRemove + 1 :]... )
191- }
192- }
193- }()
194- for idx , item := range w .queue {
182+ w .queue .Ascend (func (item * item [T ]) bool {
195183 if w .waiters .Load () == 0 { // no waiters, return as we can not hand anything out anyways
196- return
184+ return false
197185 }
186+
198187 // No next element we can process
199- if w . queue [ 0 ]. readyAt != nil && w . queue [ 0 ] .readyAt .After (w .now ()) {
200- nextReady = w .tick (w . queue [ 0 ] .readyAt .Sub (w .now ()))
201- return
188+ if item . readyAt != nil && item .readyAt .After (w .now ()) {
189+ nextReady = w .tick (item .readyAt .Sub (w .now ()))
190+ return false
202191 }
203192
204193 // Item is locked, we can not hand it out
205194 if w .locked .Has (item .key ) {
206- continue
195+ return true
207196 }
208197
209198 w .get <- * item
210199 w .locked .Insert (item .key )
211- delete (w .items , item .key )
212200 w .waiters .Add (- 1 )
213- toRemove = append (toRemove , idx )
214- }
201+ delete (w .items , item .key )
202+ w .queue .Delete (item )
203+
204+ return true
205+ })
215206 }()
216207 }
217208}
@@ -274,37 +265,36 @@ func (w *controllerworkqueue[T]) Len() int {
274265 w .lock .Lock ()
275266 defer w .lock .Unlock ()
276267
277- return len ( w .queue )
268+ return w .queue . Len ( )
278269}
279270
280- // queue is the actual queue. It implements heap.Interface.
281- type queue [T comparable ] []* item [T ]
282-
283- func (q queue [T ]) Len () int {
284- return len (q )
285- }
286-
287- func (q queue [T ]) Less (i , j int ) bool {
288- switch {
289- case q [i ].readyAt == nil && q [j ].readyAt != nil :
271+ func less [T comparable ](a , b * item [T ]) bool {
272+ if a .readyAt == nil && b .readyAt != nil {
290273 return true
291- case q [i ].readyAt != nil && q [j ].readyAt == nil :
274+ }
275+ if a .readyAt != nil && b .readyAt == nil {
292276 return false
293- case q [i ].readyAt != nil && q [j ].readyAt != nil :
294- return q [i ].readyAt .Before (* q [j ].readyAt )
277+ }
278+ if a .readyAt != nil && b .readyAt != nil && ! a .readyAt .Equal (* b .readyAt ) {
279+ return a .readyAt .Before (* b .readyAt )
280+ }
281+ if a .priority != b .priority {
282+ return a .priority > b .priority
295283 }
296284
297- return q [i ].priority > q [j ].priority
298- }
285+ if a .addedAtUnixNano != b .addedAtUnixNano {
286+ return a .addedAtUnixNano < b .addedAtUnixNano
287+ }
299288
300- func (q queue [T ]) Swap (i , j int ) {
301- q [i ], q [j ] = q [j ], q [i ]
289+ return a .addedCounter < b .addedCounter
302290}
303291
304292type item [T comparable ] struct {
305- key T
306- priority int
307- readyAt * time.Time
293+ key T
294+ addedAtUnixNano int64
295+ addedCounter uint64
296+ priority int
297+ readyAt * time.Time
308298}
309299
310300func wrapWithMetrics [T comparable ](q * controllerworkqueue [T ], name string , provider workqueue.MetricsProvider ) PriorityQueue [T ] {
0 commit comments