Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions pkg/clusterdiscovery/clusterapi/clusterapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
secretutil "sigs.k8s.io/cluster-api/util/secret"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/karmadactl/join"
"github.com/karmada-io/karmada/pkg/karmadactl/options"
"github.com/karmada-io/karmada/pkg/karmadactl/unjoin"
Expand Down Expand Up @@ -62,7 +63,7 @@ type ClusterDetector struct {
ClusterAPIClient client.Client
InformerManager genericmanager.SingleClusterInformerManager
EventHandler cache.ResourceEventHandler
Processor util.AsyncWorker
Processor util.AsyncPriorityWorker
ConcurrentReconciles int
}

Expand All @@ -72,9 +73,10 @@ func (d *ClusterDetector) Start(ctx context.Context) error {

d.EventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
workerOptions := util.Options{
Name: "cluster-api cluster detector",
KeyFunc: ClusterWideKeyFunc,
ReconcileFunc: d.Reconcile,
Name: "cluster-api cluster detector",
KeyFunc: ClusterWideKeyFunc,
ReconcileFunc: d.Reconcile,
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
}
d.Processor = util.NewAsyncWorker(workerOptions)
d.Processor.Run(ctx, d.ConcurrentReconciles)
Expand All @@ -97,17 +99,18 @@ func (d *ClusterDetector) discoveryCluster() {
}

// OnAdd handles object add event and push the object to queue.
func (d *ClusterDetector) OnAdd(obj interface{}) {
func (d *ClusterDetector) OnAdd(obj interface{}, isInInitialList bool) {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
return
}
d.Processor.Enqueue(runtimeObj)
priority := util.ItemPriorityIfInInitialList(isInInitialList)
d.Processor.EnqueueWithOpts(util.AddOpts{Priority: priority}, runtimeObj)
}

// OnUpdate handles object update event and push the object to queue.
func (d *ClusterDetector) OnUpdate(_, newObj interface{}) {
d.OnAdd(newObj)
d.OnAdd(newObj, false)
}

// OnDelete handles object delete event and push the object to queue.
Expand All @@ -119,7 +122,7 @@ func (d *ClusterDetector) OnDelete(obj interface{}) {
return
}
}
d.OnAdd(obj)
d.OnAdd(obj, false)
}

// Reconcile performs a full reconciliation for the object referred to by the key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
)
Expand All @@ -42,15 +43,16 @@ type HpaScaleTargetMarker struct {
DynamicClient dynamic.Interface
RESTMapper meta.RESTMapper

scaleTargetWorker util.AsyncWorker
scaleTargetWorker util.AsyncPriorityWorker
RateLimiterOptions ratelimiterflag.Options
}

// SetupWithManager creates a controller and register to controller manager.
func (r *HpaScaleTargetMarker) SetupWithManager(mgr controllerruntime.Manager) error {
scaleTargetWorkerOptions := util.Options{
Name: "scale target worker",
ReconcileFunc: r.reconcileScaleRef,
Name: "scale target worker",
ReconcileFunc: r.reconcileScaleRef,
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
}
r.scaleTargetWorker = util.NewAsyncWorker(scaleTargetWorkerOptions)
r.scaleTargetWorker.Run(context.Background(), scaleTargetWorkerNum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
)

var _ predicate.Predicate = &HpaScaleTargetMarker{}
Expand All @@ -40,7 +41,8 @@ func (r *HpaScaleTargetMarker) Create(e event.CreateEvent) bool {

// if hpa exist and has been propagated, add label to its scale ref resource
if hasBeenPropagated(hpa) {
r.scaleTargetWorker.Add(labelEvent{addLabelEvent, hpa})
priority := util.ItemPriorityIfInInitialList(e.IsInInitialList)
r.scaleTargetWorker.AddWithOpts(util.AddOpts{Priority: priority}, labelEvent{addLabelEvent, hpa})
}

return false
Expand Down
17 changes: 10 additions & 7 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/controllers/ctrlutil"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
Expand Down Expand Up @@ -84,7 +85,7 @@ type ServiceExportController struct {
// "member1": instance of ResourceEventHandler
eventHandlers sync.Map
// worker process resources periodic from rateLimitingQueue.
worker util.AsyncWorker
worker util.AsyncPriorityWorker
RateLimiterOptions ratelimiterflag.Options

// epsWorkListFunc is used to mock the work list provider to return a specific work list,
Expand Down Expand Up @@ -161,9 +162,10 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *ServiceExportController) RunWorkQueue() {
workerOptions := util.Options{
Name: "service-export",
KeyFunc: nil,
ReconcileFunc: c.syncServiceExportOrEndpointSlice,
Name: "service-export",
KeyFunc: nil,
ReconcileFunc: c.syncServiceExportOrEndpointSlice,
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker.Run(c.Context, c.WorkerNumber)
Expand Down Expand Up @@ -318,15 +320,16 @@ func (c *ServiceExportController) getEventHandler(clusterName string) cache.Reso
return eventHandler
}

func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj interface{}) {
return func(obj interface{}) {
func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj interface{}, isInInitialList bool) {
return func(obj interface{}, isInInitialList bool) {
curObj := obj.(runtime.Object)
key, err := keys.FederatedKeyFunc(clusterName, curObj)
if err != nil {
klog.ErrorS(err, "Failed to generate key for obj", "gvk", curObj.GetObjectKind().GroupVersionKind())
return
}
c.worker.Add(key)
priority := util.ItemPriorityIfInInitialList(isInInitialList)
c.worker.AddWithOpts(util.AddOpts{Priority: priority}, key)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/controllers/ctrlutil"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
Expand All @@ -69,7 +70,7 @@ type EndpointSliceCollectController struct {
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
// "member1": instance of ResourceEventHandler
eventHandlers sync.Map
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
worker util.AsyncPriorityWorker // worker process resources periodic from rateLimitingQueue.

ClusterCacheSyncTimeout metav1.Duration
RateLimiterOptions ratelimiterflag.Options
Expand Down Expand Up @@ -133,9 +134,10 @@ func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *EndpointSliceCollectController) RunWorkQueue() {
workerOptions := util.Options{
Name: "endpointslice-collect",
KeyFunc: nil,
ReconcileFunc: c.collectEndpointSlice,
Name: "endpointslice-collect",
KeyFunc: nil,
ReconcileFunc: c.collectEndpointSlice,
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker.Run(c.Context, c.WorkerNumber)
Expand Down Expand Up @@ -242,15 +244,16 @@ func (c *EndpointSliceCollectController) getEventHandler(clusterName string) cac
return eventHandler
}

func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}) {
return func(obj interface{}) {
func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}, isInInitialList bool) {
return func(obj interface{}, isInInitialList bool) {
curObj := obj.(runtime.Object)
key, err := keys.FederatedKeyFunc(clusterName, curObj)
if err != nil {
klog.ErrorS(err, "Failed to generate key for obj", "gvk", curObj.GetObjectKind().GroupVersionKind())
return
}
c.worker.Add(key)
priority := util.ItemPriorityIfInInitialList(isInInitialList)
c.worker.AddWithOpts(util.AddOpts{Priority: priority}, key)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestGetEventHandler(t *testing.T) {
assert.True(t, exists, "Handler should be stored in eventHandlers")
assert.Equal(t, handler, storedHandler, "Stored handler should match returned handler")
if !tc.existingHandler {
assert.IsType(t, &cache.ResourceEventHandlerFuncs{}, handler, "New handler should be of type *cache.ResourceEventHandlerFuncs")
assert.IsType(t, &cache.ResourceEventHandlerDetailedFuncs{}, handler, "New handler should be of type *cache.ResourceEventHandlerDetailedFuncs")
} else {
assert.IsType(t, &mockResourceEventHandler{}, handler, "Existing handler should be of type *mockResourceEventHandler")
}
Expand All @@ -92,7 +92,7 @@ func TestGenHandlerFuncs(t *testing.T) {
worker: mockWorker,
}
addFunc := controller.genHandlerAddFunc(clusterName)
addFunc(testObj)
addFunc(testObj, false)
assert.Equal(t, 1, mockWorker.addCount, "Add function should be called once")
})

Expand Down Expand Up @@ -448,6 +448,15 @@ func (m *mockAsyncWorker) AddAfter(_ interface{}, _ time.Duration) {}

func (m *mockAsyncWorker) Enqueue(_ interface{}) {}

func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, items ...any) {
for _, item := range items {
m.Add(item)
}
}
func (m *mockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) {
m.Enqueue(item)
}

func (m *mockAsyncWorker) Run(_ context.Context, _ int) {}

type mockResourceEventHandler struct{}
Expand Down
38 changes: 33 additions & 5 deletions pkg/controllers/status/work_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand All @@ -42,6 +43,7 @@ import (
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
Expand All @@ -66,7 +68,7 @@ type WorkStatusController struct {
InformerManager genericmanager.MultiClusterInformerManager
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
Context context.Context
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
worker util.AsyncPriorityWorker // worker process resources periodic from rateLimitingQueue.
// ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
ConcurrentWorkStatusSyncs int
ObjectWatcher objectwatcher.ObjectWatcher
Expand Down Expand Up @@ -137,17 +139,18 @@ func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.C
// getEventHandler return callback function that knows how to handle events from the member cluster.
func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
if c.eventHandler == nil {
c.eventHandler = fedinformer.NewHandlerOnAllEvents(c.worker.Enqueue)
c.eventHandler = fedinformer.NewHandlerOnEvents(c.onAdd, c.onUpdate, c.onDelete)
}
return c.eventHandler
}

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *WorkStatusController) RunWorkQueue() {
workerOptions := util.Options{
Name: "work-status",
KeyFunc: generateKey,
ReconcileFunc: c.syncWorkStatus,
Name: "work-status",
KeyFunc: generateKey,
ReconcileFunc: c.syncWorkStatus,
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker.Run(c.Context, c.ConcurrentWorkStatusSyncs)
Expand Down Expand Up @@ -565,3 +568,28 @@ func (c *WorkStatusController) eventf(object *unstructured.Unstructured, eventTy
}
c.EventRecorder.Eventf(ref, eventType, reason, messageFmt, args...)
}

func (c *WorkStatusController) onAdd(obj any, isInInitialList bool) {
curObj := obj.(runtime.Object)
priority := util.ItemPriorityIfInInitialList(isInInitialList)
c.worker.EnqueueWithOpts(util.AddOpts{Priority: priority}, curObj)
}

func (c *WorkStatusController) onUpdate(old, cur any) {
curObj := cur.(runtime.Object)
if !reflect.DeepEqual(old, cur) {
c.worker.Enqueue(curObj)
}
}

func (c *WorkStatusController) onDelete(old any) {
if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok {
// This object might be stale but ok for our current usage.
old = deleted.Obj
if old == nil {
return
}
}
oldObj := old.(runtime.Object)
c.worker.Enqueue(oldObj)
}
Loading