Skip to content

Commit 6923df8

Browse files
committed
implement priority queue for handlers
Signed-off-by: Troy Connor <[email protected]>
1 parent ba53477 commit 6923df8

File tree

9 files changed

+273
-39
lines changed

9 files changed

+273
-39
lines changed

pkg/handler/enqueue.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"k8s.io/apimachinery/pkg/types"
2424
"k8s.io/client-go/util/workqueue"
2525
"sigs.k8s.io/controller-runtime/pkg/client"
26+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2627
"sigs.k8s.io/controller-runtime/pkg/event"
2728
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2829
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -52,25 +53,47 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5253
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5354
return
5455
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
56+
57+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5658
Name: evt.Object.GetName(),
5759
Namespace: evt.Object.GetNamespace(),
58-
}})
60+
}}
61+
62+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
63+
if !isPriorityQueue {
64+
q.Add(item)
65+
return
66+
}
67+
addToQueueCreate(priorityQueue, evt, item)
5968
}
6069

6170
// Update implements EventHandler.
6271
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6372
switch {
6473
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
74+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6675
Name: evt.ObjectNew.GetName(),
6776
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
77+
}}
78+
79+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
80+
if !isPriorityQueue {
81+
q.Add(item)
82+
return
83+
}
84+
addToQueueUpdate(priorityQueue, evt, item)
6985
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
86+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7187
Name: evt.ObjectOld.GetName(),
7288
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
89+
}}
90+
91+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
92+
if !isPriorityQueue {
93+
q.Add(item)
94+
return
95+
}
96+
addToQueueUpdate(priorityQueue, evt, item)
7497
default:
7598
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
7699
}

pkg/handler/enqueue_mapped.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
4646
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
4747
// objects and both sets of Requests are enqueue.
4848
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
49-
return TypedEnqueueRequestsFromMapFunc(fn)
49+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
5050
}
5151

5252
// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection

pkg/handler/enqueue_owner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
4848
//
4949
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
5050
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
51-
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
51+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
5252
}
5353

5454
// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created

pkg/handler/eventhandler.go

Lines changed: 103 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"time"
2222

23+
"k8s.io/apimachinery/pkg/types"
2324
"k8s.io/client-go/util/workqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/client"
2526
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
@@ -82,10 +83,88 @@ type TypedEventHandler[object any, request comparable] interface {
8283
Generic(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
8384
}
8485

85-
var _ EventHandler = Funcs{}
86+
var _ EventHandler = &Funcs{}
8687

8788
// Funcs implements eventhandler.
88-
type Funcs = TypedFuncs[client.Object, reconcile.Request]
89+
type Funcs = funcs[client.Object, reconcile.Request]
90+
91+
// funcs implements eventhandler
92+
type funcs[object client.Object, request reconcile.Request] struct{}
93+
94+
func (f *funcs[T, R]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
95+
if isNil(evt.Object) {
96+
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
97+
return
98+
}
99+
100+
item := reconcile.Request{NamespacedName: types.NamespacedName{
101+
Name: evt.Object.GetName(),
102+
Namespace: evt.Object.GetNamespace(),
103+
}}
104+
105+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
106+
if !isPriorityQueue {
107+
q.Add(item)
108+
return
109+
}
110+
addToQueueCreate(priorityQueue, evt, item)
111+
}
112+
113+
// Update implements EventHandler.
114+
func (f *funcs[T, R]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
115+
switch {
116+
case !isNil(evt.ObjectNew):
117+
item := reconcile.Request{NamespacedName: types.NamespacedName{
118+
Name: evt.ObjectNew.GetName(),
119+
Namespace: evt.ObjectNew.GetNamespace(),
120+
}}
121+
122+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
123+
if !isPriorityQueue {
124+
q.Add(item)
125+
return
126+
}
127+
addToQueueUpdate(priorityQueue, evt, item)
128+
case !isNil(evt.ObjectOld):
129+
item := reconcile.Request{NamespacedName: types.NamespacedName{
130+
Name: evt.ObjectOld.GetName(),
131+
Namespace: evt.ObjectOld.GetNamespace(),
132+
}}
133+
134+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
135+
if !isPriorityQueue {
136+
q.Add(item)
137+
return
138+
}
139+
addToQueueUpdate(priorityQueue, evt, item)
140+
default:
141+
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
142+
}
143+
}
144+
145+
// Delete implements EventHandler.
146+
func (f *funcs[T, R]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
147+
if isNil(evt.Object) {
148+
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
149+
return
150+
}
151+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
152+
Name: evt.Object.GetName(),
153+
Namespace: evt.Object.GetNamespace(),
154+
}})
155+
}
156+
157+
// Generic implements EventHandler.
158+
func (f *funcs[T, R]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
159+
if isNil(evt.Object) {
160+
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
161+
return
162+
}
163+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
164+
Name: evt.Object.GetName(),
165+
Namespace: evt.Object.GetNamespace(),
166+
}})
167+
}
89168

90169
// TypedFuncs implements eventhandler.
91170
//
@@ -154,11 +233,7 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
154233
q.Add(item)
155234
return
156235
}
157-
var priority int
158-
if isObjectUnchanged(tce) {
159-
priority = LowPriority
160-
}
161-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
236+
addToQueueCreate(priorityQueue, tce, item)
162237
},
163238
})
164239
},
@@ -171,11 +246,7 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
171246
q.Add(item)
172247
return
173248
}
174-
var priority int
175-
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
176-
priority = LowPriority
177-
}
178-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
249+
addToQueueUpdate(priorityQueue, tue, item)
179250
},
180251
})
181252
},
@@ -199,3 +270,23 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199270
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200271
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201272
}
273+
274+
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
275+
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
276+
func addToQueueCreate[T client.Object, request comparable](q priorityqueue.PriorityQueue[request], evt event.TypedCreateEvent[T], item request) {
277+
var priority int
278+
if isObjectUnchanged(evt) {
279+
priority = LowPriority
280+
}
281+
q.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
282+
}
283+
284+
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
285+
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
286+
func addToQueueUpdate[T client.Object, request comparable](q priorityqueue.PriorityQueue[request], evt event.TypedUpdateEvent[T], item request) {
287+
var priority int
288+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
289+
priority = LowPriority
290+
}
291+
q.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
292+
}

0 commit comments

Comments
 (0)