Skip to content

Commit 17c07aa

Browse files
authored
Merge pull request #6965 from ctripcloud/feat-enable-pq
enable asyncPriorityWorker in all controllers
2 parents cc53d86 + 003554e commit 17c07aa

File tree

18 files changed

+265
-153
lines changed

18 files changed

+265
-153
lines changed

pkg/clusterdiscovery/clusterapi/clusterapi.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
secretutil "sigs.k8s.io/cluster-api/util/secret"
3535
"sigs.k8s.io/controller-runtime/pkg/client"
3636

37+
"github.com/karmada-io/karmada/pkg/features"
3738
"github.com/karmada-io/karmada/pkg/karmadactl/join"
3839
"github.com/karmada-io/karmada/pkg/karmadactl/options"
3940
"github.com/karmada-io/karmada/pkg/karmadactl/unjoin"
@@ -62,7 +63,7 @@ type ClusterDetector struct {
6263
ClusterAPIClient client.Client
6364
InformerManager genericmanager.SingleClusterInformerManager
6465
EventHandler cache.ResourceEventHandler
65-
Processor util.AsyncWorker
66+
Processor util.AsyncPriorityWorker
6667
ConcurrentReconciles int
6768
}
6869

@@ -72,9 +73,10 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
7273

7374
d.EventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
7475
workerOptions := util.Options{
75-
Name: "cluster-api cluster detector",
76-
KeyFunc: ClusterWideKeyFunc,
77-
ReconcileFunc: d.Reconcile,
76+
Name: "cluster-api cluster detector",
77+
KeyFunc: ClusterWideKeyFunc,
78+
ReconcileFunc: d.Reconcile,
79+
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
7880
}
7981
d.Processor = util.NewAsyncWorker(workerOptions)
8082
d.Processor.Run(ctx, d.ConcurrentReconciles)
@@ -97,17 +99,18 @@ func (d *ClusterDetector) discoveryCluster() {
9799
}
98100

99101
// OnAdd handles object add event and push the object to queue.
100-
func (d *ClusterDetector) OnAdd(obj interface{}) {
102+
func (d *ClusterDetector) OnAdd(obj interface{}, isInInitialList bool) {
101103
runtimeObj, ok := obj.(runtime.Object)
102104
if !ok {
103105
return
104106
}
105-
d.Processor.Enqueue(runtimeObj)
107+
priority := util.ItemPriorityIfInInitialList(isInInitialList)
108+
d.Processor.EnqueueWithOpts(util.AddOpts{Priority: priority}, runtimeObj)
106109
}
107110

108111
// OnUpdate handles object update event and push the object to queue.
109112
func (d *ClusterDetector) OnUpdate(_, newObj interface{}) {
110-
d.OnAdd(newObj)
113+
d.OnAdd(newObj, false)
111114
}
112115

113116
// OnDelete handles object delete event and push the object to queue.
@@ -119,7 +122,7 @@ func (d *ClusterDetector) OnDelete(obj interface{}) {
119122
return
120123
}
121124
}
122-
d.OnAdd(obj)
125+
d.OnAdd(obj, false)
123126
}
124127

125128
// Reconcile performs a full reconciliation for the object referred to by the key.

pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"sigs.k8s.io/controller-runtime/pkg/builder"
2727
"sigs.k8s.io/controller-runtime/pkg/controller"
2828

29+
"github.com/karmada-io/karmada/pkg/features"
2930
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
3031
"github.com/karmada-io/karmada/pkg/util"
3132
)
@@ -42,15 +43,16 @@ type HpaScaleTargetMarker struct {
4243
DynamicClient dynamic.Interface
4344
RESTMapper meta.RESTMapper
4445

45-
scaleTargetWorker util.AsyncWorker
46+
scaleTargetWorker util.AsyncPriorityWorker
4647
RateLimiterOptions ratelimiterflag.Options
4748
}
4849

4950
// SetupWithManager creates a controller and register to controller manager.
5051
func (r *HpaScaleTargetMarker) SetupWithManager(mgr controllerruntime.Manager) error {
5152
scaleTargetWorkerOptions := util.Options{
52-
Name: "scale target worker",
53-
ReconcileFunc: r.reconcileScaleRef,
53+
Name: "scale target worker",
54+
ReconcileFunc: r.reconcileScaleRef,
55+
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
5456
}
5557
r.scaleTargetWorker = util.NewAsyncWorker(scaleTargetWorkerOptions)
5658
r.scaleTargetWorker.Run(context.Background(), scaleTargetWorkerNum)

pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_predicate.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sigs.k8s.io/controller-runtime/pkg/predicate"
2626

2727
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
28+
"github.com/karmada-io/karmada/pkg/util"
2829
)
2930

3031
var _ predicate.Predicate = &HpaScaleTargetMarker{}
@@ -40,7 +41,8 @@ func (r *HpaScaleTargetMarker) Create(e event.CreateEvent) bool {
4041

4142
// if hpa exist and has been propagated, add label to its scale ref resource
4243
if hasBeenPropagated(hpa) {
43-
r.scaleTargetWorker.Add(labelEvent{addLabelEvent, hpa})
44+
priority := util.ItemPriorityIfInInitialList(e.IsInInitialList)
45+
r.scaleTargetWorker.AddWithOpts(util.AddOpts{Priority: priority}, labelEvent{addLabelEvent, hpa})
4446
}
4547

4648
return false

pkg/controllers/mcs/service_export_controller.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
5151
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
5252
"github.com/karmada-io/karmada/pkg/controllers/ctrlutil"
53+
"github.com/karmada-io/karmada/pkg/features"
5354
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
5455
"github.com/karmada-io/karmada/pkg/util"
5556
"github.com/karmada-io/karmada/pkg/util/fedinformer"
@@ -84,7 +85,7 @@ type ServiceExportController struct {
8485
// "member1": instance of ResourceEventHandler
8586
eventHandlers sync.Map
8687
// worker process resources periodic from rateLimitingQueue.
87-
worker util.AsyncWorker
88+
worker util.AsyncPriorityWorker
8889
RateLimiterOptions ratelimiterflag.Options
8990

9091
// epsWorkListFunc is used to mock the work list provider to return a specific work list,
@@ -161,9 +162,10 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager
161162
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
162163
func (c *ServiceExportController) RunWorkQueue() {
163164
workerOptions := util.Options{
164-
Name: "service-export",
165-
KeyFunc: nil,
166-
ReconcileFunc: c.syncServiceExportOrEndpointSlice,
165+
Name: "service-export",
166+
KeyFunc: nil,
167+
ReconcileFunc: c.syncServiceExportOrEndpointSlice,
168+
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
167169
}
168170
c.worker = util.NewAsyncWorker(workerOptions)
169171
c.worker.Run(c.Context, c.WorkerNumber)
@@ -318,15 +320,16 @@ func (c *ServiceExportController) getEventHandler(clusterName string) cache.Reso
318320
return eventHandler
319321
}
320322

321-
func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj interface{}) {
322-
return func(obj interface{}) {
323+
func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj interface{}, isInInitialList bool) {
324+
return func(obj interface{}, isInInitialList bool) {
323325
curObj := obj.(runtime.Object)
324326
key, err := keys.FederatedKeyFunc(clusterName, curObj)
325327
if err != nil {
326328
klog.ErrorS(err, "Failed to generate key for obj", "gvk", curObj.GetObjectKind().GroupVersionKind())
327329
return
328330
}
329-
c.worker.Add(key)
331+
priority := util.ItemPriorityIfInInitialList(isInInitialList)
332+
c.worker.AddWithOpts(util.AddOpts{Priority: priority}, key)
330333
}
331334
}
332335

pkg/controllers/multiclusterservice/endpointslice_collect_controller.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
4747
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
4848
"github.com/karmada-io/karmada/pkg/controllers/ctrlutil"
49+
"github.com/karmada-io/karmada/pkg/features"
4950
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
5051
"github.com/karmada-io/karmada/pkg/util"
5152
"github.com/karmada-io/karmada/pkg/util/fedinformer"
@@ -69,7 +70,7 @@ type EndpointSliceCollectController struct {
6970
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
7071
// "member1": instance of ResourceEventHandler
7172
eventHandlers sync.Map
72-
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
73+
worker util.AsyncPriorityWorker // worker process resources periodic from rateLimitingQueue.
7374

7475
ClusterCacheSyncTimeout metav1.Duration
7576
RateLimiterOptions ratelimiterflag.Options
@@ -133,9 +134,10 @@ func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.
133134
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
134135
func (c *EndpointSliceCollectController) RunWorkQueue() {
135136
workerOptions := util.Options{
136-
Name: "endpointslice-collect",
137-
KeyFunc: nil,
138-
ReconcileFunc: c.collectEndpointSlice,
137+
Name: "endpointslice-collect",
138+
KeyFunc: nil,
139+
ReconcileFunc: c.collectEndpointSlice,
140+
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
139141
}
140142
c.worker = util.NewAsyncWorker(workerOptions)
141143
c.worker.Run(c.Context, c.WorkerNumber)
@@ -242,15 +244,16 @@ func (c *EndpointSliceCollectController) getEventHandler(clusterName string) cac
242244
return eventHandler
243245
}
244246

245-
func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}) {
246-
return func(obj interface{}) {
247+
func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}, isInInitialList bool) {
248+
return func(obj interface{}, isInInitialList bool) {
247249
curObj := obj.(runtime.Object)
248250
key, err := keys.FederatedKeyFunc(clusterName, curObj)
249251
if err != nil {
250252
klog.ErrorS(err, "Failed to generate key for obj", "gvk", curObj.GetObjectKind().GroupVersionKind())
251253
return
252254
}
253-
c.worker.Add(key)
255+
priority := util.ItemPriorityIfInInitialList(isInInitialList)
256+
c.worker.AddWithOpts(util.AddOpts{Priority: priority}, key)
254257
}
255258
}
256259

pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func TestGetEventHandler(t *testing.T) {
7474
assert.True(t, exists, "Handler should be stored in eventHandlers")
7575
assert.Equal(t, handler, storedHandler, "Stored handler should match returned handler")
7676
if !tc.existingHandler {
77-
assert.IsType(t, &cache.ResourceEventHandlerFuncs{}, handler, "New handler should be of type *cache.ResourceEventHandlerFuncs")
77+
assert.IsType(t, &cache.ResourceEventHandlerDetailedFuncs{}, handler, "New handler should be of type *cache.ResourceEventHandlerDetailedFuncs")
7878
} else {
7979
assert.IsType(t, &mockResourceEventHandler{}, handler, "Existing handler should be of type *mockResourceEventHandler")
8080
}
@@ -92,7 +92,7 @@ func TestGenHandlerFuncs(t *testing.T) {
9292
worker: mockWorker,
9393
}
9494
addFunc := controller.genHandlerAddFunc(clusterName)
95-
addFunc(testObj)
95+
addFunc(testObj, false)
9696
assert.Equal(t, 1, mockWorker.addCount, "Add function should be called once")
9797
})
9898

@@ -448,6 +448,15 @@ func (m *mockAsyncWorker) AddAfter(_ interface{}, _ time.Duration) {}
448448

449449
func (m *mockAsyncWorker) Enqueue(_ interface{}) {}
450450

451+
func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, items ...any) {
452+
for _, item := range items {
453+
m.Add(item)
454+
}
455+
}
456+
func (m *mockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) {
457+
m.Enqueue(item)
458+
}
459+
451460
func (m *mockAsyncWorker) Run(_ context.Context, _ int) {}
452461

453462
type mockResourceEventHandler struct{}

pkg/controllers/status/work_status_controller.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"k8s.io/apimachinery/pkg/api/meta"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29+
"k8s.io/apimachinery/pkg/runtime"
2930
"k8s.io/apimachinery/pkg/runtime/schema"
3031
"k8s.io/client-go/tools/cache"
3132
"k8s.io/client-go/tools/record"
@@ -42,6 +43,7 @@ import (
4243
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
4344
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
4445
"github.com/karmada-io/karmada/pkg/events"
46+
"github.com/karmada-io/karmada/pkg/features"
4547
"github.com/karmada-io/karmada/pkg/metrics"
4648
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
4749
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
@@ -66,7 +68,7 @@ type WorkStatusController struct {
6668
InformerManager genericmanager.MultiClusterInformerManager
6769
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
6870
Context context.Context
69-
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
71+
worker util.AsyncPriorityWorker // worker process resources periodic from rateLimitingQueue.
7072
// ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
7173
ConcurrentWorkStatusSyncs int
7274
ObjectWatcher objectwatcher.ObjectWatcher
@@ -137,17 +139,18 @@ func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.C
137139
// getEventHandler return callback function that knows how to handle events from the member cluster.
138140
func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
139141
if c.eventHandler == nil {
140-
c.eventHandler = fedinformer.NewHandlerOnAllEvents(c.worker.Enqueue)
142+
c.eventHandler = fedinformer.NewHandlerOnEvents(c.onAdd, c.onUpdate, c.onDelete)
141143
}
142144
return c.eventHandler
143145
}
144146

145147
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
146148
func (c *WorkStatusController) RunWorkQueue() {
147149
workerOptions := util.Options{
148-
Name: "work-status",
149-
KeyFunc: generateKey,
150-
ReconcileFunc: c.syncWorkStatus,
150+
Name: "work-status",
151+
KeyFunc: generateKey,
152+
ReconcileFunc: c.syncWorkStatus,
153+
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
151154
}
152155
c.worker = util.NewAsyncWorker(workerOptions)
153156
c.worker.Run(c.Context, c.ConcurrentWorkStatusSyncs)
@@ -565,3 +568,28 @@ func (c *WorkStatusController) eventf(object *unstructured.Unstructured, eventTy
565568
}
566569
c.EventRecorder.Eventf(ref, eventType, reason, messageFmt, args...)
567570
}
571+
572+
func (c *WorkStatusController) onAdd(obj any, isInInitialList bool) {
573+
curObj := obj.(runtime.Object)
574+
priority := util.ItemPriorityIfInInitialList(isInInitialList)
575+
c.worker.EnqueueWithOpts(util.AddOpts{Priority: priority}, curObj)
576+
}
577+
578+
func (c *WorkStatusController) onUpdate(old, cur any) {
579+
curObj := cur.(runtime.Object)
580+
if !reflect.DeepEqual(old, cur) {
581+
c.worker.Enqueue(curObj)
582+
}
583+
}
584+
585+
func (c *WorkStatusController) onDelete(old any) {
586+
if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok {
587+
// This object might be stale but ok for our current usage.
588+
old = deleted.Obj
589+
if old == nil {
590+
return
591+
}
592+
}
593+
oldObj := old.(runtime.Object)
594+
c.worker.Enqueue(oldObj)
595+
}

0 commit comments

Comments
 (0)