Skip to content

Commit 82dd8e5

Browse files
committed
WithLowPriorityWhenUnchanged: Set Priority for all add methods
This change makes `WithLowPriorityWhenUnchanged` set the priority for AddAfter, AddRatelimited and AddWithOpts (if not already set) as well.
1 parent e4f9a24 commit 82dd8e5

File tree

2 files changed

+169
-47
lines changed

2 files changed

+169
-47
lines changed

pkg/handler/eventhandler.go

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ package handler
1919
import (
2020
"context"
2121
"reflect"
22+
"time"
2223

2324
"k8s.io/client-go/util/workqueue"
25+
"k8s.io/utils/ptr"
2426
"sigs.k8s.io/controller-runtime/pkg/client"
2527
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2628
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -126,20 +128,14 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr
126128
h.CreateFunc(ctx, e, q)
127129
return
128130
}
129-
wq := workqueueWithCustomAddFunc[request]{
130-
TypedRateLimitingInterface: q,
131+
132+
wq := workqueueWithDefaultPriority[request]{
131133
// We already know that we have a priority queue, that event.Object implements
132134
// client.Object and that its not nil
133-
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
134-
var priority int
135-
if e.IsInInitialList {
136-
priority = LowPriority
137-
}
138-
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
139-
priorityqueue.AddOpts{Priority: &priority},
140-
item,
141-
)
142-
},
135+
PriorityQueue: q.(priorityqueue.PriorityQueue[request]),
136+
}
137+
if e.IsInInitialList {
138+
wq.priority = ptr.To(LowPriority)
143139
}
144140
h.CreateFunc(ctx, e, wq)
145141
}
@@ -160,20 +156,13 @@ func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUp
160156
return
161157
}
162158

163-
wq := workqueueWithCustomAddFunc[request]{
164-
TypedRateLimitingInterface: q,
159+
wq := workqueueWithDefaultPriority[request]{
165160
// We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
166161
// client.Object and that they are not nil
167-
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
168-
var priority int
169-
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
170-
priority = LowPriority
171-
}
172-
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
173-
priorityqueue.AddOpts{Priority: &priority},
174-
item,
175-
)
176-
},
162+
PriorityQueue: q.(priorityqueue.PriorityQueue[request]),
163+
}
164+
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
165+
wq.priority = ptr.To(LowPriority)
177166
}
178167
h.UpdateFunc(ctx, e, wq)
179168
}
@@ -201,13 +190,28 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
201190
}
202191
}
203192

204-
type workqueueWithCustomAddFunc[request comparable] struct {
205-
workqueue.TypedRateLimitingInterface[request]
206-
addFunc func(item request, q workqueue.TypedRateLimitingInterface[request])
193+
type workqueueWithDefaultPriority[request comparable] struct {
194+
priorityqueue.PriorityQueue[request]
195+
priority *int
196+
}
197+
198+
func (w workqueueWithDefaultPriority[request]) Add(item request) {
199+
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: w.priority}, item)
207200
}
208201

209-
func (w workqueueWithCustomAddFunc[request]) Add(item request) {
210-
w.addFunc(item, w.TypedRateLimitingInterface)
202+
func (w workqueueWithDefaultPriority[request]) AddAfter(item request, after time.Duration) {
203+
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: w.priority, After: after}, item)
204+
}
205+
206+
func (w workqueueWithDefaultPriority[request]) AddRateLimited(item request) {
207+
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: w.priority, RateLimited: true}, item)
208+
}
209+
210+
func (w workqueueWithDefaultPriority[request]) AddWithOpts(o priorityqueue.AddOpts, items ...request) {
211+
if o.Priority == nil {
212+
o.Priority = w.priority
213+
}
214+
w.PriorityQueue.AddWithOpts(o, items...)
211215
}
212216

213217
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
@@ -219,11 +223,11 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate
219223
return
220224
}
221225

222-
var priority int
226+
var priority *int
223227
if evt.IsInInitialList {
224-
priority = LowPriority
228+
priority = ptr.To(LowPriority)
225229
}
226-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item)
230+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
227231
}
228232

229233
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
@@ -235,9 +239,9 @@ func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRate
235239
return
236240
}
237241

238-
var priority int
242+
var priority *int
239243
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
240-
priority = LowPriority
244+
priority = ptr.To(LowPriority)
241245
}
242-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item)
246+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
243247
}

pkg/handler/eventhandler_test.go

Lines changed: 130 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package handler_test
1818

1919
import (
2020
"context"
21+
"time"
2122

2223
. "github.com/onsi/ginkgo/v2"
2324
. "github.com/onsi/gomega"
@@ -776,8 +777,11 @@ var _ = Describe("Eventhandler", func() {
776777

777778
Describe("WithLowPriorityWhenUnchanged", func() {
778779
handlerPriorityTests := []struct {
779-
name string
780-
handler func() handler.EventHandler
780+
name string
781+
handler func() handler.EventHandler
782+
after time.Duration
783+
ratelimited bool
784+
overridePriority int
781785
}{
782786
{
783787
name: "WithLowPriorityWhenUnchanged wrapper",
@@ -837,6 +841,103 @@ var _ = Describe("Eventhandler", func() {
837841
})
838842
},
839843
},
844+
{
845+
name: "WithLowPriorityWhenUnchanged - Add",
846+
handler: func() handler.EventHandler {
847+
return handler.WithLowPriorityWhenUnchanged(
848+
handler.TypedFuncs[client.Object, reconcile.Request]{
849+
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
850+
wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{
851+
Namespace: tce.Object.GetNamespace(),
852+
Name: tce.Object.GetName(),
853+
}})
854+
},
855+
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
856+
wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{
857+
Namespace: tue.ObjectNew.GetNamespace(),
858+
Name: tue.ObjectNew.GetName(),
859+
}})
860+
},
861+
})
862+
},
863+
},
864+
{
865+
name: "WithLowPriorityWhenUnchanged - AddAfter",
866+
handler: func() handler.EventHandler {
867+
return handler.WithLowPriorityWhenUnchanged(
868+
handler.TypedFuncs[client.Object, reconcile.Request]{
869+
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
870+
wq.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{
871+
Namespace: tce.Object.GetNamespace(),
872+
Name: tce.Object.GetName(),
873+
}}, time.Second)
874+
},
875+
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
876+
wq.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{
877+
Namespace: tue.ObjectNew.GetNamespace(),
878+
Name: tue.ObjectNew.GetName(),
879+
}}, time.Second)
880+
},
881+
})
882+
},
883+
after: time.Second,
884+
},
885+
{
886+
name: "WithLowPriorityWhenUnchanged - AddRateLimited",
887+
handler: func() handler.EventHandler {
888+
return handler.WithLowPriorityWhenUnchanged(
889+
handler.TypedFuncs[client.Object, reconcile.Request]{
890+
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
891+
wq.AddRateLimited(reconcile.Request{NamespacedName: types.NamespacedName{
892+
Namespace: tce.Object.GetNamespace(),
893+
Name: tce.Object.GetName(),
894+
}})
895+
},
896+
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
897+
wq.AddRateLimited(reconcile.Request{NamespacedName: types.NamespacedName{
898+
Namespace: tue.ObjectNew.GetNamespace(),
899+
Name: tue.ObjectNew.GetName(),
900+
}})
901+
},
902+
})
903+
},
904+
ratelimited: true,
905+
},
906+
{
907+
name: "WithLowPriorityWhenUnchanged - AddWithOpts priority is retained",
908+
handler: func() handler.EventHandler {
909+
return handler.WithLowPriorityWhenUnchanged(
910+
handler.TypedFuncs[client.Object, reconcile.Request]{
911+
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
912+
if pq, isPQ := wq.(priorityqueue.PriorityQueue[reconcile.Request]); isPQ {
913+
pq.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(100)}, reconcile.Request{NamespacedName: types.NamespacedName{
914+
Namespace: tce.Object.GetNamespace(),
915+
Name: tce.Object.GetName(),
916+
}})
917+
return
918+
}
919+
wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{
920+
Namespace: tce.Object.GetNamespace(),
921+
Name: tce.Object.GetName(),
922+
}})
923+
},
924+
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
925+
if pq, isPQ := wq.(priorityqueue.PriorityQueue[reconcile.Request]); isPQ {
926+
pq.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(100)}, reconcile.Request{NamespacedName: types.NamespacedName{
927+
Namespace: tue.ObjectNew.GetNamespace(),
928+
Name: tue.ObjectNew.GetName(),
929+
}})
930+
return
931+
}
932+
wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{
933+
Namespace: tue.ObjectNew.GetNamespace(),
934+
Name: tue.ObjectNew.GetName(),
935+
}})
936+
},
937+
})
938+
},
939+
overridePriority: 100,
940+
},
840941
}
841942
for _, test := range handlerPriorityTests {
842943
When("handler is "+test.name, func() {
@@ -862,7 +963,16 @@ var _ = Describe("Eventhandler", func() {
862963
IsInInitialList: true,
863964
}, wq)
864965

865-
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)}))
966+
expected := handler.LowPriority
967+
if test.overridePriority != 0 {
968+
expected = test.overridePriority
969+
}
970+
971+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{
972+
Priority: ptr.To(expected),
973+
After: test.after,
974+
RateLimited: test.ratelimited,
975+
}))
866976
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
867977
})
868978

@@ -888,10 +998,12 @@ var _ = Describe("Eventhandler", func() {
888998
IsInInitialList: false,
889999
}, wq)
8901000

891-
Expect(actualOpts).To(Or(
892-
Equal(priorityqueue.AddOpts{}),
893-
Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}),
894-
))
1001+
var expectedPriority *int
1002+
if test.overridePriority != 0 {
1003+
expectedPriority = &test.overridePriority
1004+
}
1005+
1006+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: expectedPriority}))
8951007
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
8961008
})
8971009

@@ -922,7 +1034,12 @@ var _ = Describe("Eventhandler", func() {
9221034
}},
9231035
}, wq)
9241036

925-
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)}))
1037+
expectedPriority := handler.LowPriority
1038+
if test.overridePriority != 0 {
1039+
expectedPriority = test.overridePriority
1040+
}
1041+
1042+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(expectedPriority)}))
9261043
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
9271044
})
9281045

@@ -954,10 +1071,11 @@ var _ = Describe("Eventhandler", func() {
9541071
}},
9551072
}, wq)
9561073

957-
Expect(actualOpts).To(Or(
958-
Equal(priorityqueue.AddOpts{}),
959-
Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}),
960-
))
1074+
var expectedPriority *int
1075+
if test.overridePriority != 0 {
1076+
expectedPriority = &test.overridePriority
1077+
}
1078+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: expectedPriority}))
9611079
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
9621080
})
9631081

0 commit comments

Comments
 (0)