@@ -195,35 +195,39 @@ func (w *priorityqueue[T]) spin() {
195195 w .lockedLock .Lock ()
196196 defer w .lockedLock .Unlock ()
197197
198- w .queue .Ascend (func (item * item [T ]) bool {
199- if w .waiters .Load () == 0 { // no waiters, return as we can not hand anything out anyways
200- return false
201- }
198+ for w .waiters .Load () != 0 {
199+ w .queue .Ascend (func (item * item [T ]) bool {
200+ if w .waiters .Load () == 0 { // no waiters, return as we can not hand anything out anyways
201+ return false
202+ }
203+
204+ // No next element we can process
205+ if item .readyAt != nil && item .readyAt .After (w .now ()) {
206+ readyAt := item .readyAt .Sub (w .now ())
207+ if readyAt <= 0 { // Toctou race with the above check
208+ readyAt = 1
209+ }
210+ nextReady = w .tick (readyAt )
211+ return false
212+ }
202213
203- // No next element we can process
204- if item .readyAt != nil && item .readyAt .After (w .now ()) {
205- readyAt := item .readyAt .Sub (w .now ())
206- if readyAt <= 0 { // Toctou race with the above check
207- readyAt = 1
214+ // Item is locked, we can not hand it out
215+ if w .locked .Has (item .key ) {
216+ return true
208217 }
209- nextReady = w .tick (readyAt )
218+
219+ w .metrics .get (item .key )
220+ w .locked .Insert (item .key )
221+ w .waiters .Add (- 1 )
222+ delete (w .items , item .key )
223+ w .queue .Delete (item )
224+ w .get <- * item
225+
226+ // Return false because continuing with Ascend after deleting an item
227+ // can lead to panics within Ascend.
210228 return false
211- }
212-
213- // Item is locked, we can not hand it out
214- if w .locked .Has (item .key ) {
215- return true
216- }
217-
218- w .metrics .get (item .key )
219- w .locked .Insert (item .key )
220- w .waiters .Add (- 1 )
221- delete (w .items , item .key )
222- w .queue .Delete (item )
223- w .get <- * item
224-
225- return true
226- })
229+ })
230+ }
227231 }()
228232 }
229233}
0 commit comments