Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ type AddOpts struct {
// internally de-duplicates all items that are added to
// it. It will use the max of the passed priorities and the
// min of possible durations.
//
// When an item that is already enqueued at a lower priority
// is re-enqueued with a higher priority, it will be placed at
// the end among items of the new priority, in order to
// preserve FIFO semantics within each priority level.
// The effective duration (i.e. the ready time) is still
// computed as the minimum across all enqueues.
type PriorityQueue[T comparable] interface {
workqueue.TypedRateLimitingInterface[T]
AddWithOpts(o AddOpts, Items ...T)
Expand Down Expand Up @@ -161,12 +168,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
Priority: ptr.Deref(o.Priority, 0),
ReadyAt: readyAt,
}
w.addedCounter++
w.items[key] = item
w.queue.ReplaceOrInsert(item)
if item.ReadyAt == nil {
w.metrics.add(key, item.Priority)
}
w.addedCounter++
continue
}

Expand All @@ -179,6 +186,8 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority)
}
item.Priority = newPriority
item.AddedCounter = w.addedCounter
w.addedCounter++
}

if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,26 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(depth).To(Equal(0))
}
})

It("follows FIFO order in the new priority queue when item priority changes", func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You write items should be ordered by priority first, and by enqueue time (FIFO) within the same priority but you don't provide any justification for that. We generally take the "best" of all Adds we've over observed for a given key, where "best" means whichever gets the item to the front of the queue fastest: Highest priority, lowest after. Why should the queue behave differently when it comes to the addedCounter?

/hold

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. The most obvious benefit of the priority queue feature is to optimize controller startup by deferring low-priority events, thereby reducing externally perceived downtime. Based on this scenario, consider the following example.

Assume there is a single worker and 26 items, from A to Z. After the initial list completes, all items are enqueued into the priority queue in alphabetical order. While A is being processed, Z receives an update event, followed by Y, and then X. As a result, these items are moved into the high-priority queue, but the ordering becomes:

High priority: X, Y, Z
Low priority:  B, C, D, …, W

This behavior is unintuitive. Although the items trigger update events in the order Z, then Y, then X, the actual processing order is still dominated by the original alphabetical enqueue order rather than the update time.

From an external observer’s perspective, items in the low-priority queue are often ones that can reasonably be ignored, while the items that truly deserve attention are those that have triggered update events. However, for these important items, the controller does not behave in a FIFO manner. This is particularly surprising given that many users in the Kubernetes ecosystem are aware that controllers are internally implemented as queues and therefore naturally expect FIFO semantics.

Additionally, once the queue (including the low-priority queue) has been completely drained, and we later observe the same update sequence again, the ordering in queue becomes Z, Y, X, which is actually the expected FIFO order. However, this makes the controller’s behavior appear inconsistent to external observers, because the ordering differs significantly from the earlier case where low-priority items were still present in the queue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stefan and I had a longer discussion around this and arrived at the conclusion that the change makes sense, because it unbreaks the use-case of "While items from initial list are being processed, incoming update events are processed in fifo order" (which I think you tried to allude to, but I didn't understand initially). This is currently broken, since the items in the initial list are not ordered by anything useful.

For consistency, one could argue it would also make sense to do the same for requeueAfter, but that would break the current "Resync resets after" and any other code that wants to lower after but doesn't know the current priority so we leave it as-is for the moment but can revisit later if someone comes up with a good reason.

Could you please add a go doc about this change and the motivating use-case somewhere?

/hold cancel

q, _ := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{Priority: ptr.To(0)}, "foo")
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "bar")
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo")
Expect(q.Len()).To(Equal(2))

item, priority, _ := q.GetWithPriority()
Expect(item).To(Equal("bar"))
Expect(priority).To(Equal(1))
Expect(q.Len()).To(Equal(1))

item, priority, _ = q.GetWithPriority()
Expect(item).To(Equal("foo"))
Expect(priority).To(Equal(1))
Expect(q.Len()).To(Equal(0))
})
})

func BenchmarkAddGetDone(b *testing.B) {
Expand Down
Loading