Skip to content

Commit 81748bb

Browse files
author
Arvind Thirumurugan
committed
add approvalRequest controller
Signed-off-by: Arvind Thirumurugan <[email protected]>
1 parent fd0033d commit 81748bb

File tree

1 file changed

+378
-0
lines changed

1 file changed

+378
-0
lines changed
Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package approvalrequest features a controller to reconcile ApprovalRequest objects
18+
// and create MetricCollector resources on member clusters for approved stages.
19+
package approvalrequest
20+
21+
import (
22+
"context"
23+
"fmt"
24+
"time"
25+
26+
corev1 "k8s.io/api/core/v1"
27+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
28+
"k8s.io/apimachinery/pkg/api/errors"
29+
"k8s.io/apimachinery/pkg/api/meta"
30+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/types"
32+
"k8s.io/client-go/tools/record"
33+
"k8s.io/klog/v2"
34+
ctrl "sigs.k8s.io/controller-runtime"
35+
"sigs.k8s.io/controller-runtime/pkg/builder"
36+
"sigs.k8s.io/controller-runtime/pkg/client"
37+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
38+
"sigs.k8s.io/controller-runtime/pkg/predicate"
39+
40+
placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
41+
"github.com/kubefleet-dev/kubefleet/pkg/utils"
42+
)
43+
44+
const (
	// metricCollectorFinalizer is the finalizer added to ApprovalRequest objects.
	// While it is present, deletion of the ApprovalRequest is routed through
	// handleDelete, which removes the ClusterResourcePlacement and the namespace
	// created for the request before the finalizer is dropped.
	metricCollectorFinalizer = "kubernetes-fleet.io/metric-collector-cleanup"

	// prometheusURL is the default Prometheus URL to use. It is written verbatim
	// into the spec of every MetricCollector created by this controller.
	prometheusURL = "http://prometheus.fleet-system.svc.cluster.local:9090"
)
51+
52+
// Reconciler reconciles an ApprovalRequest object and creates MetricCollector resources
// on member clusters when the approval is granted.
type Reconciler struct {
	// Client is the embedded controller-runtime client used for every Get, Create,
	// Update and Delete issued by this controller.
	client.Client
	// recorder is the event recorder; it is populated in SetupWithManager.
	recorder record.EventRecorder
}
58+
59+
// Reconcile reconciles an ApprovalRequest object.
60+
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
61+
startTime := time.Now()
62+
klog.V(2).InfoS("ApprovalRequest reconciliation starts", "approvalRequest", req.NamespacedName)
63+
defer func() {
64+
latency := time.Since(startTime).Milliseconds()
65+
klog.V(2).InfoS("ApprovalRequest reconciliation ends", "approvalRequest", req.NamespacedName, "latency", latency)
66+
}()
67+
68+
// Fetch the ApprovalRequest
69+
approvalReq := &placementv1beta1.ApprovalRequest{}
70+
if err := r.Client.Get(ctx, req.NamespacedName, approvalReq); err != nil {
71+
if errors.IsNotFound(err) {
72+
klog.V(2).InfoS("ApprovalRequest not found, ignoring", "approvalRequest", req.NamespacedName)
73+
return ctrl.Result{}, nil
74+
}
75+
klog.ErrorS(err, "Failed to get ApprovalRequest", "approvalRequest", req.NamespacedName)
76+
return ctrl.Result{}, err
77+
}
78+
79+
approvalReqRef := klog.KObj(approvalReq)
80+
81+
// Handle deletion
82+
if !approvalReq.DeletionTimestamp.IsZero() {
83+
return r.handleDelete(ctx, approvalReq)
84+
}
85+
86+
// Add finalizer if not present
87+
if !controllerutil.ContainsFinalizer(approvalReq, metricCollectorFinalizer) {
88+
controllerutil.AddFinalizer(approvalReq, metricCollectorFinalizer)
89+
if err := r.Client.Update(ctx, approvalReq); err != nil {
90+
klog.ErrorS(err, "Failed to add finalizer", "approvalRequest", approvalReqRef)
91+
return ctrl.Result{}, err
92+
}
93+
klog.V(2).InfoS("Added finalizer to ApprovalRequest", "approvalRequest", approvalReqRef)
94+
}
95+
96+
// Check if the approval request is approved
97+
approvedCond := meta.FindStatusCondition(approvalReq.Status.Conditions, string(placementv1beta1.ApprovalRequestConditionApproved))
98+
if approvedCond == nil || approvedCond.Status != metav1.ConditionTrue {
99+
klog.V(2).InfoS("ApprovalRequest not yet approved, skipping", "approvalRequest", approvalReqRef)
100+
return ctrl.Result{}, nil
101+
}
102+
103+
// Get the UpdateRun
104+
updateRunName := approvalReq.Spec.TargetUpdateRun
105+
updateRun := &placementv1beta1.StagedUpdateRun{}
106+
if err := r.Client.Get(ctx, types.NamespacedName{Name: updateRunName, Namespace: approvalReq.Namespace}, updateRun); err != nil {
107+
klog.ErrorS(err, "Failed to get UpdateRun", "approvalRequest", approvalReqRef, "updateRun", updateRunName)
108+
return ctrl.Result{}, err
109+
}
110+
111+
// Find the stage
112+
stageName := approvalReq.Spec.TargetStage
113+
var stageStatus *placementv1beta1.StageUpdatingStatus
114+
for i := range updateRun.Status.StagesStatus {
115+
if updateRun.Status.StagesStatus[i].StageName == stageName {
116+
stageStatus = &updateRun.Status.StagesStatus[i]
117+
break
118+
}
119+
}
120+
121+
if stageStatus == nil {
122+
err := fmt.Errorf("stage %s not found in UpdateRun %s", stageName, updateRunName)
123+
klog.ErrorS(err, "Failed to find stage", "approvalRequest", approvalReqRef)
124+
return ctrl.Result{}, err
125+
}
126+
127+
// Get all cluster names from the stage
128+
clusterNames := make([]string, 0, len(stageStatus.Clusters))
129+
for _, cluster := range stageStatus.Clusters {
130+
clusterNames = append(clusterNames, cluster.ClusterName)
131+
}
132+
133+
if len(clusterNames) == 0 {
134+
klog.V(2).InfoS("No clusters in stage, skipping", "approvalRequest", approvalReqRef, "stage", stageName)
135+
return ctrl.Result{}, nil
136+
}
137+
138+
klog.V(2).InfoS("Found clusters in stage", "approvalRequest", approvalReqRef, "stage", stageName, "clusters", clusterNames)
139+
140+
// Create or update the MetricCollector resource, CRP, and ResourceOverrides
141+
if err := r.ensureMetricCollectorResources(ctx, approvalReq, clusterNames, updateRunName, stageName); err != nil {
142+
klog.ErrorS(err, "Failed to ensure MetricCollector resources", "approvalRequest", approvalReqRef)
143+
return ctrl.Result{}, err
144+
}
145+
146+
klog.V(2).InfoS("Successfully ensured MetricCollector resources", "approvalRequest", approvalReqRef, "clusters", clusterNames)
147+
return ctrl.Result{}, nil
148+
}
149+
150+
// ensureMetricCollectorResources creates the Namespace, MetricCollector, CRP, and ResourceOverrides
151+
func (r *Reconciler) ensureMetricCollectorResources(
152+
ctx context.Context,
153+
approvalReq *placementv1beta1.ApprovalRequest,
154+
clusterNames []string,
155+
updateRunName, stageName string,
156+
) error {
157+
// Generate names - namespace is derived from updateRun name
158+
namespaceName := fmt.Sprintf("mc-%s", updateRunName)
159+
metricCollectorName := fmt.Sprintf("mc-%s", stageName)
160+
crpName := fmt.Sprintf("crp-mc-%s-%s", updateRunName, stageName)
161+
roName := fmt.Sprintf("ro-mc-%s-%s", updateRunName, stageName)
162+
163+
// Create Namespace on hub
164+
namespace := &corev1.Namespace{
165+
ObjectMeta: metav1.ObjectMeta{
166+
Name: namespaceName,
167+
Labels: map[string]string{
168+
"app": "metric-collector",
169+
"update-run": updateRunName,
170+
},
171+
},
172+
}
173+
174+
existingNS := &corev1.Namespace{}
175+
err := r.Client.Get(ctx, types.NamespacedName{Name: namespaceName}, existingNS)
176+
if err != nil {
177+
if errors.IsNotFound(err) {
178+
if err := r.Client.Create(ctx, namespace); err != nil {
179+
return fmt.Errorf("failed to create Namespace: %w", err)
180+
}
181+
klog.V(2).InfoS("Created Namespace", "namespace", namespaceName)
182+
} else {
183+
return fmt.Errorf("failed to get Namespace: %w", err)
184+
}
185+
}
186+
187+
// Create MetricCollector resource (template) in the namespace on hub
188+
metricCollector := &placementv1beta1.MetricCollector{
189+
ObjectMeta: metav1.ObjectMeta{
190+
Name: metricCollectorName,
191+
Namespace: namespaceName,
192+
Labels: map[string]string{
193+
"app": "metric-collector",
194+
"approval-request": approvalReq.Name,
195+
"update-run": updateRunName,
196+
"stage": stageName,
197+
},
198+
},
199+
Spec: placementv1beta1.MetricCollectorSpec{
200+
PrometheusURL: prometheusURL,
201+
// ReportNamespace will be overridden per cluster
202+
ReportNamespace: "placeholder",
203+
},
204+
}
205+
206+
// Create or update MetricCollector
207+
existingMC := &placementv1beta1.MetricCollector{}
208+
err = r.Client.Get(ctx, types.NamespacedName{Name: metricCollectorName, Namespace: namespaceName}, existingMC)
209+
if err != nil {
210+
if errors.IsNotFound(err) {
211+
if err := r.Client.Create(ctx, metricCollector); err != nil {
212+
return fmt.Errorf("failed to create MetricCollector: %w", err)
213+
}
214+
klog.V(2).InfoS("Created MetricCollector", "metricCollector", klog.KObj(metricCollector))
215+
} else {
216+
return fmt.Errorf("failed to get MetricCollector: %w", err)
217+
}
218+
}
219+
220+
// Create ClusterResourcePlacement with PickFixed policy
221+
crp := &placementv1beta1.ClusterResourcePlacement{
222+
ObjectMeta: metav1.ObjectMeta{
223+
Name: crpName,
224+
Labels: map[string]string{
225+
"approval-request": approvalReq.Name,
226+
"update-run": updateRunName,
227+
"stage": stageName,
228+
},
229+
},
230+
Spec: placementv1beta1.PlacementSpec{
231+
ResourceSelectors: []placementv1beta1.ResourceSelectorTerm{
232+
{
233+
Group: "placement.kubernetes-fleet.io",
234+
Version: "v1beta1",
235+
Kind: "MetricCollector",
236+
Name: metricCollectorName,
237+
},
238+
},
239+
Policy: &placementv1beta1.PlacementPolicy{
240+
PlacementType: placementv1beta1.PickFixedPlacementType,
241+
ClusterNames: clusterNames,
242+
},
243+
},
244+
}
245+
246+
// Create or update CRP
247+
existingCRP := &placementv1beta1.ClusterResourcePlacement{}
248+
err = r.Client.Get(ctx, types.NamespacedName{Name: crpName}, existingCRP)
249+
if err != nil {
250+
if errors.IsNotFound(err) {
251+
if err := r.Client.Create(ctx, crp); err != nil {
252+
return fmt.Errorf("failed to create ClusterResourcePlacement: %w", err)
253+
}
254+
klog.V(2).InfoS("Created ClusterResourcePlacement", "crp", crpName)
255+
} else {
256+
return fmt.Errorf("failed to get ClusterResourcePlacement: %w", err)
257+
}
258+
}
259+
260+
// Create ResourceOverride with rules for each cluster
261+
overrideRules := make([]placementv1beta1.OverrideRule, 0, len(clusterNames))
262+
for _, clusterName := range clusterNames {
263+
reportNamespace := fmt.Sprintf(utils.NamespaceNameFormat, clusterName)
264+
265+
overrideRules = append(overrideRules, placementv1beta1.OverrideRule{
266+
ClusterSelector: &placementv1beta1.ClusterSelector{
267+
ClusterSelectorTerms: []placementv1beta1.ClusterSelectorTerm{
268+
{
269+
LabelSelector: &metav1.LabelSelector{
270+
MatchLabels: map[string]string{
271+
"kubernetes-fleet.io/cluster-name": clusterName,
272+
},
273+
},
274+
},
275+
},
276+
},
277+
JSONPatchOverrides: []placementv1beta1.JSONPatchOverride{
278+
{
279+
Operator: placementv1beta1.JSONPatchOverrideOpReplace,
280+
Path: "/spec/reportNamespace",
281+
Value: apiextensionsv1.JSON{Raw: []byte(fmt.Sprintf(`"%s"`, reportNamespace))},
282+
},
283+
},
284+
})
285+
}
286+
287+
resourceOverride := &placementv1beta1.ResourceOverride{
288+
ObjectMeta: metav1.ObjectMeta{
289+
Name: roName,
290+
Namespace: namespaceName,
291+
Labels: map[string]string{
292+
"approval-request": approvalReq.Name,
293+
"update-run": updateRunName,
294+
"stage": stageName,
295+
},
296+
},
297+
Spec: placementv1beta1.ResourceOverrideSpec{
298+
ResourceSelectors: []placementv1beta1.ResourceSelector{
299+
{
300+
Group: "placement.kubernetes-fleet.io",
301+
Version: "v1beta1",
302+
Kind: "MetricCollector",
303+
Name: metricCollectorName,
304+
},
305+
},
306+
Policy: &placementv1beta1.OverridePolicy{
307+
OverrideRules: overrideRules,
308+
},
309+
},
310+
}
311+
312+
existingRO := &placementv1beta1.ResourceOverride{}
313+
err = r.Client.Get(ctx, types.NamespacedName{Name: roName, Namespace: namespaceName}, existingRO)
314+
if err != nil {
315+
if errors.IsNotFound(err) {
316+
if err := r.Client.Create(ctx, resourceOverride); err != nil {
317+
return fmt.Errorf("failed to create ResourceOverride: %w", err)
318+
}
319+
klog.V(2).InfoS("Created ResourceOverride", "resourceOverride", roName)
320+
} else {
321+
return fmt.Errorf("failed to get ResourceOverride: %w", err)
322+
}
323+
}
324+
325+
return nil
326+
}
327+
328+
// handleDelete handles the deletion of an ApprovalRequest
329+
func (r *Reconciler) handleDelete(ctx context.Context, approvalReq *placementv1beta1.ApprovalRequest) (ctrl.Result, error) {
330+
if !controllerutil.ContainsFinalizer(approvalReq, metricCollectorFinalizer) {
331+
return ctrl.Result{}, nil
332+
}
333+
334+
approvalReqRef := klog.KObj(approvalReq)
335+
klog.V(2).InfoS("Cleaning up resources for ApprovalRequest", "approvalRequest", approvalReqRef)
336+
337+
// Delete CRP (it will cascade delete the resources on member clusters)
338+
updateRunName := approvalReq.Spec.TargetUpdateRun
339+
stageName := approvalReq.Spec.TargetStage
340+
crpName := fmt.Sprintf("crp-mc-%s-%s", updateRunName, stageName)
341+
342+
crp := &placementv1beta1.ClusterResourcePlacement{}
343+
if err := r.Client.Get(ctx, types.NamespacedName{Name: crpName}, crp); err == nil {
344+
if err := r.Client.Delete(ctx, crp); err != nil && !errors.IsNotFound(err) {
345+
return ctrl.Result{}, fmt.Errorf("failed to delete CRP: %w", err)
346+
}
347+
klog.V(2).InfoS("Deleted ClusterResourcePlacement", "crp", crpName)
348+
}
349+
350+
// Delete the namespace (this will delete MetricCollector and ResourceOverride)
351+
namespaceName := fmt.Sprintf("mc-%s", updateRunName)
352+
namespace := &corev1.Namespace{}
353+
if err := r.Client.Get(ctx, types.NamespacedName{Name: namespaceName}, namespace); err == nil {
354+
if err := r.Client.Delete(ctx, namespace); err != nil && !errors.IsNotFound(err) {
355+
return ctrl.Result{}, fmt.Errorf("failed to delete Namespace: %w", err)
356+
}
357+
klog.V(2).InfoS("Deleted Namespace", "namespace", namespaceName)
358+
}
359+
360+
// Remove finalizer
361+
controllerutil.RemoveFinalizer(approvalReq, metricCollectorFinalizer)
362+
if err := r.Client.Update(ctx, approvalReq); err != nil {
363+
klog.ErrorS(err, "Failed to remove finalizer", "approvalRequest", approvalReqRef)
364+
return ctrl.Result{}, err
365+
}
366+
367+
klog.V(2).InfoS("Successfully cleaned up resources", "approvalRequest", approvalReqRef)
368+
return ctrl.Result{}, nil
369+
}
370+
371+
// SetupWithManager sets up the controller with the Manager.
372+
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
373+
r.recorder = mgr.GetEventRecorderFor("approvalrequest-controller")
374+
return ctrl.NewControllerManagedBy(mgr).
375+
Named("approvalrequest-controller").
376+
For(&placementv1beta1.ApprovalRequest{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
377+
Complete(r)
378+
}

0 commit comments

Comments
 (0)