Skip to content

Commit b7bdcab

Browse files
authored
Merge pull request #151 from jokuniew/concurrency
feat: concurrency
2 parents f05ff4f + d6be3fe commit b7bdcab

File tree

5 files changed

+27
-9
lines changed

5 files changed

+27
-9
lines changed

bootstrap/controllers/kthreesconfig_controller.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"sigs.k8s.io/cluster-api/util/predicates"
4242
ctrl "sigs.k8s.io/controller-runtime"
4343
"sigs.k8s.io/controller-runtime/pkg/client"
44+
"sigs.k8s.io/controller-runtime/pkg/controller"
4445
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4546
kubeyaml "sigs.k8s.io/yaml"
4647

@@ -535,13 +536,16 @@ func (r *KThreesConfigReconciler) handleClusterNotInitialized(ctx context.Contex
535536
return ctrl.Result{}, nil
536537
}
537538

538-
func (r *KThreesConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
539+
func (r *KThreesConfigReconciler) SetupWithManager(mgr ctrl.Manager, concurrency int) error {
539540
if r.KThreesInitLock == nil {
540541
r.KThreesInitLock = locking.NewControlPlaneInitMutex(mgr.GetClient())
541542
}
542543

543544
return ctrl.NewControllerManagedBy(mgr).
544545
For(&bootstrapv1.KThreesConfig{}).
546+
WithOptions(controller.Options{
547+
MaxConcurrentReconciles: concurrency,
548+
}).
545549
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
546550
Complete(r)
547551
}

bootstrap/main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func main() {
5555
var metricsAddr string
5656
var enableLeaderElection bool
5757
var syncPeriod time.Duration
58+
var concurrencyNumber int
5859

5960
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
6061
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
@@ -64,6 +65,9 @@ func main() {
6465
flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute,
6566
"The minimum interval at which watched resources are reconciled (e.g. 15m)")
6667

68+
flag.IntVar(&concurrencyNumber, "concurrency", 1,
69+
"Number of core resources to process simultaneously")
70+
6771
flag.Parse()
6872

6973
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
@@ -91,7 +95,7 @@ func main() {
9195
Client: mgr.GetClient(),
9296
Log: ctrl.Log.WithName("controllers").WithName("KThreesConfig"),
9397
Scheme: mgr.GetScheme(),
94-
}).SetupWithManager(mgr); err != nil {
98+
}).SetupWithManager(mgr, concurrencyNumber); err != nil {
9599
setupLog.Error(err, "unable to create controller", "controller", "KThreesConfig")
96100
os.Exit(1)
97101
}
@@ -108,7 +112,7 @@ func main() {
108112
}
109113
// +kubebuilder:scaffold:builder
110114

111-
setupLog.Info("starting manager")
115+
setupLog.Info("Starting manager", "concurrency", concurrencyNumber)
112116
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
113117
setupLog.Error(err, "problem running manager")
114118
os.Exit(1)

controlplane/controllers/kthreescontrolplane_controller.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,11 +283,13 @@ func patchKThreesControlPlane(ctx context.Context, patchHelper *patch.Helper, kc
283283
)
284284
}
285285

286-
func (r *KThreesControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger) error {
286+
func (r *KThreesControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger, concurrency int) error {
287287
c, err := ctrl.NewControllerManagedBy(mgr).
288288
For(&controlplanev1.KThreesControlPlane{}).
289289
Owns(&clusterv1.Machine{}).
290-
// WithOptions(options).
290+
WithOptions(controller.Options{
291+
MaxConcurrentReconciles: concurrency,
292+
}).
291293
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
292294
Watches(
293295
&clusterv1.Cluster{},

controlplane/controllers/machine_controller.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"sigs.k8s.io/cluster-api/util/predicates"
2020
ctrl "sigs.k8s.io/controller-runtime"
2121
"sigs.k8s.io/controller-runtime/pkg/client"
22+
"sigs.k8s.io/controller-runtime/pkg/controller"
2223

2324
k3s "github.com/k3s-io/cluster-api-k3s/pkg/k3s"
2425
)
@@ -43,9 +44,12 @@ type MachineReconciler struct {
4344
managementClusterUncached k3s.ManagementCluster
4445
}
4546

46-
func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger) error {
47+
func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger, concurrency int) error {
4748
_, err := ctrl.NewControllerManagedBy(mgr).
4849
For(&clusterv1.Machine{}).
50+
WithOptions(controller.Options{
51+
MaxConcurrentReconciles: concurrency,
52+
}).
4953
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
5054
Build(r)
5155

controlplane/main.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func main() {
6363
var syncPeriod time.Duration
6464
var etcdDialTimeout time.Duration
6565
var etcdCallTimeout time.Duration
66+
var concurrencyNumber int
6667

6768
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
6869
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
@@ -78,6 +79,9 @@ func main() {
7879
flag.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", etcd.DefaultCallTimeout,
7980
"Duration that the etcd client waits at most for read and write operations to etcd.")
8081

82+
flag.IntVar(&concurrencyNumber, "concurrency", 1,
83+
"Number of core resources to process simultaneously")
84+
8185
flag.Parse()
8286

8387
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
@@ -110,7 +114,7 @@ func main() {
110114
Scheme: mgr.GetScheme(),
111115
EtcdDialTimeout: etcdDialTimeout,
112116
EtcdCallTimeout: etcdCallTimeout,
113-
}).SetupWithManager(ctx, mgr, &ctrPlaneLogger); err != nil {
117+
}).SetupWithManager(ctx, mgr, &ctrPlaneLogger, concurrencyNumber); err != nil {
114118
setupLog.Error(err, "unable to create controller", "controller", "KThreesControlPlane")
115119
os.Exit(1)
116120
}
@@ -122,7 +126,7 @@ func main() {
122126
Scheme: mgr.GetScheme(),
123127
EtcdDialTimeout: etcdDialTimeout,
124128
EtcdCallTimeout: etcdCallTimeout,
125-
}).SetupWithManager(ctx, mgr, &ctrMachineLogger); err != nil {
129+
}).SetupWithManager(ctx, mgr, &ctrMachineLogger, concurrencyNumber); err != nil {
126130
setupLog.Error(err, "unable to create controller", "controller", "Machine")
127131
os.Exit(1)
128132
}
@@ -135,7 +139,7 @@ func main() {
135139
}
136140
// +kubebuilder:scaffold:builder
137141

138-
setupLog.Info("starting manager")
142+
setupLog.Info("Starting manager", "concurrency", concurrencyNumber)
139143
if err := mgr.Start(ctx); err != nil {
140144
setupLog.Error(err, "problem running manager")
141145
os.Exit(1)

0 commit comments

Comments
 (0)