Skip to content

✨ WithLowPriorityWhenUnchanged: Set Priority for all add methods #3290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
type AddOpts struct {
After time.Duration
RateLimited bool
Priority int
// Priority is the priority of the item. Higher values
// indicate higher priority.
// Defaults to zero if unset.
Priority *int
}

// PriorityQueue is a priority queue for a controller. It
Expand Down Expand Up @@ -154,7 +157,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
item := &item[T]{
Key: key,
AddedCounter: w.addedCounter,
Priority: o.Priority,
Priority: ptr.Deref(o.Priority, 0),
ReadyAt: readyAt,
}
w.items[key] = item
Expand All @@ -169,12 +172,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
// The b-tree de-duplicates based on ordering and any change here
// will affect the order - Just delete and re-add.
item, _ := w.queue.Delete(w.items[key])
if o.Priority > item.Priority {
if newPriority := ptr.Deref(o.Priority, 0); newPriority > item.Priority {
// Update depth metric only if the item in the queue was already added to the depth metric.
if item.ReadyAt == nil || w.becameReady.Has(key) {
w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority)
w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority)
}
item.Priority = o.Priority
item.Priority = newPriority
}

if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
Expand Down
13 changes: 7 additions & 6 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
)

var _ = Describe("Controllerworkqueue", func() {
Expand Down Expand Up @@ -106,8 +107,8 @@ var _ = Describe("Controllerworkqueue", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{Priority: 1}, "foo")
q.AddWithOpts(AddOpts{Priority: 2}, "foo")
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo")
q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, "foo")

item, priority, _ := q.GetWithPriority()
Expect(item).To(Equal("foo"))
Expand All @@ -126,7 +127,7 @@ var _ = Describe("Controllerworkqueue", func() {
q.AddWithOpts(AddOpts{}, "foo")
q.AddWithOpts(AddOpts{}, "bar")
q.AddWithOpts(AddOpts{}, "baz")
q.AddWithOpts(AddOpts{Priority: 1}, "baz")
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "baz")

item, priority, _ := q.GetWithPriority()
Expect(item).To(Equal("baz"))
Expand Down Expand Up @@ -381,7 +382,7 @@ var _ = Describe("Controllerworkqueue", func() {
if rn < 10 {
q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i))
} else {
q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i))
q.AddWithOpts(AddOpts{Priority: &rn}, fmt.Sprintf("foo%d", i))
}
}

Expand Down Expand Up @@ -623,8 +624,8 @@ func TestFuzzPriorityQueue(t *testing.T) {
defer inQueueLock.Unlock()

q.AddWithOpts(opts, item)
if existingPriority, exists := inQueue[item]; !exists || existingPriority < opts.Priority {
inQueue[item] = opts.Priority
if existingPriority, exists := inQueue[item]; !exists || existingPriority < ptr.Deref(opts.Priority, 0) {
inQueue[item] = ptr.Deref(opts.Priority, 0)
}
}()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -141,7 +142,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
if !ok {
if lowPriority {
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
Priority: LowPriority,
Priority: ptr.To(LowPriority),
}, req)
} else {
q.Add(req)
Expand Down
65 changes: 34 additions & 31 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package handler
import (
"context"
"reflect"
"time"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -126,20 +127,14 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr
h.CreateFunc(ctx, e, q)
return
}
wq := workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: q,

wq := workqueueWithDefaultPriority[request]{
// We already know that we have a priority queue, that event.Object implements
// client.Object and that its not nil
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
var priority int
if e.IsInInitialList {
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
priorityqueue.AddOpts{Priority: priority},
item,
)
},
PriorityQueue: q.(priorityqueue.PriorityQueue[request]),
}
if e.IsInInitialList {
wq.priority = LowPriority
}
h.CreateFunc(ctx, e, wq)
}
Expand All @@ -160,20 +155,13 @@ func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUp
return
}

wq := workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: q,
wq := workqueueWithDefaultPriority[request]{
// We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
// client.Object and that they are not nil
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
var priority int
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
priorityqueue.AddOpts{Priority: priority},
item,
)
},
PriorityQueue: q.(priorityqueue.PriorityQueue[request]),
}
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
wq.priority = LowPriority
}
h.UpdateFunc(ctx, e, wq)
}
Expand Down Expand Up @@ -201,13 +189,28 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
}
}

type workqueueWithCustomAddFunc[request comparable] struct {
workqueue.TypedRateLimitingInterface[request]
addFunc func(item request, q workqueue.TypedRateLimitingInterface[request])
type workqueueWithDefaultPriority[request comparable] struct {
priorityqueue.PriorityQueue[request]
priority int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should also make this a pointer

So that we only call AddWithOpts with a priority if it is actually set (and not &0 if unset)

I know that currently for our priority queue implementation it doesn't matter (as far as I can tell), still wondering if it would be better

}

func (w workqueueWithDefaultPriority[request]) Add(item request) {
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority}, item)
}

func (w workqueueWithCustomAddFunc[request]) Add(item request) {
w.addFunc(item, w.TypedRateLimitingInterface)
func (w workqueueWithDefaultPriority[request]) AddAfter(item request, after time.Duration) {
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority, After: after}, item)
}

func (w workqueueWithDefaultPriority[request]) AddRateLimited(item request) {
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority, RateLimited: true}, item)
}

func (w workqueueWithDefaultPriority[request]) AddWithOpts(o priorityqueue.AddOpts, items ...request) {
if o.Priority == nil {
o.Priority = &w.priority
}
w.PriorityQueue.AddWithOpts(o, items...)
}

// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
Expand All @@ -223,7 +226,7 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate
if evt.IsInInitialList {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here. Wondering if we should just play it safe

	var priority *int
	if evt.IsInInitialList {
		priority = ptr.To(LowPriority)
	}
	priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)

(same in l.245)

}

// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
Expand All @@ -239,5 +242,5 @@ func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRate
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item)
}
Loading