
Commit 45d0205

fix(test): address review feedback
- remove KCP watch
- refactor reconciler logic into helpers
- add exhaustive unit tests for helpers
1 parent 7217a84 commit 45d0205

File tree

2 files changed: +1031 -270 lines changed

pkg/controllers/failuredomainrollout/controller.go

Lines changed: 125 additions & 122 deletions
@@ -8,19 +8,17 @@ import (
     "fmt"
     "time"
 
+    "github.com/go-logr/logr"
     corev1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
     clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
     controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
+    "sigs.k8s.io/cluster-api/util/annotations"
     ctrl "sigs.k8s.io/controller-runtime"
-    "sigs.k8s.io/controller-runtime/pkg/builder"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/controller"
-    "sigs.k8s.io/controller-runtime/pkg/handler"
-    "sigs.k8s.io/controller-runtime/pkg/predicate"
-    "sigs.k8s.io/controller-runtime/pkg/reconcile"
 )
 
 type Reconciler struct {
@@ -33,49 +31,89 @@ func (r *Reconciler) SetupWithManager(
 ) error {
     return ctrl.NewControllerManagedBy(mgr).
         For(&clusterv1.Cluster{}).
-        Watches(
-            &controlplanev1.KubeadmControlPlane{},
-            handler.EnqueueRequestsFromMapFunc(r.kubeadmControlPlaneToCluster),
-            builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
-        ).
         WithOptions(*options).
         Complete(r)
 }
 
-func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-    logger := ctrl.LoggerFrom(ctx).WithValues("cluster", req.NamespacedName)
-    logger.V(5).Info("Starting failure domain rollout reconciliation")
+// areResourcesDeleting checks if either the cluster or KCP has a deletion timestamp.
+// Returns true if any resource is being deleted and reconciliation should be skipped.
+func (r *Reconciler) areResourcesDeleting(cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) bool {
+    if cluster != nil && !cluster.DeletionTimestamp.IsZero() {
+        return true
+    }
 
-    var cluster clusterv1.Cluster
-    if err := r.Get(ctx, req.NamespacedName, &cluster); err != nil {
-        if apierrors.IsNotFound(err) {
-            logger.V(5).Info("Cluster not found, skipping reconciliation")
-            return ctrl.Result{}, nil
-        }
-        return ctrl.Result{}, fmt.Errorf("failed to get Cluster %s: %w", req.NamespacedName, err)
+    if kcp != nil && !kcp.DeletionTimestamp.IsZero() {
+        return true
     }
 
-    // Early validation checks
+    return false
+}
+
+// areResourcesUpdating checks if either the cluster or KCP has not fully reconciled.
+// Returns true if any resource is still being updated and reconciliation should be requeued.
+func (r *Reconciler) areResourcesUpdating(cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) bool {
+    if cluster != nil && cluster.Status.ObservedGeneration < cluster.Generation {
+        return true
+    }
+
+    if kcp != nil && kcp.Status.ObservedGeneration < kcp.Generation {
+        return true
+    }
+
+    return false
+}
+
+// areResourcesPaused checks if either the cluster or KCP is paused.
+// Uses the standard CAPI annotations.IsPaused utility which handles both cluster.Spec.Paused and paused annotations.
+func (r *Reconciler) areResourcesPaused(cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) bool {
+    if cluster != nil && annotations.IsPaused(cluster, cluster) {
+        return true
+    }
+
+    if cluster != nil && kcp != nil && annotations.IsPaused(cluster, kcp) {
+        return true
+    }
+
+    return false
+}
+
+// shouldSkipClusterReconciliation checks if the cluster should be skipped for reconciliation
+// based on early validation checks. Returns true if reconciliation should be skipped.
+func (r *Reconciler) shouldSkipClusterReconciliation(cluster *clusterv1.Cluster, logger logr.Logger) bool {
     if cluster.Spec.Topology == nil {
         logger.V(5).Info("Cluster is not using topology, skipping reconciliation")
-        return ctrl.Result{}, nil
+        return true
     }
 
     if cluster.Spec.ControlPlaneRef == nil {
         logger.V(5).Info("Cluster has no control plane reference, skipping reconciliation")
-        return ctrl.Result{}, nil
+        return true
     }
 
     if len(cluster.Status.FailureDomains) == 0 {
         logger.V(5).Info("Cluster has no failure domains, skipping reconciliation")
-        return ctrl.Result{}, nil
+        return true
     }
 
-    // If the Cluster is not fully reconciled, we should skip our own reconciliation.
-    if cluster.Status.ObservedGeneration < cluster.Generation {
-        logger.V(5).Info("Cluster is not yet reconciled, skipping failure domain rollout check",
-            "observedGeneration", cluster.Status.ObservedGeneration, "generation", cluster.Generation)
-        return ctrl.Result{RequeueAfter: 2 * time.Minute}, nil
+    return false
+}
+
+func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+    logger := ctrl.LoggerFrom(ctx).WithValues("cluster", req.NamespacedName)
+    logger.V(5).Info("Starting failure domain rollout reconciliation")
+
+    var cluster clusterv1.Cluster
+    if err := r.Get(ctx, req.NamespacedName, &cluster); err != nil {
+        if apierrors.IsNotFound(err) {
+            logger.V(5).Info("Cluster not found, skipping reconciliation")
+            return ctrl.Result{}, nil
+        }
+        return ctrl.Result{}, fmt.Errorf("failed to get Cluster %s: %w", req.NamespacedName, err)
+    }
+
+    // Early validation checks
+    if r.shouldSkipClusterReconciliation(&cluster, logger) {
+        return ctrl.Result{}, nil
     }
 
     // Get the KubeAdmControlPlane
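The commit message says exhaustive unit tests were added for these helpers, but the test file is the second changed file and is not shown in this diff. For orientation, here is a minimal in-package sketch of what such a test could look like; it assumes the helpers can be called on a zero-value Reconciler (they only inspect their arguments), and the test name, case values, and messages are illustrative rather than taken from the commit:

package failuredomainrollout

import (
    "testing"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
    controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
)

func TestGatingHelpersSketch(t *testing.T) {
    r := &Reconciler{}
    now := metav1.Now()

    // A cluster with a deletion timestamp should short-circuit reconciliation.
    deleting := &clusterv1.Cluster{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now}}
    if !r.areResourcesDeleting(deleting, nil) {
        t.Error("expected deleting=true for a cluster with a deletion timestamp")
    }

    // A cluster whose status lags its generation counts as still updating.
    updating := &clusterv1.Cluster{
        ObjectMeta: metav1.ObjectMeta{Generation: 2},
        Status:     clusterv1.ClusterStatus{ObservedGeneration: 1},
    }
    if !r.areResourcesUpdating(updating, nil) {
        t.Error("expected updating=true when observedGeneration < generation")
    }
    updating.Status.ObservedGeneration = 2
    if r.areResourcesUpdating(updating, nil) {
        t.Error("expected updating=false once observedGeneration catches up")
    }

    // Pausing either the Cluster (spec.paused) or the KCP (paused annotation) gates reconciliation.
    paused := &clusterv1.Cluster{Spec: clusterv1.ClusterSpec{Paused: true}}
    kcp := &controlplanev1.KubeadmControlPlane{}
    if !r.areResourcesPaused(paused, kcp) {
        t.Error("expected paused=true when cluster.Spec.Paused is set")
    }
    paused.Spec.Paused = false
    kcp.Annotations = map[string]string{clusterv1.PausedAnnotation: "true"}
    if !r.areResourcesPaused(paused, kcp) {
        t.Error("expected paused=true when the KCP carries the paused annotation")
    }
}

Table-driven cases covering the KCP side of the deleting/updating checks and the nil combinations would follow the same pattern.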
@@ -94,10 +132,21 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
         return ctrl.Result{}, fmt.Errorf("failed to get KubeAdmControlPlane %s: %w", kcpKey, err)
     }
 
-    // If the KubeadmControlPlane is not fully reconciled, we should skip our own reconciliation.
-    if kcp.Status.ObservedGeneration < kcp.Generation {
-        logger.V(5).Info("KubeAdmControlPlane is not yet reconciled, skipping failure domain rollout check",
-            "observedGeneration", kcp.Status.ObservedGeneration, "generation", kcp.Generation)
+    // Skip reconciliation if either cluster or KCP is being deleted
+    if r.areResourcesDeleting(&cluster, &kcp) {
+        logger.V(5).Info("Cluster or KubeadmControlPlane is being deleted, skipping reconciliation")
+        return ctrl.Result{}, nil
+    }
+
+    // Skip reconciliation if either cluster or KCP is paused
+    if r.areResourcesPaused(&cluster, &kcp) {
+        logger.V(5).Info("Cluster or KubeadmControlPlane is paused, skipping reconciliation")
+        return ctrl.Result{}, nil
+    }
+
+    // Skip reconciliation if either cluster or KCP is not fully reconciled
+    if r.areResourcesUpdating(&cluster, &kcp) {
+        logger.V(5).Info("Cluster or KubeadmControlPlane is not yet reconciled, requeuing")
         return ctrl.Result{RequeueAfter: 2 * time.Minute}, nil
     }
 
@@ -107,34 +156,37 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
         return ctrl.Result{}, fmt.Errorf("failed to determine if rollout is needed: %w", err)
     }
 
-    if needsRollout {
-        logger.Info("Rollout needed due to failure domain changes", "reason", reason)
+    if !needsRollout {
+        logger.V(5).Info("No rollout needed due to failure domain changes", "reason", reason)
+        return ctrl.Result{}, nil
+    }
 
-        // Check if we should skip the rollout due to recent or ongoing rollout activities
-        if shouldSkip, requeueAfter := r.shouldSkipRollout(&kcp); shouldSkip {
-            logger.Info("Skipping rollout due to recent or ongoing rollout activity",
-                "reason", reason, "requeueAfter", requeueAfter)
-            return ctrl.Result{RequeueAfter: requeueAfter}, nil
-        }
+    logger.Info("Rollout needed due to failure domain changes", "reason", reason)
 
-        logger.Info("Attempting to trigger KCP rollout due to failure domain changes", "reason", reason)
+    // Check if we should skip the rollout due to recent or ongoing rollout activities
+    if shouldSkip, requeueAfter := r.shouldSkipRollout(&kcp); shouldSkip {
+        logger.V(5).Info("Skipping rollout due to recent or ongoing rollout activity",
+            "reason", reason, "requeueAfter", requeueAfter)
+        return ctrl.Result{RequeueAfter: requeueAfter}, nil
+    }
 
-        // Set rolloutAfter to trigger immediate rollout
-        now := metav1.Now()
-        kcpCopy := kcp.DeepCopy()
-        kcpCopy.Spec.RolloutAfter = &now
+    logger.Info("Attempting to trigger KCP rollout due to failure domain changes", "reason", reason)
 
-        if err := r.Update(ctx, kcpCopy); err != nil {
-            return ctrl.Result{}, fmt.Errorf("failed to update KubeAdmControlPlane %s: %w", kcpKey, err)
-        }
+    // Set rolloutAfter to trigger immediate rollout
+    now := metav1.Now()
+    kcpCopy := kcp.DeepCopy()
+    kcpCopy.Spec.RolloutAfter = &now
 
-        logger.Info(
-            "Successfully triggered KCP rollout due to failure domain changes",
-            "rolloutAfter",
-            now.Format(time.RFC3339),
-        )
+    if err := r.Update(ctx, kcpCopy); err != nil {
+        return ctrl.Result{}, fmt.Errorf("failed to update KubeAdmControlPlane %s: %w", kcpKey, err)
     }
 
+    logger.Info(
+        "Successfully triggered KCP rollout due to failure domain changes",
+        "rolloutAfter",
+        now.Format(time.RFC3339),
+    )
+
     return ctrl.Result{}, nil
 }
 
@@ -264,101 +316,52 @@ func (r *Reconciler) hasSuboptimalDistribution(distribution map[string]int, repl
     return false
 }
 
-// canImproveWithMoreFDs checks if using additional failure domains would improve fault tolerance by comparing
-// current max concentration vs ideal max with optimal distribution.
-// Also checks if we can reduce the number of FDs at maximum concentration.
+// canImproveWithMoreFDs checks if using additional failure domains would improve fault tolerance
+// by reducing either the maximum number of replicas per FD, or the number of FDs at maximum concentration.
 func (r *Reconciler) canImproveWithMoreFDs(currentDistribution map[string]int, replicas, availableCount int) bool {
-    if len(currentDistribution) == 0 {
+    if len(currentDistribution) == 0 || replicas == 0 || availableCount <= len(currentDistribution) {
        return false
    }
 
-    // Find current min and max counts to understand current concentration
-    minCount, maxCount := replicas, 0
+    currentMaxPerFD, currentFDsAtMax := 0, 0
     for _, count := range currentDistribution {
-        if count < minCount {
-            minCount = count
-        }
-        if count > maxCount {
-            maxCount = count
-        }
-    }
-
-    // When this function is called, we know hasSuboptimalDistribution was false,
-    // which means maxCount <= calculateMaxIdealPerFD(replicas, availableCount).
-    // Therefore, we only need to check if we can improve concentration.
-
-    currentFDsAtMax := 0
-    for _, count := range currentDistribution {
-        if count == maxCount {
+        if count > currentMaxPerFD {
+            currentMaxPerFD, currentFDsAtMax = count, 1
+        } else if count == currentMaxPerFD {
             currentFDsAtMax++
         }
     }
 
-    // Calculate optimal number of FDs that should have the maximum
-    // In optimal distribution, (replicas % availableCount) FDs get the extra replica
-    optimalFDsAtMax := replicas % availableCount
-    if optimalFDsAtMax == 0 && replicas > 0 {
-        // If evenly divisible and we have replicas, all FDs get the same amount
-        // This means no FDs are "at max" in the sense of having extras
+    optimalMaxPerFD := r.calculateMaxIdealPerFD(replicas, availableCount)
+    extra := replicas % availableCount
+    optimalFDsAtMax := extra
+    if optimalFDsAtMax == 0 {
        optimalFDsAtMax = availableCount
    }
 
-    // Improvement if we can reduce the number of FDs at maximum concentration
-    return optimalFDsAtMax < currentFDsAtMax
+    return optimalMaxPerFD < currentMaxPerFD ||
+        (optimalMaxPerFD == currentMaxPerFD && optimalFDsAtMax < currentFDsAtMax)
 }
 
 // calculateMaxIdealPerFD calculates the maximum number of machines per failure domain in ideal distribution.
 func (r *Reconciler) calculateMaxIdealPerFD(replicas, availableCount int) int {
     if availableCount == 0 {
         return replicas
     }
-
-    baseReplicasPerFD := replicas / availableCount
-    extraReplicas := replicas % availableCount
-
-    if extraReplicas > 0 {
-        return baseReplicasPerFD + 1
+    base := replicas / availableCount
+    if replicas%availableCount > 0 {
+        return base + 1
     }
-
-    return baseReplicasPerFD
+    return base
 }
 
 // getAvailableFailureDomains returns the names of available failure domains for control plane.
 func getAvailableFailureDomains(failureDomains clusterv1.FailureDomains) []string {
     var available []string
-    for name, fd := range failureDomains {
-        if fd.ControlPlane {
-            available = append(available, name)
+    for fd, info := range failureDomains {
+        if info.ControlPlane {
+            available = append(available, fd)
         }
     }
     return available
 }
-
-// kubeadmControlPlaneToCluster maps KubeAdmControlPlane changes to cluster reconcile requests.
-func (r *Reconciler) kubeadmControlPlaneToCluster(ctx context.Context, obj client.Object) []reconcile.Request {
-    kcp, ok := obj.(*controlplanev1.KubeadmControlPlane)
-    if !ok {
-        return nil
-    }
-
-    // Find the cluster that owns this KCP
-    var clusters clusterv1.ClusterList
-    if err := r.List(ctx, &clusters, client.InNamespace(kcp.Namespace)); err != nil {
-        return nil
-    }
-
-    for i := range clusters.Items {
-        if clusters.Items[i].Spec.ControlPlaneRef != nil && clusters.Items[i].Spec.ControlPlaneRef.Name == kcp.Name {
-            return []reconcile.Request{
-                {
-                    NamespacedName: types.NamespacedName{
-                        Namespace: clusters.Items[i].Namespace,
-                        Name:      clusters.Items[i].Name,
-                    },
-                },
-            }
-        }
-    }
-
-    return nil
-}
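To make the reworked comparison concrete, a hedged worked example (not part of the commit): three control-plane replicas packed into two of three usable failure domains give currentMaxPerFD = 2 with one FD at that maximum, while the ideal spread is calculateMaxIdealPerFD(3, 3) = 1, so canImproveWithMoreFDs reports an improvement and the controller triggers a rollout; a fully balanced 1-1-1 spread is rejected by the new guard because availableCount is not greater than the number of FDs already in use. As an in-package sketch (the test name and FD names are illustrative):

package failuredomainrollout

import "testing"

func TestCanImproveWithMoreFDsExampleSketch(t *testing.T) {
    r := &Reconciler{}

    // 3 replicas across only 2 of 3 usable FDs: current max per FD is 2, ideal max is ceil(3/3) = 1.
    skewed := map[string]int{"fd-1": 2, "fd-2": 1}
    if !r.canImproveWithMoreFDs(skewed, 3, 3) {
        t.Error("expected improvement: spreading onto the unused FD lowers the max replicas per FD")
    }

    // 3 replicas already spread 1-1-1 across 3 FDs: availableCount <= len(distribution), nothing to gain.
    balanced := map[string]int{"fd-1": 1, "fd-2": 1, "fd-3": 1}
    if r.canImproveWithMoreFDs(balanced, 3, 3) {
        t.Error("expected no improvement for an already balanced distribution")
    }
}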

0 commit comments