@@ -195,35 +195,42 @@ func (w *priorityqueue[T]) spin() {
195195 w .lockedLock .Lock ()
196196 defer w .lockedLock .Unlock ()
197197
198- w .queue .Ascend (func (item * item [T ]) bool {
199- if w .waiters .Load () == 0 { // no waiters, return as we can not hand anything out anyways
200- return false
201- }
198+ breakLoop := false
199+ for ! breakLoop {
200+ w .queue .Ascend (func (item * item [T ]) bool {
201+ if w .waiters .Load () == 0 { // no waiters, return as we can not hand anything out anyways
202+ breakLoop = true
203+ return false
204+ }
205+
206+ // No next element we can process
207+ if item .readyAt != nil && item .readyAt .After (w .now ()) {
208+ readyAt := item .readyAt .Sub (w .now ())
209+ if readyAt <= 0 { // Toctou race with the above check
210+ readyAt = 1
211+ }
212+ nextReady = w .tick (readyAt )
213+ breakLoop = true
214+ return false
215+ }
202216
203- // No next element we can process
204- if item .readyAt != nil && item .readyAt .After (w .now ()) {
205- readyAt := item .readyAt .Sub (w .now ())
206- if readyAt <= 0 { // Toctou race with the above check
207- readyAt = 1
217+ // Item is locked, we can not hand it out
218+ if w .locked .Has (item .key ) {
219+ return true
208220 }
209- nextReady = w .tick (readyAt )
221+
222+ w .metrics .get (item .key )
223+ w .locked .Insert (item .key )
224+ w .waiters .Add (- 1 )
225+ delete (w .items , item .key )
226+ w .queue .Delete (item )
227+ w .get <- * item
228+
229+ // Return false because continuing with Ascend after deleting an item
230+ // can lead to panics within Ascend.
210231 return false
211- }
212-
213- // Item is locked, we can not hand it out
214- if w .locked .Has (item .key ) {
215- return true
216- }
217-
218- w .metrics .get (item .key )
219- w .locked .Insert (item .key )
220- w .waiters .Add (- 1 )
221- delete (w .items , item .key )
222- w .queue .Delete (item )
223- w .get <- * item
224-
225- return true
226- })
232+ })
233+ }
227234 }()
228235 }
229236}
0 commit comments