Commit 57900ed

feat(controllers): add failure domain rollout controller
Adds a new controller that watches Cluster status.failureDomains and triggers a rollout of the KubeadmControlPlane when a failure domain currently in use by control plane machines is disabled or removed. The controller is enabled by default and can be disabled with --failure-domain-rollout-enabled=false.
1 parent 9c71eb0 commit 57900ed
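
The rollout condition the commit message describes can be pictured with a short, hypothetical sketch; the failure domain names, the standalone main function, and the inline check below are illustrative only, not code from this commit:

// Illustrative sketch only: a control plane Machine still sits in a failure
// domain that the provider has disabled for control plane placement.
package main

import (
    "fmt"

    clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

func main() {
    // Shape of Cluster status.failureDomains after "fd-2" is disabled.
    failureDomains := clusterv1.FailureDomains{
        "fd-1": clusterv1.FailureDomainSpec{ControlPlane: true},
        "fd-2": clusterv1.FailureDomainSpec{ControlPlane: false},
    }

    // Taken from machine.spec.failureDomain of a control plane Machine.
    machineFD := "fd-2"

    if fd, ok := failureDomains[machineFD]; !ok || !fd.ControlPlane {
        // This is the condition that makes the new controller set
        // spec.rolloutAfter on the KubeadmControlPlane, forcing a rollout
        // onto the remaining eligible failure domains.
        fmt.Printf("failure domain %s is disabled or removed: rollout needed\n", machineFD)
    }
}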

File tree

6 files changed: +727 additions, -0 deletions


charts/cluster-api-runtime-extensions-nutanix/templates/role.yaml

Lines changed: 8 additions & 0 deletions
@@ -75,13 +75,21 @@ rules:
   - cluster.x-k8s.io
   resources:
   - clusters
+  - machines
   verbs:
   - get
   - list
   - watch
 - apiGroups:
   - cluster.x-k8s.io
   resources:
+  - clusters/status
+  verbs:
+  - get
+- apiGroups:
+  - cluster.x-k8s.io
+  resources:
+  - kubeadmcontrolplanes
   - machinedeployments
   verbs:
   - get

cmd/main.go

Lines changed: 20 additions & 0 deletions
@@ -32,6 +32,7 @@ import (
     "github.com/nutanix-cloud-native/cluster-api-runtime-extensions-nutanix/common/pkg/capi/clustertopology/handlers"
     "github.com/nutanix-cloud-native/cluster-api-runtime-extensions-nutanix/common/pkg/server"
     "github.com/nutanix-cloud-native/cluster-api-runtime-extensions-nutanix/pkg/controllers/enforceclusterautoscalerlimits"
+    "github.com/nutanix-cloud-native/cluster-api-runtime-extensions-nutanix/pkg/controllers/failuredomainrollout"
     "github.com/nutanix-cloud-native/cluster-api-runtime-extensions-nutanix/pkg/controllers/namespacesync"
     "github.com/nutanix-cloud-native/cluster-api-runtime-extensions-nutanix/pkg/feature"
     "github.com/nutanix-cloud-native/cluster-api-runtime-extensions-nutanix/pkg/handlers/aws"
@@ -121,6 +122,7 @@ func main() {
 
     namespacesyncOptions := namespacesync.Options{}
     enforceClusterAutoscalerLimitsOptions := enforceclusterautoscalerlimits.Options{}
+    failureDomainRolloutOptions := failuredomainrollout.Options{}
 
     // Initialize and parse command line flags.
     logs.AddFlags(pflag.CommandLine, logs.SkipLoggingConfigurationFlags())
@@ -133,6 +135,7 @@ func main() {
     nutanixMetaHandlers.AddFlags(pflag.CommandLine)
     namespacesyncOptions.AddFlags(pflag.CommandLine)
     enforceClusterAutoscalerLimitsOptions.AddFlags(pflag.CommandLine)
+    failureDomainRolloutOptions.AddFlags(pflag.CommandLine)
     pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
     pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
 
@@ -232,6 +235,23 @@ func main() {
         }
     }
 
+    if failureDomainRolloutOptions.Enabled {
+        if err := (&failuredomainrollout.Reconciler{
+            Client: mgr.GetClient(),
+        }).SetupWithManager(
+            mgr,
+            &controller.Options{MaxConcurrentReconciles: failureDomainRolloutOptions.Concurrency},
+        ); err != nil {
+            setupLog.Error(
+                err,
+                "unable to create controller",
+                "controller",
+                "failuredomainrollout.Reconciler",
+            )
+            os.Exit(1)
+        }
+    }
+
     mgr.GetWebhookServer().Register("/mutate-v1beta1-cluster", &webhook.Admission{
         Handler: cluster.NewDefaulter(mgr.GetClient(), admission.NewDecoder(mgr.GetScheme())),
     })
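
The failuredomainrollout.Options value wired in above (its Enabled and Concurrency fields and its AddFlags method) is defined in one of the commit's files that is not shown on this page. A minimal sketch of what such an options type could look like, assuming spf13/pflag as used elsewhere in main.go; only the --failure-domain-rollout-enabled flag name is confirmed by the commit message, while the concurrency flag name and the defaults are guesses:

// Hypothetical sketch of the Options type used in cmd/main.go above.
package failuredomainrollout

import "github.com/spf13/pflag"

type Options struct {
    Enabled     bool
    Concurrency int
}

func (o *Options) AddFlags(flags *pflag.FlagSet) {
    flags.BoolVar(&o.Enabled, "failure-domain-rollout-enabled", true,
        "Enable the failure domain rollout controller.")
    // Flag name and default below are assumptions for illustration.
    flags.IntVar(&o.Concurrency, "failure-domain-rollout-concurrency", 10,
        "Number of clusters to reconcile concurrently.")
}
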
Lines changed: 273 additions & 0 deletions
@@ -0,0 +1,273 @@
+// Copyright 2025 Nutanix. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package failuredomainrollout
+
+import (
+    "context"
+    "fmt"
+    "sort"
+    "time"
+
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/types"
+    clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+    controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
+    ctrl "sigs.k8s.io/controller-runtime"
+    "sigs.k8s.io/controller-runtime/pkg/builder"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/controller"
+    "sigs.k8s.io/controller-runtime/pkg/handler"
+    "sigs.k8s.io/controller-runtime/pkg/predicate"
+    "sigs.k8s.io/controller-runtime/pkg/reconcile"
+)
+
+const (
+    // FailureDomainLastUpdateAnnotation is the annotation key used to store the last failure domain update time
+    FailureDomainLastUpdateAnnotation = "caren.nutanix.com/failure-domain-last-update"
+    // FailureDomainHashAnnotation is the annotation key used to store the hash of the failure domains
+    FailureDomainHashAnnotation = "caren.nutanix.com/failure-domain-hash"
+)
+
+type Reconciler struct {
+    client.Client
+}
+
+func (r *Reconciler) SetupWithManager(
+    mgr ctrl.Manager,
+    options *controller.Options,
+) error {
+    return ctrl.NewControllerManagedBy(mgr).
+        For(&clusterv1.Cluster{}).
+        Watches(
+            &controlplanev1.KubeadmControlPlane{},
+            handler.EnqueueRequestsFromMapFunc(r.kubeadmControlPlaneToCluster),
+            builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
+        ).
+        WithOptions(*options).
+        Complete(r)
+}
+
+func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+    logger := ctrl.LoggerFrom(ctx).WithValues("cluster", req.NamespacedName)
+
+    var cluster clusterv1.Cluster
+    if err := r.Get(ctx, req.NamespacedName, &cluster); err != nil {
+        if apierrors.IsNotFound(err) {
+            logger.V(5).Info("Cluster not found, skipping reconciliation")
+            return ctrl.Result{}, nil
+        }
+        return ctrl.Result{}, fmt.Errorf("failed to get Cluster %s: %w", req.NamespacedName, err)
+    }
+
+    // Skip if cluster is not using topology
+    if cluster.Spec.Topology == nil {
+        logger.V(5).Info("Cluster is not using topology, skipping reconciliation")
+        return ctrl.Result{}, nil
+    }
+
+    // Skip if cluster doesn't have a control plane reference
+    if cluster.Spec.ControlPlaneRef == nil {
+        logger.V(5).Info("Cluster has no control plane reference, skipping reconciliation")
+        return ctrl.Result{}, nil
+    }
+
+    // Skip if cluster doesn't have failure domains
+    if cluster.Status.FailureDomains == nil || len(cluster.Status.FailureDomains) == 0 {
+        logger.V(5).Info("Cluster has no failure domains, skipping reconciliation")
+        return ctrl.Result{}, nil
+    }
+
+    // Get the KubeAdmControlPlane
+    var kcp controlplanev1.KubeadmControlPlane
+    kcpKey := types.NamespacedName{
+        Namespace: cluster.Namespace,
+        Name:      cluster.Spec.ControlPlaneRef.Name,
+    }
+    if err := r.Get(ctx, kcpKey, &kcp); err != nil {
+        if apierrors.IsNotFound(err) {
+            logger.V(5).Info("KubeAdmControlPlane not found, skipping reconciliation")
+            return ctrl.Result{}, nil
+        }
+        return ctrl.Result{}, fmt.Errorf("failed to get KubeAdmControlPlane %s: %w", kcpKey, err)
+    }
+
+    // Check if we need to trigger a rollout
+    needsRollout, reason, err := r.shouldTriggerRollout(ctx, &cluster, &kcp)
+    if err != nil {
+        return ctrl.Result{}, fmt.Errorf("failed to determine if rollout is needed: %w", err)
+    }
+
+    if needsRollout {
+        logger.Info("Triggering rollout due to failure domain changes", "reason", reason)
+
+        // Set rolloutAfter to trigger immediate rollout
+        now := metav1.Now()
+        kcpCopy := kcp.DeepCopy()
+        kcpCopy.Spec.RolloutAfter = &now
+
+        // Update the annotation to track the last update
+        if kcpCopy.Annotations == nil {
+            kcpCopy.Annotations = make(map[string]string)
+        }
+        kcpCopy.Annotations[FailureDomainLastUpdateAnnotation] = now.Format(time.RFC3339)
+
+        // Store the current failure domain hash
+        fdHash := r.calculateFailureDomainHash(cluster.Status.FailureDomains)
+        kcpCopy.Annotations[FailureDomainHashAnnotation] = fdHash
+
+        if err := r.Update(ctx, kcpCopy); err != nil {
+            return ctrl.Result{}, fmt.Errorf("failed to update KubeAdmControlPlane %s: %w", kcpKey, err)
+        }
+
+        logger.Info("Successfully triggered rollout", "rolloutAfter", now.Format(time.RFC3339))
+    }
+
+    return ctrl.Result{}, nil
+}
+
+// shouldTriggerRollout determines if a rollout should be triggered based on failure domain changes
+func (r *Reconciler) shouldTriggerRollout(
+    ctx context.Context,
+    cluster *clusterv1.Cluster,
+    kcp *controlplanev1.KubeadmControlPlane,
+) (bool, string, error) {
+    logger := ctrl.LoggerFrom(ctx).WithValues("cluster", client.ObjectKeyFromObject(cluster))
+
+    // Calculate current failure domain hash
+    currentFDHash := r.calculateFailureDomainHash(cluster.Status.FailureDomains)
+
+    // Get the stored hash from annotations
+    storedFDHash, hasStoredHash := kcp.Annotations[FailureDomainHashAnnotation]
+
+    // If no stored hash, this is the first time - store the hash but don't trigger rollout
+    if !hasStoredHash {
+        logger.V(5).Info("No previous failure domain hash found, storing current hash")
+        return false, "", nil
+    }
+
+    // If hashes are the same, no changes detected
+    if currentFDHash == storedFDHash {
+        logger.V(5).Info("No failure domain changes detected")
+        return false, "", nil
+    }
+
+    // Get the current KCP machines to understand current placement
+    currentlyUsedFailureDomains, err := r.getCurrentlyUsedFailureDomains(ctx, cluster, kcp)
+    if err != nil {
+        return false, "", fmt.Errorf("failed to get currently used failure domains: %w", err)
+    }
+
+    logger.V(5).Info("Analyzing failure domain changes",
+        "currentlyUsedFailureDomains", currentlyUsedFailureDomains,
+        "availableFailureDomains", getAvailableFailureDomains(cluster.Status.FailureDomains))
+
+    // Check if any currently used failure domain is disabled or removed
+    for _, usedFD := range currentlyUsedFailureDomains {
+        if fd, exists := cluster.Status.FailureDomains[usedFD]; !exists {
+            return true, fmt.Sprintf("failure domain %s is removed", usedFD), nil
+        } else if !fd.ControlPlane {
+            return true, fmt.Sprintf("failure domain %s is disabled for control plane", usedFD), nil
+        }
+    }
+
+    // If we reach here, failure domains changed but no meaningful impact
+    // (e.g., new failure domains added but existing ones still valid)
+    logger.V(5).Info("Failure domains changed but no meaningful impact detected")
+    return false, "", nil
+}
+
+// getCurrentlyUsedFailureDomains returns the failure domains currently used by the KCP machines
+func (r *Reconciler) getCurrentlyUsedFailureDomains(
+    ctx context.Context,
+    cluster *clusterv1.Cluster,
+    kcp *controlplanev1.KubeadmControlPlane,
+) ([]string, error) {
+    var machines clusterv1.MachineList
+    if err := r.List(ctx, &machines, client.InNamespace(cluster.Namespace), client.MatchingLabels{
+        clusterv1.ClusterNameLabel:         cluster.Name,
+        clusterv1.MachineControlPlaneLabel: "",
+    }); err != nil {
+        return nil, fmt.Errorf("failed to list control plane machines: %w", err)
+    }
+
+    usedFailureDomains := make(map[string]struct{})
+    for _, machine := range machines.Items {
+        if machine.Spec.FailureDomain != nil {
+            usedFailureDomains[*machine.Spec.FailureDomain] = struct{}{}
+        }
+    }
+
+    result := make([]string, 0, len(usedFailureDomains))
+    for fd := range usedFailureDomains {
+        result = append(result, fd)
+    }
+    return result, nil
+}
+
+// calculateFailureDomainHash calculates a hash of the failure domains for comparison
+func (r *Reconciler) calculateFailureDomainHash(failureDomains clusterv1.FailureDomains) string {
+    if len(failureDomains) == 0 {
+        return ""
+    }
+
+    // Create a deterministic representation of the failure domains
+    // focusing on control plane eligible domains
+    var controlPlaneDomains []string
+    for name, fd := range failureDomains {
+        if fd.ControlPlane {
+            controlPlaneDomains = append(controlPlaneDomains, name)
+        }
+    }
+
+    // Sort to ensure deterministic hash
+    sort.Strings(controlPlaneDomains)
+
+    // Use a simple concatenation for now - in production, consider using a proper hash function
+    result := ""
+    for _, name := range controlPlaneDomains {
+        result += name + ","
+    }
+    return result
+}
+
+// getAvailableFailureDomains returns the names of available failure domains for control plane
+func getAvailableFailureDomains(failureDomains clusterv1.FailureDomains) []string {
+    var available []string
+    for name, fd := range failureDomains {
+        if fd.ControlPlane {
+            available = append(available, name)
+        }
+    }
+    return available
+}
+
+// kubeadmControlPlaneToCluster maps KubeAdmControlPlane changes to cluster reconcile requests
+func (r *Reconciler) kubeadmControlPlaneToCluster(ctx context.Context, obj client.Object) []reconcile.Request {
+    kcp, ok := obj.(*controlplanev1.KubeadmControlPlane)
+    if !ok {
+        return nil
+    }
+
+    // Find the cluster that owns this KCP
+    var clusters clusterv1.ClusterList
+    if err := r.List(ctx, &clusters, client.InNamespace(kcp.Namespace)); err != nil {
+        return nil
+    }
+
+    for _, cluster := range clusters.Items {
+        if cluster.Spec.ControlPlaneRef != nil && cluster.Spec.ControlPlaneRef.Name == kcp.Name {
+            return []reconcile.Request{
+                {
+                    NamespacedName: types.NamespacedName{
+                        Namespace: cluster.Namespace,
+                        Name:      cluster.Name,
+                    },
+                },
+            }
+        }
+    }
+
+    return nil
+}
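
To make the hash comparison in shouldTriggerRollout concrete, here is a hypothetical package-level example test (same package, illustrative failure domain names) showing the values calculateFailureDomainHash produces before and after a domain is disabled; it is a sketch for illustration, not part of the commit:

package failuredomainrollout

import (
    "fmt"

    clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

// Example_failureDomainHash shows that only control-plane-eligible failure
// domains contribute to the hash (sorted by name, comma-joined), so disabling
// a domain changes the hash and triggers the machine placement check.
func Example_failureDomainHash() {
    before := clusterv1.FailureDomains{
        "fd-1": clusterv1.FailureDomainSpec{ControlPlane: true},
        "fd-2": clusterv1.FailureDomainSpec{ControlPlane: true},
    }
    after := clusterv1.FailureDomains{
        "fd-1": clusterv1.FailureDomainSpec{ControlPlane: true},
        "fd-2": clusterv1.FailureDomainSpec{ControlPlane: false}, // disabled
    }

    r := &Reconciler{}
    fmt.Println(r.calculateFailureDomainHash(before))
    fmt.Println(r.calculateFailureDomainHash(after))
    // The hashes differ, so shouldTriggerRollout goes on to list the control
    // plane Machines; any Machine still placed in "fd-2" yields the reason
    // "failure domain fd-2 is disabled for control plane" and a rollout.

    // Output:
    // fd-1,fd-2,
    // fd-1,
}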
