Skip to content

Commit 3d06304

Browse files
committed
Fixed the priority queue implementation
Signed-off-by: michaelawyu <[email protected]>
1 parent 83bc664 commit 3d06304

File tree

6 files changed

+917
-176
lines changed

6 files changed

+917
-176
lines changed

cmd/memberagent/main.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
373373
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
374374
return err
375375
}
376-
// create the work controller, so we can pass it to the internal member cluster reconciler
376+
// Set up the work applier. Note that it is referenced by the InternalMemberCluster controller.
377377

378378
// Set up the requeue rate limiter for the work applier.
379379
//
@@ -413,7 +413,8 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
413413
*workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs,
414414
)
415415

416-
workController := workapplier.NewReconciler(
416+
workObjAgeForPrioritizedProcessing := time.Minute * time.Duration(*watchWorkReconcileAgeMinutes)
417+
workApplier := workapplier.NewReconciler(
417418
hubMgr.GetClient(),
418419
targetNS,
419420
spokeDynamicClient,
@@ -426,12 +427,12 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
426427
// Use the default worker count (4) for parallelized manifest processing.
427428
parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
428429
time.Minute*time.Duration(*deletionWaitTime),
429-
*watchWorkWithPriorityQueue,
430-
*watchWorkReconcileAgeMinutes,
431430
requeueRateLimiter,
431+
*watchWorkWithPriorityQueue,
432+
workObjAgeForPrioritizedProcessing,
432433
)
433434

434-
if err = workController.SetupWithManager(hubMgr); err != nil {
435+
if err = workApplier.SetupWithManager(hubMgr); err != nil {
435436
klog.ErrorS(err, "Failed to create v1beta1 controller", "controller", "work")
436437
return err
437438
}
@@ -459,7 +460,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
459460
ctx,
460461
hubMgr.GetClient(),
461462
memberMgr.GetConfig(), memberMgr.GetClient(),
462-
workController,
463+
workApplier,
463464
pp)
464465
if err != nil {
465466
klog.ErrorS(err, "Failed to create InternalMemberCluster v1beta1 reconciler")

pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {
379379

380380
// This controller is created for testing purposes only; no reconciliation loop is actually
381381
// run.
382-
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
382+
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0)
383383

384384
propertyProvider1 = &manuallyUpdatedProvider{}
385385
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
@@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {
402402

403403
// This controller is created for testing purposes only; no reconciliation loop is actually
404404
// run.
405-
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
405+
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0)
406406

407407
member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
408408
Expect(err).NotTo(HaveOccurred())

pkg/controllers/workapplier/controller.go

Lines changed: 67 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ package workapplier
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"time"
2324

2425
"go.uber.org/atomic"
25-
"k8s.io/apimachinery/pkg/api/equality"
2626
apierrors "k8s.io/apimachinery/pkg/api/errors"
2727
"k8s.io/apimachinery/pkg/api/meta"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -41,149 +41,30 @@ import (
4141
ctrloption "sigs.k8s.io/controller-runtime/pkg/controller"
4242
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
4343
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
44-
"sigs.k8s.io/controller-runtime/pkg/event"
4544
"sigs.k8s.io/controller-runtime/pkg/predicate"
4645
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4746

4847
fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
49-
"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
5048
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
5149
"github.com/kubefleet-dev/kubefleet/pkg/utils/defaulter"
5250
parallelizerutil "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
5351
)
5452

5553
const (
5654
patchDetailPerObjLimit = 100
55+
56+
minWorkObjAgeForPrioritizedQueueing = time.Minute * 30
5757
)
5858

5959
const (
6060
workFieldManagerName = "work-api-agent"
6161
)
6262

63-
var (
64-
workAgeToReconcile = 1 * time.Hour
65-
)
66-
67-
// Custom type to hold a reconcile.Request and a priority value
68-
type priorityQueueItem struct {
69-
reconcile.Request
70-
Priority int
71-
}
72-
73-
// PriorityQueueEventHandler is a custom event handler for adding objects to the priority queue.
74-
type PriorityQueueEventHandler struct {
75-
Queue priorityqueue.PriorityQueue[priorityQueueItem] // The priority queue to manage events
76-
Client client.Client // store the client to make API calls
77-
}
78-
79-
// Implement priorityqueue.Item interface for priorityQueueItem
80-
func (i priorityQueueItem) GetPriority() int {
81-
return i.Priority
82-
}
83-
84-
func (h *PriorityQueueEventHandler) WorkPendingApply(ctx context.Context, obj client.Object) bool {
85-
var work fleetv1beta1.Work
86-
ns := obj.GetNamespace()
87-
name := obj.GetName()
88-
err := h.Client.Get(ctx, client.ObjectKey{
89-
Namespace: ns,
90-
Name: name,
91-
}, &work)
92-
if err != nil {
93-
// Log and return
94-
klog.ErrorS(err, "Failed to get the work", "name", name, "ns", ns)
95-
return true
96-
}
97-
availCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
98-
appliedCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
99-
100-
if availCond != nil && appliedCond != nil {
101-
// check if the object has been recently modified
102-
availCondLastUpdatedTime := availCond.LastTransitionTime.Time
103-
appliedCondLastUpdatedTime := appliedCond.LastTransitionTime.Time
104-
if time.Since(availCondLastUpdatedTime) < workAgeToReconcile || time.Since(appliedCondLastUpdatedTime) < workAgeToReconcile {
105-
return true
106-
}
107-
}
108-
109-
if condition.IsConditionStatusTrue(availCond, work.GetGeneration()) &&
110-
condition.IsConditionStatusTrue(appliedCond, work.GetGeneration()) {
111-
return false
112-
}
113-
114-
// Work not yet applied
115-
return true
116-
}
117-
118-
func (h *PriorityQueueEventHandler) AddToPriorityQueue(ctx context.Context, obj client.Object, alwaysAdd bool) {
119-
req := reconcile.Request{
120-
NamespacedName: types.NamespacedName{
121-
Namespace: obj.GetNamespace(),
122-
Name: obj.GetName(),
123-
},
124-
}
125-
126-
objAge := time.Since(obj.GetCreationTimestamp().Time)
127-
128-
var objPriority int
129-
if alwaysAdd || objAge < workAgeToReconcile || h.WorkPendingApply(ctx, obj) {
130-
// Newer or pending objects get higher priority
131-
// Negate the Unix timestamp to give higher priority to newer timestamps
132-
objPriority = -int(time.Now().Unix())
133-
} else {
134-
// skip adding older objects with no changes
135-
klog.V(2).InfoS("adding old item to priorityQueueItem", "obj", req.Name, "age", objAge)
136-
objPriority = int(obj.GetCreationTimestamp().Unix())
137-
}
138-
139-
// Create the custom priorityQueueItem with the request and priority
140-
item := priorityQueueItem{
141-
Request: req,
142-
Priority: objPriority,
143-
}
144-
145-
h.Queue.Add(item)
146-
klog.V(2).InfoS("Created PriorityQueueItem", "priority", objPriority, "obj", req.Name, "queue size", h.Queue.Len())
147-
}
148-
149-
func (h *PriorityQueueEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
150-
h.AddToPriorityQueue(ctx, evt.Object, false)
151-
}
152-
153-
func (h *PriorityQueueEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
154-
h.AddToPriorityQueue(ctx, evt.Object, true)
155-
}
156-
157-
func (h *PriorityQueueEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
158-
// Ignore updates where only the status changed
159-
oldObj := evt.ObjectOld.DeepCopyObject()
160-
newObj := evt.ObjectNew.DeepCopyObject()
161-
162-
// Zero out the status
163-
if oldWork, ok := oldObj.(*fleetv1beta1.Work); ok {
164-
oldWork.Status = fleetv1beta1.WorkStatus{}
165-
}
166-
if newWork, ok := newObj.(*fleetv1beta1.Work); ok {
167-
newWork.Status = fleetv1beta1.WorkStatus{}
168-
}
169-
170-
if !equality.Semantic.DeepEqual(oldObj, newObj) {
171-
// ignore status changes to prevent noise
172-
h.AddToPriorityQueue(ctx, evt.ObjectNew, true)
173-
return
174-
}
175-
klog.V(4).InfoS("ignoring update event with only status change", "work", evt.ObjectNew.GetName())
176-
}
177-
178-
func (h *PriorityQueueEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
179-
h.AddToPriorityQueue(ctx, evt.Object, false)
180-
}
181-
18263
var defaultRequeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter = NewRequeueMultiStageWithExponentialBackoffRateLimiter(
18364
// Allow 1 attempt of fixed delay; this helps give objects a bit of headroom to get available (or have
18465
// diffs reported).
18566
1,
186-
// Use a fixed delay of 5 seconds for the first two attempts.
67+
// Use a fixed delay of 5 seconds for the first attempt.
18768
//
18869
// Important (chenyu1): before the introduction of the requeue rate limiter, the work
18970
// applier uses static requeue intervals, specifically 5 seconds (if the work object is unavailable),
@@ -216,19 +97,24 @@ var defaultRequeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimite
21697

21798
// Reconciler reconciles a Work object.
21899
type Reconciler struct {
219-
hubClient client.Client
220-
workNameSpace string
221-
spokeDynamicClient dynamic.Interface
222-
spokeClient client.Client
223-
restMapper meta.RESTMapper
224-
recorder record.EventRecorder
225-
concurrentReconciles int
226-
watchWorkWithPriorityQueue bool
227-
watchWorkReconcileAgeMinutes int
228-
deletionWaitTime time.Duration
229-
joined *atomic.Bool
230-
parallelizer parallelizerutil.Parallelizer
231-
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter
100+
hubClient client.Client
101+
workNameSpace string
102+
spokeDynamicClient dynamic.Interface
103+
spokeClient client.Client
104+
restMapper meta.RESTMapper
105+
recorder record.EventRecorder
106+
concurrentReconciles int
107+
deletionWaitTime time.Duration
108+
joined *atomic.Bool
109+
parallelizer parallelizerutil.Parallelizer
110+
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter
111+
usePriorityQueue bool
112+
workObjAgeForPrioritizedProcessing time.Duration
113+
// The custom priority queue in use if the option watchWorkWithPriorityQueue is enabled.
114+
//
115+
// Note that this variable is set only after the controller starts.
116+
pq priorityqueue.PriorityQueue[reconcile.Request]
117+
pqSetupOnce sync.Once
232118
}
233119

234120
// NewReconciler returns a new Work object reconciler for the work applier.
@@ -239,9 +125,9 @@ func NewReconciler(
239125
concurrentReconciles int,
240126
parallelizer parallelizerutil.Parallelizer,
241127
deletionWaitTime time.Duration,
242-
watchWorkWithPriorityQueue bool,
243-
watchWorkReconcileAgeMinutes int,
244128
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter,
129+
usePriorityQueue bool,
130+
workObjAgeForPrioritizedProcessing time.Duration,
245131
) *Reconciler {
246132
if requeueRateLimiter == nil {
247133
klog.V(2).InfoS("requeue rate limiter is not set; using the default rate limiter")
@@ -252,23 +138,37 @@ func NewReconciler(
252138
parallelizer = parallelizerutil.NewParallelizer(1)
253139
}
254140

141+
woAgeForPrioritizedProcessing := workObjAgeForPrioritizedProcessing
142+
if usePriorityQueue && woAgeForPrioritizedProcessing < minWorkObjAgeForPrioritizedQueueing {
143+
klog.V(2).InfoS("Work object age for prioritized processing is too short; set to the longer default", "workObjAgeForPrioritizedProcessing", woAgeForPrioritizedProcessing)
144+
woAgeForPrioritizedProcessing = minWorkObjAgeForPrioritizedQueueing
145+
}
146+
255147
return &Reconciler{
256-
hubClient: hubClient,
257-
spokeDynamicClient: spokeDynamicClient,
258-
spokeClient: spokeClient,
259-
restMapper: restMapper,
260-
recorder: recorder,
261-
concurrentReconciles: concurrentReconciles,
262-
parallelizer: parallelizer,
263-
watchWorkWithPriorityQueue: watchWorkWithPriorityQueue,
264-
watchWorkReconcileAgeMinutes: watchWorkReconcileAgeMinutes,
265-
workNameSpace: workNameSpace,
266-
joined: atomic.NewBool(false),
267-
deletionWaitTime: deletionWaitTime,
268-
requeueRateLimiter: requeueRateLimiter,
148+
hubClient: hubClient,
149+
spokeDynamicClient: spokeDynamicClient,
150+
spokeClient: spokeClient,
151+
restMapper: restMapper,
152+
recorder: recorder,
153+
concurrentReconciles: concurrentReconciles,
154+
parallelizer: parallelizer,
155+
workNameSpace: workNameSpace,
156+
joined: atomic.NewBool(false),
157+
deletionWaitTime: deletionWaitTime,
158+
requeueRateLimiter: requeueRateLimiter,
159+
usePriorityQueue: usePriorityQueue,
160+
workObjAgeForPrioritizedProcessing: woAgeForPrioritizedProcessing,
269161
}
270162
}
271163

164+
// PriorityQueue returns the priority queue (if any) in use by the reconciler.
165+
//
166+
// Note that the priority queue is only set after the reconciler starts (i.e., the work applier
167+
// has been set up with the controller manager).
168+
func (r *Reconciler) PriorityQueue() priorityqueue.PriorityQueue[reconcile.Request] {
169+
return r.pq
170+
}
171+
272172
type ManifestProcessingApplyOrReportDiffResultType string
273173

274174
const (
@@ -728,22 +628,29 @@ func (r *Reconciler) Leave(ctx context.Context) error {
728628

729629
// SetupWithManager wires up the controller.
730630
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
731-
// Create the priority queue using the rate limiter and a queue name
732-
queue := priorityqueue.New[priorityQueueItem]("apply-work-queue")
631+
if r.usePriorityQueue {
632+
eventHandler := &priorityBasedWorkObjEventHandler{
633+
qm: r,
634+
workObjAgeForPrioritizedProcessing: r.workObjAgeForPrioritizedProcessing,
635+
}
733636

734-
// Create the event handler that uses the priority queue
735-
eventHandler := &PriorityQueueEventHandler{
736-
Queue: queue, // Attach the priority queue to the event handler
737-
Client: r.hubClient,
738-
}
637+
newPQ := func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
638+
withRateLimiterOpt := func(opts *priorityqueue.Opts[reconcile.Request]) {
639+
opts.RateLimiter = rateLimiter
640+
}
641+
r.pqSetupOnce.Do(func() {
642+
r.pq = priorityqueue.New(controllerName, withRateLimiterOpt)
643+
})
644+
return r.pq
645+
}
739646

740-
if r.watchWorkWithPriorityQueue {
741-
workAgeToReconcile = time.Duration(r.watchWorkReconcileAgeMinutes) * time.Minute
742647
return ctrl.NewControllerManagedBy(mgr).Named("work-applier-controller").
743648
WithOptions(ctrloption.Options{
744649
MaxConcurrentReconciles: r.concurrentReconciles,
650+
NewQueue: newPQ,
745651
}).
746652
For(&fleetv1beta1.Work{}).
653+
// Use custom event handler to allow access to the priority queue interface.
747654
Watches(&fleetv1beta1.Work{}, eventHandler).
748655
Complete(r)
749656
}

0 commit comments

Comments
 (0)