@@ -30,6 +30,13 @@ type AddOpts struct {
3030// internally de-duplicates all items that are added to
3131// it. It will use the max of the passed priorities and the
3232// min of possible durations.
33+ //
34+ // When an item that is already enqueued at a lower priority
35+ // is re-enqueued with a higher priority, it will be placed at
36+ // the end among items of the new priority, in order to
37+ // preserve FIFO semantics within each priority level.
38+ // The effective duration (i.e. the ready time) is still
39+ // computed as the minimum across all enqueues.
3340type PriorityQueue [T comparable ] interface {
3441 workqueue.TypedRateLimitingInterface [T ]
3542 AddWithOpts (o AddOpts , Items ... T )
@@ -161,12 +168,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
161168 Priority : ptr .Deref (o .Priority , 0 ),
162169 ReadyAt : readyAt ,
163170 }
171+ w .addedCounter ++
164172 w .items [key ] = item
165173 w .queue .ReplaceOrInsert (item )
166174 if item .ReadyAt == nil {
167175 w .metrics .add (key , item .Priority )
168176 }
169- w .addedCounter ++
170177 continue
171178 }
172179
@@ -179,6 +186,8 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
179186 w .metrics .updateDepthWithPriorityMetric (item .Priority , newPriority )
180187 }
181188 item .Priority = newPriority
189+ item .AddedCounter = w .addedCounter
190+ w .addedCounter ++
182191 }
183192
184193 if item .ReadyAt != nil && (readyAt == nil || readyAt .Before (* item .ReadyAt )) {
0 commit comments