Skip to content

Commit 678c8e2

Browse files
authored
Merge pull request #2106 from chrischdi/pr-capi-flags-concurrency
⚠️ Introduce concurrency flags per controller
2 parents 5adad8c + 8c866da commit 678c8e2

13 files changed

+75
-48
lines changed

controllers/controllers_suite_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/client-go/kubernetes/scheme"
3030
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
3131
ctrl "sigs.k8s.io/controller-runtime"
32+
"sigs.k8s.io/controller-runtime/pkg/controller"
3233

3334
infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
3435
vmwarev1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/vmware/v1beta1"
@@ -62,25 +63,27 @@ func setup() {
6263

6364
testEnv = helpers.NewTestEnvironment()
6465

65-
if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}); err != nil {
66+
controllerOpts := controller.Options{MaxConcurrentReconciles: 10}
67+
68+
if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}, controllerOpts); err != nil {
6669
panic(fmt.Sprintf("unable to setup VsphereCluster controller: %v", err))
6770
}
68-
if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}); err != nil {
71+
if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}, controllerOpts); err != nil {
6972
panic(fmt.Sprintf("unable to setup VsphereMachine controller: %v", err))
7073
}
71-
if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
74+
if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
7275
panic(fmt.Sprintf("unable to setup VsphereVM controller: %v", err))
7376
}
74-
if err := AddVsphereClusterIdentityControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
77+
if err := AddVsphereClusterIdentityControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
7578
panic(fmt.Sprintf("unable to setup VSphereClusterIdentity controller: %v", err))
7679
}
77-
if err := AddVSphereDeploymentZoneControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
80+
if err := AddVSphereDeploymentZoneControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
7881
panic(fmt.Sprintf("unable to setup VSphereDeploymentZone controller: %v", err))
7982
}
80-
if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
83+
if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
8184
panic(fmt.Sprintf("unable to setup ServiceAccount controller: %v", err))
8285
}
83-
if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
86+
if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
8487
panic(fmt.Sprintf("unable to setup SvcDiscovery controller: %v", err))
8588
}
8689

controllers/serviceaccount_controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"sigs.k8s.io/cluster-api/util/patch"
3939
ctrl "sigs.k8s.io/controller-runtime"
4040
"sigs.k8s.io/controller-runtime/pkg/client"
41+
"sigs.k8s.io/controller-runtime/pkg/controller"
4142
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
4243
"sigs.k8s.io/controller-runtime/pkg/handler"
4344
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -64,7 +65,7 @@ const (
6465
)
6566

6667
// AddServiceAccountProviderControllerToManager adds this controller to the provided manager.
67-
func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error {
68+
func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
6869
var (
6970
controlledType = &vmwarev1.ProviderServiceAccount{}
7071
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
@@ -87,6 +88,7 @@ func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManager
8788
clusterToInfraFn := clusterToSupervisorInfrastructureMapFunc(ctx)
8889

8990
return ctrl.NewControllerManagedBy(mgr).For(controlledType).
91+
WithOptions(options).
9092
// Watch a ProviderServiceAccount
9193
Watches(
9294
&vmwarev1.ProviderServiceAccount{},

controllers/servicediscovery_controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
ctrl "sigs.k8s.io/controller-runtime"
4141
"sigs.k8s.io/controller-runtime/pkg/cache"
4242
"sigs.k8s.io/controller-runtime/pkg/client"
43+
"sigs.k8s.io/controller-runtime/pkg/controller"
4344
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
4445
"sigs.k8s.io/controller-runtime/pkg/handler"
4546
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -70,7 +71,7 @@ const (
7071
// +kubebuilder:rbac:groups="",resources=configmaps/status,verbs=get
7172

7273
// AddServiceDiscoveryControllerToManager adds the ServiceDiscovery controller to the provided manager.
73-
func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error {
74+
func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
7475
var (
7576
controllerNameShort = ServiceDiscoveryControllerName
7677
controllerNameLong = fmt.Sprintf("%s/%s/%s", ctx.Namespace, ctx.Name, ServiceDiscoveryControllerName)
@@ -102,6 +103,7 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex
102103
src := source.Kind(configMapCache, &corev1.ConfigMap{})
103104

104105
return ctrl.NewControllerManagedBy(mgr).For(&vmwarev1.VSphereCluster{}).
106+
WithOptions(options).
105107
Watches(
106108
&corev1.Service{},
107109
handler.EnqueueRequestsFromMapFunc(r.serviceToClusters),

controllers/vmware/test/controllers_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
3434
"sigs.k8s.io/controller-runtime/pkg/cache"
3535
"sigs.k8s.io/controller-runtime/pkg/client"
36+
"sigs.k8s.io/controller-runtime/pkg/controller"
3637
ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
3738

3839
infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
@@ -232,12 +233,14 @@ func getManager(cfg *rest.Config, networkProvider string) manager.Manager {
232233
CredentialsFile: tmpFile.Name(),
233234
}
234235

236+
controllerOpts := controller.Options{MaxConcurrentReconciles: 10}
237+
235238
opts.AddToManager = func(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error {
236-
if err := controllers.AddClusterControllerToManager(ctx, mgr, &vmwarev1.VSphereCluster{}); err != nil {
239+
if err := controllers.AddClusterControllerToManager(ctx, mgr, &vmwarev1.VSphereCluster{}, controllerOpts); err != nil {
237240
return err
238241
}
239242

240-
return controllers.AddMachineControllerToManager(ctx, mgr, &vmwarev1.VSphereMachine{})
243+
return controllers.AddMachineControllerToManager(ctx, mgr, &vmwarev1.VSphereMachine{}, controllerOpts)
241244
}
242245

243246
mgr, err := manager.New(opts)

controllers/vspherecluster_controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ import (
5757

5858
// AddClusterControllerToManager adds the cluster controller to the provided
5959
// manager.
60-
func AddClusterControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, clusterControlledType client.Object) error {
60+
func AddClusterControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, clusterControlledType client.Object, options controller.Options) error {
6161
supervisorBased, err := util.IsSupervisorType(clusterControlledType)
6262
if err != nil {
6363
return err
@@ -97,11 +97,11 @@ func AddClusterControllerToManager(ctx *context.ControllerManagerContext, mgr ma
9797
return ctrl.NewControllerManagedBy(mgr).
9898
Named(controllerNameShort).
9999
For(clusterControlledType).
100+
WithOptions(options).
100101
Watches(
101102
&vmwarev1.VSphereMachine{},
102103
handler.EnqueueRequestsFromMapFunc(reconciler.VSphereMachineToCluster),
103104
).
104-
WithOptions(controller.Options{MaxConcurrentReconciles: ctx.MaxConcurrentReconciles}).
105105
Complete(reconciler)
106106
}
107107

@@ -113,6 +113,7 @@ func AddClusterControllerToManager(ctx *context.ControllerManagerContext, mgr ma
113113
c, err := ctrl.NewControllerManagedBy(mgr).
114114
// Watch the controlled, infrastructure resource.
115115
For(clusterControlledType).
116+
WithOptions(options).
116117
// Watch the CAPI resource that owns this infrastructure resource.
117118
Watches(
118119
&clusterv1.Cluster{},
@@ -159,7 +160,6 @@ func AddClusterControllerToManager(ctx *context.ControllerManagerContext, mgr ma
159160
&handler.EnqueueRequestForObject{},
160161
).
161162
WithEventFilter(predicates.ResourceIsNotExternallyManaged(reconciler.Logger)).
162-
WithOptions(controller.Options{MaxConcurrentReconciles: ctx.MaxConcurrentReconciles}).
163163
Build(reconciler)
164164
if err != nil {
165165
return err

controllers/vsphereclusteridentity_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ var (
5252
// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=vsphereclusteridentities/status,verbs=get;update;patch
5353
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;patch;update;delete
5454

55-
func AddVsphereClusterIdentityControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error {
55+
func AddVsphereClusterIdentityControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
5656
var (
5757
controllerNameShort = fmt.Sprintf("%s-controller", strings.ToLower(identityControlledTypeName))
5858
controllerNameLong = fmt.Sprintf("%s/%s/%s", ctx.Namespace, ctx.Name, controllerNameShort)
@@ -70,7 +70,7 @@ func AddVsphereClusterIdentityControllerToManager(ctx *context.ControllerManager
7070

7171
return ctrl.NewControllerManagedBy(mgr).
7272
For(identityControlledType).
73-
WithOptions(controller.Options{MaxConcurrentReconciles: ctx.MaxConcurrentReconciles}).
73+
WithOptions(options).
7474
Complete(reconciler)
7575
}
7676

controllers/vspheredeploymentzone_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ import (
5454
// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=vspherefailuredomains,verbs=get;list;watch;create;update;patch;delete
5555

5656
// AddVSphereDeploymentZoneControllerToManager adds the VSphereDeploymentZone controller to the provided manager.
57-
func AddVSphereDeploymentZoneControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error {
57+
func AddVSphereDeploymentZoneControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
5858
var (
5959
controlledType = &infrav1.VSphereDeploymentZone{}
6060
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
@@ -76,6 +76,7 @@ func AddVSphereDeploymentZoneControllerToManager(ctx *context.ControllerManagerC
7676
return ctrl.NewControllerManagedBy(mgr).
7777
// Watch the controlled, infrastructure resource.
7878
For(controlledType).
79+
WithOptions(options).
7980
Watches(
8081
&infrav1.VSphereFailureDomain{},
8182
handler.EnqueueRequestsFromMapFunc(reconciler.failureDomainsToDeploymentZones)).
@@ -87,7 +88,6 @@ func AddVSphereDeploymentZoneControllerToManager(ctx *context.ControllerManagerC
8788
&source.Channel{Source: ctx.GetGenericEventChannelFor(controlledTypeGVK)},
8889
&handler.EnqueueRequestForObject{},
8990
).
90-
WithOptions(controller.Options{MaxConcurrentReconciles: ctx.MaxConcurrentReconciles}).
9191
Complete(reconciler)
9292
}
9393

controllers/vspheremachine_controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ const hostInfoErrStr = "host info cannot be used as a label value"
7878
// AddMachineControllerToManager adds the machine controller to the provided
7979
// manager.
8080

81-
func AddMachineControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, controlledType client.Object) error {
81+
func AddMachineControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, controlledType client.Object, options controller.Options) error {
8282
supervisorBased, err := util.IsSupervisorType(controlledType)
8383
if err != nil {
8484
return err
@@ -108,6 +108,7 @@ func AddMachineControllerToManager(ctx *context.ControllerManagerContext, mgr ma
108108
builder := ctrl.NewControllerManagedBy(mgr).
109109
// Watch the controlled, infrastructure resource.
110110
For(controlledType).
111+
WithOptions(options).
111112
// Watch the CAPI resource that owns this infrastructure resource.
112113
Watches(
113114
&clusterv1.Machine{},
@@ -121,8 +122,7 @@ func AddMachineControllerToManager(ctx *context.ControllerManagerContext, mgr ma
121122
WatchesRawSource(
122123
&source.Channel{Source: ctx.GetGenericEventChannelFor(controlledTypeGVK)},
123124
&handler.EnqueueRequestForObject{},
124-
).
125-
WithOptions(controller.Options{MaxConcurrentReconciles: ctx.MaxConcurrentReconciles})
125+
)
126126

127127
r := &machineReconciler{
128128
ControllerContext: controllerContext,

controllers/vspherevm_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ import (
6969
// AddVMControllerToManager adds the VM controller to the provided manager.
7070
//
7171
//nolint:forcetypeassert
72-
func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error {
72+
func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
7373
var (
7474
controlledType = &infrav1.VSphereVM{}
7575
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
@@ -93,6 +93,7 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager
9393
controller, err := ctrl.NewControllerManagedBy(mgr).
9494
// Watch the controlled, infrastructure resource.
9595
For(controlledType).
96+
WithOptions(options).
9697
// Watch a GenericEvent channel for the controlled resource.
9798
//
9899
// This is useful when there are events outside of Kubernetes that
@@ -102,7 +103,6 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager
102103
&source.Channel{Source: ctx.GetGenericEventChannelFor(controlledTypeGVK)},
103104
&handler.EnqueueRequestForObject{},
104105
).
105-
WithOptions(controller.Options{MaxConcurrentReconciles: ctx.MaxConcurrentReconciles}).
106106
Build(r)
107107
if err != nil {
108108
return err

main.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"sigs.k8s.io/cluster-api/controllers/remote"
3939
"sigs.k8s.io/cluster-api/util/flags"
4040
ctrl "sigs.k8s.io/controller-runtime"
41+
"sigs.k8s.io/controller-runtime/pkg/controller"
4142
ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
4243
ctrlsig "sigs.k8s.io/controller-runtime/pkg/manager/signals"
4344
"sigs.k8s.io/controller-runtime/pkg/webhook"
@@ -70,6 +71,14 @@ var (
7071
webhookOpts webhook.Options
7172
watchNamespace string
7273

74+
vSphereClusterConcurrency int
75+
vSphereMachineConcurrency int
76+
providerServiceAccountConcurrency int
77+
serviceDiscoveryConcurrency int
78+
vSphereVMConcurrency int
79+
vSphereClusterIdentityConcurrency int
80+
vSphereDeploymentZoneConcurrency int
81+
7382
tlsOptions = flags.TLSOptions{}
7483

7584
defaultProfilerAddr = os.Getenv("PROFILER_ADDR")
@@ -91,11 +100,26 @@ func InitFlags(fs *pflag.FlagSet) {
91100
defaultLeaderElectionID,
92101
"Name of the config map to use as the locking resource when configuring leader election.")
93102

94-
fs.IntVar(
95-
&managerOpts.MaxConcurrentReconciles,
96-
"max-concurrent-reconciles",
97-
10,
98-
"The maximum number of allowed, concurrent reconciles.")
103+
fs.IntVar(&vSphereClusterConcurrency, "vspherecluster-concurrency", 10,
104+
"Number of vSphere clusters to process simultaneously")
105+
106+
fs.IntVar(&vSphereMachineConcurrency, "vspheremachine-concurrency", 10,
107+
"Number of vSphere machines to process simultaneously")
108+
109+
fs.IntVar(&providerServiceAccountConcurrency, "providerserviceaccount-concurrency", 10,
110+
"Number of provider service accounts to process simultaneously")
111+
112+
fs.IntVar(&serviceDiscoveryConcurrency, "servicediscovery-concurrency", 10,
113+
"Number of vSphere clusters for service discovery to process simultaneously")
114+
115+
fs.IntVar(&vSphereVMConcurrency, "vspherevm-concurrency", 10,
116+
"Number of vSphere vms to process simultaneously")
117+
118+
fs.IntVar(&vSphereClusterIdentityConcurrency, "vsphereclusteridentity-concurrency", 10,
119+
"Number of vSphere cluster identities to process simultaneously")
120+
121+
fs.IntVar(&vSphereDeploymentZoneConcurrency, "vspheredeploymentzone-concurrency", 10,
122+
"Number of vSphere deployment zones to process simultaneously")
99123

100124
fs.StringVar(
101125
&managerOpts.PodName,
@@ -315,36 +339,36 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man
315339
return err
316340
}
317341

318-
if err := controllers.AddClusterControllerToManager(ctx, mgr, &v1beta1.VSphereCluster{}); err != nil {
342+
if err := controllers.AddClusterControllerToManager(ctx, mgr, &v1beta1.VSphereCluster{}, concurrency(vSphereClusterConcurrency)); err != nil {
319343
return err
320344
}
321-
if err := controllers.AddMachineControllerToManager(ctx, mgr, &v1beta1.VSphereMachine{}); err != nil {
345+
if err := controllers.AddMachineControllerToManager(ctx, mgr, &v1beta1.VSphereMachine{}, concurrency(vSphereMachineConcurrency)); err != nil {
322346
return err
323347
}
324-
if err := controllers.AddVMControllerToManager(ctx, mgr); err != nil {
348+
if err := controllers.AddVMControllerToManager(ctx, mgr, concurrency(vSphereVMConcurrency)); err != nil {
325349
return err
326350
}
327-
if err := controllers.AddVsphereClusterIdentityControllerToManager(ctx, mgr); err != nil {
351+
if err := controllers.AddVsphereClusterIdentityControllerToManager(ctx, mgr, concurrency(vSphereClusterIdentityConcurrency)); err != nil {
328352
return err
329353
}
330354

331-
return controllers.AddVSphereDeploymentZoneControllerToManager(ctx, mgr)
355+
return controllers.AddVSphereDeploymentZoneControllerToManager(ctx, mgr, concurrency(vSphereDeploymentZoneConcurrency))
332356
}
333357

334358
func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error {
335-
if err := controllers.AddClusterControllerToManager(ctx, mgr, &vmwarev1b1.VSphereCluster{}); err != nil {
359+
if err := controllers.AddClusterControllerToManager(ctx, mgr, &vmwarev1b1.VSphereCluster{}, concurrency(vSphereClusterConcurrency)); err != nil {
336360
return err
337361
}
338362

339-
if err := controllers.AddMachineControllerToManager(ctx, mgr, &vmwarev1b1.VSphereMachine{}); err != nil {
363+
if err := controllers.AddMachineControllerToManager(ctx, mgr, &vmwarev1b1.VSphereMachine{}, concurrency(vSphereMachineConcurrency)); err != nil {
340364
return err
341365
}
342366

343-
if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr); err != nil {
367+
if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr, concurrency(providerServiceAccountConcurrency)); err != nil {
344368
return err
345369
}
346370

347-
return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr)
371+
return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr, concurrency(serviceDiscoveryConcurrency))
348372
}
349373

350374
func setupChecks(mgr ctrlmgr.Manager) {
@@ -377,3 +401,7 @@ func isCRDDeployed(mgr ctrlmgr.Manager, gvr schema.GroupVersionResource) (bool,
377401
}
378402
return true, nil
379403
}
404+
405+
func concurrency(c int) controller.Options {
406+
return controller.Options{MaxConcurrentReconciles: c}
407+
}

0 commit comments

Comments
 (0)