Skip to content

Commit 542acae

Browse files
authored
Merge pull request #3289 from alvaroaleman/ptr
⚠️ Priorityqueue: Make priority opt a pointer
2 parents 28871a1 + 03c44f5 commit 542acae

File tree

7 files changed

+41
-29
lines changed

7 files changed

+41
-29
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ import (
1919
type AddOpts struct {
2020
After time.Duration
2121
RateLimited bool
22-
Priority int
22+
// Priority is the priority of the item. Higher values
23+
// indicate higher priority.
24+
// Defaults to zero if unset.
25+
Priority *int
2326
}
2427

2528
// PriorityQueue is a priority queue for a controller. It
@@ -154,7 +157,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
154157
item := &item[T]{
155158
Key: key,
156159
AddedCounter: w.addedCounter,
157-
Priority: o.Priority,
160+
Priority: ptr.Deref(o.Priority, 0),
158161
ReadyAt: readyAt,
159162
}
160163
w.items[key] = item
@@ -169,12 +172,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
169172
// The b-tree de-duplicates based on ordering and any change here
170173
// will affect the order - Just delete and re-add.
171174
item, _ := w.queue.Delete(w.items[key])
172-
if o.Priority > item.Priority {
175+
if newPriority := ptr.Deref(o.Priority, 0); newPriority > item.Priority {
173176
// Update depth metric only if the item in the queue was already added to the depth metric.
174177
if item.ReadyAt == nil || w.becameReady.Has(key) {
175-
w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority)
178+
w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority)
176179
}
177-
item.Priority = o.Priority
180+
item.Priority = newPriority
178181
}
179182

180183
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
. "github.com/onsi/gomega"
1313
"k8s.io/apimachinery/pkg/util/sets"
1414
"k8s.io/client-go/util/workqueue"
15+
"k8s.io/utils/ptr"
1516
)
1617

1718
var _ = Describe("Controllerworkqueue", func() {
@@ -106,8 +107,8 @@ var _ = Describe("Controllerworkqueue", func() {
106107
q, metrics := newQueue()
107108
defer q.ShutDown()
108109

109-
q.AddWithOpts(AddOpts{Priority: 1}, "foo")
110-
q.AddWithOpts(AddOpts{Priority: 2}, "foo")
110+
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo")
111+
q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, "foo")
111112

112113
item, priority, _ := q.GetWithPriority()
113114
Expect(item).To(Equal("foo"))
@@ -126,7 +127,7 @@ var _ = Describe("Controllerworkqueue", func() {
126127
q.AddWithOpts(AddOpts{}, "foo")
127128
q.AddWithOpts(AddOpts{}, "bar")
128129
q.AddWithOpts(AddOpts{}, "baz")
129-
q.AddWithOpts(AddOpts{Priority: 1}, "baz")
130+
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "baz")
130131

131132
item, priority, _ := q.GetWithPriority()
132133
Expect(item).To(Equal("baz"))
@@ -381,7 +382,7 @@ var _ = Describe("Controllerworkqueue", func() {
381382
if rn < 10 {
382383
q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i))
383384
} else {
384-
q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i))
385+
q.AddWithOpts(AddOpts{Priority: &rn}, fmt.Sprintf("foo%d", i))
385386
}
386387
}
387388

@@ -623,8 +624,8 @@ func TestFuzzPriorityQueue(t *testing.T) {
623624
defer inQueueLock.Unlock()
624625

625626
q.AddWithOpts(opts, item)
626-
if existingPriority, exists := inQueue[item]; !exists || existingPriority < opts.Priority {
627-
inQueue[item] = opts.Priority
627+
if existingPriority, exists := inQueue[item]; !exists || existingPriority < ptr.Deref(opts.Priority, 0) {
628+
inQueue[item] = ptr.Deref(opts.Priority, 0)
628629
}
629630
}()
630631
}

pkg/handler/enqueue_mapped.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121

2222
"k8s.io/client-go/util/workqueue"
23+
"k8s.io/utils/ptr"
2324
"sigs.k8s.io/controller-runtime/pkg/client"
2425
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2526
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -141,7 +142,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
141142
if !ok {
142143
if lowPriority {
143144
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
144-
Priority: LowPriority,
145+
Priority: ptr.To(LowPriority),
145146
}, req)
146147
} else {
147148
q.Add(req)

pkg/handler/eventhandler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr
136136
priority = LowPriority
137137
}
138138
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
139-
priorityqueue.AddOpts{Priority: priority},
139+
priorityqueue.AddOpts{Priority: &priority},
140140
item,
141141
)
142142
},
@@ -170,7 +170,7 @@ func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUp
170170
priority = LowPriority
171171
}
172172
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
173-
priorityqueue.AddOpts{Priority: priority},
173+
priorityqueue.AddOpts{Priority: &priority},
174174
item,
175175
)
176176
},
@@ -223,7 +223,7 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate
223223
if evt.IsInInitialList {
224224
priority = LowPriority
225225
}
226-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
226+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item)
227227
}
228228

229229
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
@@ -239,5 +239,5 @@ func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRate
239239
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
240240
priority = LowPriority
241241
}
242-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
242+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &priority}, item)
243243
}

pkg/handler/eventhandler_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,7 @@ var _ = Describe("Eventhandler", func() {
862862
IsInInitialList: true,
863863
}, wq)
864864

865-
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
865+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)}))
866866
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
867867
})
868868

@@ -888,7 +888,10 @@ var _ = Describe("Eventhandler", func() {
888888
IsInInitialList: false,
889889
}, wq)
890890

891-
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
891+
Expect(actualOpts).To(Or(
892+
Equal(priorityqueue.AddOpts{}),
893+
Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}),
894+
))
892895
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
893896
})
894897

@@ -919,7 +922,7 @@ var _ = Describe("Eventhandler", func() {
919922
}},
920923
}, wq)
921924

922-
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
925+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)}))
923926
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
924927
})
925928

@@ -951,7 +954,10 @@ var _ = Describe("Eventhandler", func() {
951954
}},
952955
}, wq)
953956

954-
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
957+
Expect(actualOpts).To(Or(
958+
Equal(priorityqueue.AddOpts{}),
959+
Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}),
960+
))
955961
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
956962
})
957963

pkg/internal/controller/controller.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3131
"k8s.io/apimachinery/pkg/util/uuid"
3232
"k8s.io/client-go/util/workqueue"
33+
"k8s.io/utils/ptr"
3334

3435
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
3536
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -449,7 +450,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request,
449450
if errors.Is(err, reconcile.TerminalError(nil)) {
450451
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
451452
} else {
452-
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
453+
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req)
453454
}
454455
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
455456
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
@@ -464,11 +465,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request,
464465
// We need to drive to stable reconcile loops before queuing due
465466
// to result.RequestAfter
466467
c.Queue.Forget(req)
467-
c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req)
468+
c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: ptr.To(priority)}, req)
468469
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
469470
case result.Requeue: //nolint: staticcheck // We have to handle it until it is removed
470471
log.V(5).Info("Reconcile done, requeueing")
471-
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
472+
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req)
472473
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
473474
default:
474475
log.V(5).Info("Reconcile successful")

pkg/internal/controller/controller_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ var _ = Describe("controller", func() {
702702
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
703703
}()
704704

705-
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
705+
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)
706706

707707
By("Invoking Reconciler which will request a requeue")
708708
fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil)
@@ -714,7 +714,7 @@ var _ = Describe("controller", func() {
714714
}).Should(Equal([]priorityQueueAddition{{
715715
AddOpts: priorityqueue.AddOpts{
716716
RateLimited: true,
717-
Priority: 10,
717+
Priority: ptr.To(10),
718718
},
719719
items: []reconcile.Request{request},
720720
}}))
@@ -761,7 +761,7 @@ var _ = Describe("controller", func() {
761761
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
762762
}()
763763

764-
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
764+
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)
765765

766766
By("Invoking Reconciler which will ask for RequeueAfter")
767767
fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil)
@@ -773,7 +773,7 @@ var _ = Describe("controller", func() {
773773
}).Should(Equal([]priorityQueueAddition{{
774774
AddOpts: priorityqueue.AddOpts{
775775
After: time.Millisecond * 100,
776-
Priority: 10,
776+
Priority: ptr.To(10),
777777
},
778778
items: []reconcile.Request{request},
779779
}}))
@@ -819,7 +819,7 @@ var _ = Describe("controller", func() {
819819
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
820820
}()
821821

822-
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
822+
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)
823823

824824
By("Invoking Reconciler which will return an error")
825825
fakeReconcile.AddResult(reconcile.Result{}, errors.New("oups, I did it again"))
@@ -831,7 +831,7 @@ var _ = Describe("controller", func() {
831831
}).Should(Equal([]priorityQueueAddition{{
832832
AddOpts: priorityqueue.AddOpts{
833833
RateLimited: true,
834-
Priority: 10,
834+
Priority: ptr.To(10),
835835
},
836836
items: []reconcile.Request{request},
837837
}}))

0 commit comments

Comments
 (0)