Skip to content

Commit cc01508

Browse files
committed
Introduce hook points for the PriorityQueue
This change introduces an interface defining hook points for the PriorityQueue. This helps enabling advanced use cases where reacting to internal events of the priority queue may be necessary, like the first hook point being introduced: OnBecameReady. OnBecameReady is called when items become ready for consumption, i.e. can be handed to GetWithPriority calls. Signed-off-by: Matheus Pimenta <[email protected]>
1 parent f48fe1c commit cc01508

File tree

5 files changed

+315
-1
lines changed

5 files changed

+315
-1
lines changed

pkg/controller/controller.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ type TypedOptions[request comparable] struct {
8181
// Only use a custom NewQueue if you know what you are doing.
8282
NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]
8383

84+
// PriorityQueueOptions are options for the priority queue.
85+
// Only used if UsePriorityQueue is true and NewQueue is not set.
86+
//
87+
// NOTE: LOW LEVEL PRIMITIVE!
88+
// Only pass custom options to the priority queue if you know what you are doing.
89+
PriorityQueueOptions []priorityqueue.Opt[request]
90+
8491
// Logger will be used to build a default LogConstructor if unset.
8592
Logger logr.Logger
8693

@@ -260,10 +267,13 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
260267
if options.NewQueue == nil {
261268
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
262269
if ptr.Deref(options.UsePriorityQueue, true) {
263-
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
270+
var opts []priorityqueue.Opt[request]
271+
opts = append(opts, func(o *priorityqueue.Opts[request]) {
264272
o.Log = options.Logger.WithValues("controller", controllerName)
265273
o.RateLimiter = rateLimiter
266274
})
275+
opts = append(opts, options.PriorityQueueOptions...)
276+
return priorityqueue.New(controllerName, opts...)
267277
}
268278
return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
269279
Name: controllerName,

pkg/controller/controller_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
. "github.com/onsi/gomega"
2525
"go.uber.org/goleak"
2626
corev1 "k8s.io/api/core/v1"
27+
"k8s.io/apimachinery/pkg/types"
2728
"k8s.io/client-go/util/workqueue"
2829
"k8s.io/utils/ptr"
2930

@@ -38,6 +39,22 @@ import (
3839
"sigs.k8s.io/controller-runtime/pkg/source"
3940
)
4041

42+
type mockHooks struct {
43+
onBecameReadyCalls []onBecameReadyCall
44+
}
45+
46+
type onBecameReadyCall struct {
47+
item reconcile.Request
48+
priority int
49+
}
50+
51+
func (m *mockHooks) OnBecameReady(item reconcile.Request, priority int) {
52+
m.onBecameReadyCalls = append(m.onBecameReadyCalls, onBecameReadyCall{
53+
item: item,
54+
priority: priority,
55+
})
56+
}
57+
4158
var _ = Describe("controller.Controller", func() {
4259
rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
4360
return reconcile.Result{}, nil
@@ -476,6 +493,47 @@ var _ = Describe("controller.Controller", func() {
476493
Expect(ok).To(BeFalse())
477494
})
478495

496+
It("should use the PriorityQueueOptions if specified and NewQueue is not specified", func() {
497+
m, err := manager.New(cfg, manager.Options{})
498+
Expect(err).NotTo(HaveOccurred())
499+
500+
customRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Millisecond, 1000*time.Second)
501+
502+
hooks := &mockHooks{}
503+
504+
c, err := controller.New("new-controller-18", m, controller.Options{
505+
Reconciler: reconcile.Func(nil),
506+
RateLimiter: customRateLimiter,
507+
PriorityQueueOptions: []priorityqueue.Opt[reconcile.Request]{
508+
func(o *priorityqueue.Opts[reconcile.Request]) {
509+
o.Hooks = hooks
510+
},
511+
},
512+
})
513+
Expect(err).NotTo(HaveOccurred())
514+
515+
ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request])
516+
Expect(ok).To(BeTrue())
517+
518+
Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter))
519+
q := ctrl.NewQueue("controller-pq-hooks", nil)
520+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
521+
Name: "foo",
522+
Namespace: "bar",
523+
}})
524+
item, shutdown := q.Get()
525+
Expect(shutdown).To(BeFalse())
526+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{
527+
Name: "foo",
528+
Namespace: "bar",
529+
}}))
530+
Expect(hooks.onBecameReadyCalls).To(HaveLen(1))
531+
Expect(hooks.onBecameReadyCalls[0].item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{
532+
Name: "foo",
533+
Namespace: "bar",
534+
}}))
535+
})
536+
479537
It("should set EnableWarmup correctly", func() {
480538
m, err := manager.New(cfg, manager.Options{})
481539
Expect(err).NotTo(HaveOccurred())
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package priorityqueue
2+
3+
// Hooks represents a set of hooks that can be implemented to
4+
// customize the behavior of the priority queue for elements
5+
// of type T.
6+
//
7+
// NOTE: LOW LEVEL PRIMITIVE!
8+
// Implementations must be goroutine-safe and considerate
9+
// of the time spent in each hook, as they may be called
10+
// in performance-sensitive paths. It's recommended to
11+
// use non-blocking operations or offload heavy processing
12+
// to separate goroutines through the use of channels or
13+
// context-aware mechanisms.
14+
type Hooks[T comparable] interface {
15+
// OnBecameReady is called when an item becomes ready to be processed.
16+
// For AddWithOpts() calls that result in the item being added with
17+
// a delay, this hook is called only when the item becomes ready
18+
// after the delay has elapsed.
19+
OnBecameReady(item T, priority int)
20+
}
21+
22+
// hooks is a wrapper around Hooks to allow optional implementation.
23+
type hooks[T comparable] struct {
24+
Hooks[T]
25+
}
26+
27+
func (h hooks[T]) OnBecameReady(item T, priority int) {
28+
if h.Hooks != nil {
29+
h.Hooks.OnBecameReady(item, priority)
30+
}
31+
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package priorityqueue
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
"k8s.io/utils/ptr"
10+
)
11+
12+
type mockHooks struct {
13+
mu sync.Mutex
14+
onBecameReadyCalls []onBecameReadyCall
15+
}
16+
17+
type onBecameReadyCall struct {
18+
item int
19+
priority int
20+
}
21+
22+
func (m *mockHooks) OnBecameReady(item int, priority int) {
23+
m.mu.Lock()
24+
defer m.mu.Unlock()
25+
m.onBecameReadyCalls = append(m.onBecameReadyCalls, onBecameReadyCall{
26+
item: item,
27+
priority: priority,
28+
})
29+
}
30+
31+
func (m *mockHooks) getCalls() []onBecameReadyCall {
32+
m.mu.Lock()
33+
defer m.mu.Unlock()
34+
result := make([]onBecameReadyCall, len(m.onBecameReadyCalls))
35+
copy(result, m.onBecameReadyCalls)
36+
return result
37+
}
38+
39+
var _ = Describe("Hooks", func() {
40+
It("works with nil hooks", func() {
41+
q := New[int]("test")
42+
defer q.ShutDown()
43+
44+
q.Add(10)
45+
item, shutdown := q.Get()
46+
Expect(shutdown).To(BeFalse())
47+
Expect(item).To(Equal(10))
48+
})
49+
50+
It("calls OnBecameReady when item is added without delay", func() {
51+
hooks := &mockHooks{}
52+
q := New("test", func(o *Opts[int]) {
53+
o.Hooks = hooks
54+
})
55+
defer q.ShutDown()
56+
57+
q.AddWithOpts(AddOpts{Priority: ptr.To(5)}, 10)
58+
59+
item, priority, shutdown := q.GetWithPriority()
60+
Expect(shutdown).To(BeFalse())
61+
Expect(item).To(Equal(10))
62+
Expect(priority).To(Equal(5))
63+
64+
calls := hooks.getCalls()
65+
Expect(calls).To(HaveLen(1))
66+
Expect(calls[0]).To(Equal(onBecameReadyCall{item: 10, priority: 5}))
67+
})
68+
69+
It("calls OnBecameReady only once for duplicate items", func() {
70+
hooks := &mockHooks{}
71+
q := New("test", func(o *Opts[int]) {
72+
o.Hooks = hooks
73+
})
74+
defer q.ShutDown()
75+
76+
q.Add(10)
77+
q.Add(10)
78+
q.Add(10)
79+
80+
item, shutdown := q.Get()
81+
Expect(shutdown).To(BeFalse())
82+
Expect(item).To(Equal(10))
83+
84+
calls := hooks.getCalls()
85+
Expect(calls).To(HaveLen(1))
86+
Expect(calls[0].item).To(Equal(10))
87+
})
88+
89+
It("calls OnBecameReady when priority is increased for existing item", func() {
90+
hooks := &mockHooks{}
91+
q := New("test", func(o *Opts[int]) {
92+
o.Hooks = hooks
93+
})
94+
defer q.ShutDown()
95+
96+
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, 10)
97+
q.AddWithOpts(AddOpts{Priority: ptr.To(5)}, 10)
98+
99+
item, priority, shutdown := q.GetWithPriority()
100+
Expect(shutdown).To(BeFalse())
101+
Expect(item).To(Equal(10))
102+
Expect(priority).To(Equal(5))
103+
104+
calls := hooks.getCalls()
105+
Expect(calls).To(HaveLen(1))
106+
Expect(calls[0]).To(Equal(onBecameReadyCall{item: 10, priority: 1}))
107+
})
108+
109+
It("does not call OnBecameReady when item is added with delay", func() {
110+
hooks := &mockHooks{}
111+
q := New("test", func(o *Opts[int]) {
112+
o.Hooks = hooks
113+
})
114+
defer q.ShutDown()
115+
116+
q.AddWithOpts(AddOpts{After: time.Hour}, 10)
117+
118+
Consistently(func() []onBecameReadyCall {
119+
return hooks.getCalls()
120+
}, "100ms").Should(BeEmpty())
121+
})
122+
123+
It("calls OnBecameReady when delayed item becomes ready", func() {
124+
hooks := &mockHooks{}
125+
q := New("test", func(o *Opts[int]) {
126+
o.Hooks = hooks
127+
})
128+
defer q.ShutDown()
129+
130+
pq := q.(*priorityqueue[int])
131+
now := time.Now().Round(time.Second)
132+
nowLock := sync.Mutex{}
133+
tick := make(chan time.Time)
134+
135+
pq.now = func() time.Time {
136+
nowLock.Lock()
137+
defer nowLock.Unlock()
138+
return now
139+
}
140+
pq.tick = func(d time.Duration) <-chan time.Time {
141+
return tick
142+
}
143+
144+
q.AddWithOpts(AddOpts{After: time.Second, Priority: ptr.To(3)}, 10)
145+
146+
Consistently(func() []onBecameReadyCall {
147+
return hooks.getCalls()
148+
}, "100ms").Should(BeEmpty())
149+
150+
// Forward time
151+
nowLock.Lock()
152+
now = now.Add(time.Second)
153+
nowLock.Unlock()
154+
tick <- now
155+
156+
Eventually(func() []onBecameReadyCall {
157+
return hooks.getCalls()
158+
}).Should(HaveLen(1))
159+
160+
calls := hooks.getCalls()
161+
Expect(calls[0]).To(Equal(onBecameReadyCall{item: 10, priority: 3}))
162+
})
163+
164+
It("calls OnBecameReady when delayed item is re-added without delay", func() {
165+
hooks := &mockHooks{}
166+
q := New("test", func(o *Opts[int]) {
167+
o.Hooks = hooks
168+
})
169+
defer q.ShutDown()
170+
171+
q.AddWithOpts(AddOpts{After: time.Hour}, 10)
172+
Expect(hooks.getCalls()).To(BeEmpty())
173+
174+
// Re-add without delay
175+
q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, 10)
176+
177+
Eventually(func() []onBecameReadyCall {
178+
return hooks.getCalls()
179+
}).Should(HaveLen(1))
180+
181+
calls := hooks.getCalls()
182+
Expect(calls[0]).To(Equal(onBecameReadyCall{item: 10, priority: 2}))
183+
})
184+
185+
It("calls OnBecameReady for each unique item", func() {
186+
hooks := &mockHooks{}
187+
q := New("test", func(o *Opts[int]) {
188+
o.Hooks = hooks
189+
})
190+
defer q.ShutDown()
191+
192+
q.Add(10)
193+
q.Add(20)
194+
q.Add(30)
195+
196+
Eventually(func() int {
197+
return len(hooks.getCalls())
198+
}).Should(Equal(3))
199+
200+
calls := hooks.getCalls()
201+
items := make([]int, len(calls))
202+
for i, call := range calls {
203+
items[i] = call.item
204+
}
205+
Expect(items).To(ConsistOf(10, 20, 30))
206+
})
207+
})

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Opts[T comparable] struct {
4343
RateLimiter workqueue.TypedRateLimiter[T]
4444
MetricProvider workqueue.MetricsProvider
4545
Log logr.Logger
46+
Hooks Hooks[T]
4647
}
4748

4849
// Opt allows to configure a PriorityQueue.
@@ -80,6 +81,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
8081
get: make(chan item[T]),
8182
now: time.Now,
8283
tick: time.Tick,
84+
hooks: hooks[T]{opts.Hooks},
8385
}
8486

8587
go pq.spin()
@@ -130,6 +132,9 @@ type priorityqueue[T comparable] struct {
130132
// Configurable for testing
131133
now func() time.Time
132134
tick func(time.Duration) <-chan time.Time
135+
136+
// Hooks to customize behavior
137+
hooks hooks[T]
133138
}
134139

135140
func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
@@ -165,6 +170,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
165170
w.queue.ReplaceOrInsert(item)
166171
if item.ReadyAt == nil {
167172
w.metrics.add(key, item.Priority)
173+
w.hooks.OnBecameReady(key, item.Priority)
168174
}
169175
w.addedCounter++
170176
continue
@@ -184,6 +190,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
184190
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
185191
if readyAt == nil && !w.becameReady.Has(key) {
186192
w.metrics.add(key, item.Priority)
193+
w.hooks.OnBecameReady(key, item.Priority)
187194
}
188195
item.ReadyAt = readyAt
189196
}
@@ -266,6 +273,7 @@ func (w *priorityqueue[T]) spin() {
266273
if !w.becameReady.Has(item.Key) {
267274
w.metrics.add(item.Key, item.Priority)
268275
w.becameReady.Insert(item.Key)
276+
w.hooks.OnBecameReady(item.Key, item.Priority)
269277
}
270278
}
271279

0 commit comments

Comments
 (0)