Skip to content

Commit 29780d9

Browse files
committed
implement priority queue for handlers
Signed-off-by: Troy Connor <[email protected]>
1 parent ba53477 commit 29780d9

File tree

9 files changed

+254
-48
lines changed

9 files changed

+254
-48
lines changed

pkg/handler/enqueue.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5252
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5353
return
5454
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
55+
56+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5657
Name: evt.Object.GetName(),
5758
Namespace: evt.Object.GetNamespace(),
58-
}})
59+
}}
60+
61+
addToQueueCreate(q, evt, item)
5962
}
6063

6164
// Update implements EventHandler.
6265
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6366
switch {
6467
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
68+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6669
Name: evt.ObjectNew.GetName(),
6770
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
71+
}}
72+
73+
addToQueueUpdate(q, evt, item)
6974
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
75+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7176
Name: evt.ObjectOld.GetName(),
7277
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
78+
}}
79+
80+
addToQueueUpdate(q, evt, item)
7481
default:
7582
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
7683
}

pkg/handler/enqueue_mapped.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
4646
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
4747
// objects and both sets of Requests are enqueue.
4848
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
49-
return TypedEnqueueRequestsFromMapFunc(fn)
49+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
5050
}
5151

5252
// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection

pkg/handler/enqueue_owner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
4848
//
4949
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
5050
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
51-
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
51+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
5252
}
5353

5454
// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created

pkg/handler/eventhandler.go

Lines changed: 100 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"time"
2222

23+
"k8s.io/apimachinery/pkg/types"
2324
"k8s.io/client-go/util/workqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/client"
2526
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
@@ -83,9 +84,73 @@ type TypedEventHandler[object any, request comparable] interface {
8384
}
8485

8586
var _ EventHandler = Funcs{}
87+
var _ EventHandler = TypedFuncs[client.Object, reconcile.Request]{}
88+
var _ TypedEventHandler[client.Object, reconcile.Request] = TypedFuncs[client.Object, reconcile.Request]{}
8689

8790
// Funcs implements eventhandler.
88-
type Funcs = TypedFuncs[client.Object, reconcile.Request]
91+
type Funcs = funcs[client.Object, reconcile.Request]
92+
93+
// funcs implements eventhandler
94+
type funcs[object client.Object, request reconcile.Request] struct{}
95+
96+
func (f funcs[T, R]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
97+
if isNil(evt.Object) {
98+
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
99+
return
100+
}
101+
102+
item := reconcile.Request{NamespacedName: types.NamespacedName{
103+
Name: evt.Object.GetName(),
104+
Namespace: evt.Object.GetNamespace(),
105+
}}
106+
addToQueueCreate(q, evt, item)
107+
}
108+
109+
// Update implements EventHandler.
110+
func (f funcs[T, R]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
111+
switch {
112+
case !isNil(evt.ObjectNew):
113+
item := reconcile.Request{NamespacedName: types.NamespacedName{
114+
Name: evt.ObjectNew.GetName(),
115+
Namespace: evt.ObjectNew.GetNamespace(),
116+
}}
117+
118+
addToQueueUpdate(q, evt, item)
119+
case !isNil(evt.ObjectOld):
120+
item := reconcile.Request{NamespacedName: types.NamespacedName{
121+
Name: evt.ObjectOld.GetName(),
122+
Namespace: evt.ObjectOld.GetNamespace(),
123+
}}
124+
125+
addToQueueUpdate(q, evt, item)
126+
default:
127+
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
128+
}
129+
}
130+
131+
// Delete implements EventHandler.
132+
func (f funcs[T, R]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
133+
if isNil(evt.Object) {
134+
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
135+
return
136+
}
137+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
138+
Name: evt.Object.GetName(),
139+
Namespace: evt.Object.GetNamespace(),
140+
}})
141+
}
142+
143+
// Generic implements EventHandler.
144+
func (f funcs[T, R]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
145+
if isNil(evt.Object) {
146+
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
147+
return
148+
}
149+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
150+
Name: evt.Object.GetName(),
151+
Namespace: evt.Object.GetNamespace(),
152+
}})
153+
}
89154

90155
// TypedFuncs implements eventhandler.
91156
//
@@ -149,33 +214,15 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
149214
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
150215
TypedRateLimitingInterface: trli,
151216
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
152-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
153-
if !isPriorityQueue {
154-
q.Add(item)
155-
return
156-
}
157-
var priority int
158-
if isObjectUnchanged(tce) {
159-
priority = LowPriority
160-
}
161-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
217+
addToQueueCreate(q, tce, item)
162218
},
163219
})
164220
},
165221
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
166222
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
167223
TypedRateLimitingInterface: trli,
168224
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
169-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
170-
if !isPriorityQueue {
171-
q.Add(item)
172-
return
173-
}
174-
var priority int
175-
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
176-
priority = LowPriority
177-
}
178-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
225+
addToQueueUpdate(q, tue, item)
179226
},
180227
})
181228
},
@@ -199,3 +246,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199246
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200247
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201248
}
249+
250+
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
251+
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
252+
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
253+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
254+
if !isPriorityQueue {
255+
q.Add(item)
256+
return
257+
}
258+
259+
var priority int
260+
if isObjectUnchanged(evt) {
261+
priority = LowPriority
262+
}
263+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
264+
}
265+
266+
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
267+
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
268+
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
269+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
270+
if !isPriorityQueue {
271+
q.Add(item)
272+
return
273+
}
274+
275+
var priority int
276+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
277+
priority = LowPriority
278+
}
279+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
280+
}

pkg/handler/eventhandler_test.go

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ var _ = Describe("Eventhandler", func() {
659659
})
660660

661661
Describe("Funcs", func() {
662-
failingFuncs := handler.Funcs{
662+
failingFuncs := handler.TypedFuncs[client.Object, reconcile.Request]{
663663
CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
664664
defer GinkgoRecover()
665665
Fail("Did not expect CreateEvent to be called.")
@@ -797,6 +797,27 @@ var _ = Describe("Eventhandler", func() {
797797
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
798798
})
799799

800+
It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
801+
actualOpts := priorityqueue.AddOpts{}
802+
var actualRequests []reconcile.Request
803+
wq := &fakePriorityQueue{
804+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
805+
actualOpts = o
806+
actualRequests = items
807+
},
808+
}
809+
810+
h := &handler.EnqueueRequestForObject{}
811+
h.Create(ctx, event.CreateEvent{
812+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
813+
Name: "my-pod",
814+
}},
815+
}, wq)
816+
817+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
818+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
819+
})
820+
800821
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
801822
actualOpts := priorityqueue.AddOpts{}
802823
var actualRequests []reconcile.Request
@@ -819,6 +840,28 @@ var _ = Describe("Eventhandler", func() {
819840
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
820841
})
821842

843+
It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
844+
actualOpts := priorityqueue.AddOpts{}
845+
var actualRequests []reconcile.Request
846+
wq := &fakePriorityQueue{
847+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
848+
actualOpts = o
849+
actualRequests = items
850+
},
851+
}
852+
853+
h := &handler.EnqueueRequestForObject{}
854+
h.Create(ctx, event.CreateEvent{
855+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
856+
Name: "my-pod",
857+
CreationTimestamp: metav1.Now(),
858+
}},
859+
}, wq)
860+
861+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
862+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
863+
})
864+
822865
It("should lower the priority of an update request with unchanged RV", func() {
823866
actualOpts := priorityqueue.AddOpts{}
824867
var actualRequests []reconcile.Request
@@ -843,6 +886,30 @@ var _ = Describe("Eventhandler", func() {
843886
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
844887
})
845888

889+
It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
890+
actualOpts := priorityqueue.AddOpts{}
891+
var actualRequests []reconcile.Request
892+
wq := &fakePriorityQueue{
893+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
894+
actualOpts = o
895+
actualRequests = items
896+
},
897+
}
898+
899+
h := &handler.EnqueueRequestForObject{}
900+
h.Update(ctx, event.UpdateEvent{
901+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
902+
Name: "my-pod",
903+
}},
904+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
905+
Name: "my-pod",
906+
}},
907+
}, wq)
908+
909+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
910+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
911+
})
912+
846913
It("should not lower the priority of an update request with changed RV", func() {
847914
actualOpts := priorityqueue.AddOpts{}
848915
var actualRequests []reconcile.Request
@@ -868,6 +935,31 @@ var _ = Describe("Eventhandler", func() {
868935
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
869936
})
870937

938+
It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
939+
actualOpts := priorityqueue.AddOpts{}
940+
var actualRequests []reconcile.Request
941+
wq := &fakePriorityQueue{
942+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
943+
actualOpts = o
944+
actualRequests = items
945+
},
946+
}
947+
948+
h := &handler.EnqueueRequestForObject{}
949+
h.Update(ctx, event.UpdateEvent{
950+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
951+
Name: "my-pod",
952+
}},
953+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
954+
Name: "my-pod",
955+
ResourceVersion: "1",
956+
}},
957+
}, wq)
958+
959+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
960+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
961+
})
962+
871963
It("should have no effect on create if the workqueue is not a priorityqueue", func() {
872964
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
873965
h.Create(ctx, event.CreateEvent{
@@ -881,6 +973,19 @@ var _ = Describe("Eventhandler", func() {
881973
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
882974
})
883975

976+
It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
977+
h := &handler.EnqueueRequestForObject{}
978+
h.Create(ctx, event.CreateEvent{
979+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
980+
Name: "my-pod",
981+
}},
982+
}, q)
983+
984+
Expect(q.Len()).To(Equal(1))
985+
item, _ := q.Get()
986+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
987+
})
988+
884989
It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
885990
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
886991
h.Update(ctx, event.UpdateEvent{
@@ -896,8 +1001,23 @@ var _ = Describe("Eventhandler", func() {
8961001
item, _ := q.Get()
8971002
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
8981003
})
899-
})
9001004

1005+
It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
1006+
h := &handler.EnqueueRequestForObject{}
1007+
h.Update(ctx, event.UpdateEvent{
1008+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1009+
Name: "my-pod",
1010+
}},
1011+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1012+
Name: "my-pod",
1013+
}},
1014+
}, q)
1015+
1016+
Expect(q.Len()).To(Equal(1))
1017+
item, _ := q.Get()
1018+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
1019+
})
1020+
})
9011021
})
9021022

9031023
type fakePriorityQueue struct {

pkg/internal/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ var _ = Describe("controller", func() {
266266

267267
ins := source.Channel(
268268
ch,
269-
handler.Funcs{
269+
handler.TypedFuncs[client.Object, reconcile.Request]{
270270
GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
271271
defer GinkgoRecover()
272272
close(processed)

0 commit comments

Comments
 (0)