Skip to content

Commit 7abbf8c

Browse files
author
Arvind Thirumurugan
committed
update functions used by initialize
Signed-off-by: Arvind Thirumurugan <[email protected]>
1 parent c0bd328 commit 7abbf8c

File tree

14 files changed

+913
-214
lines changed

14 files changed

+913
-214
lines changed

.github/.copilot/breadcrumbs/2025-09-25-1430-initialize-refactor.md

Lines changed: 532 additions & 0 deletions
Large diffs are not rendered by default.

pkg/controllers/placement/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
7272
klog.V(2).InfoS("Placement reconciliation ends", "placementKey", placementKey, "latency", latency)
7373
}()
7474

75-
placementObj, err := controller.FetchPlacementFromKey(ctx, r.Client, queue.PlacementKey(placementKey))
75+
placementObj, err := controller.FetchPlacementFromQueueKey(ctx, r.Client, queue.PlacementKey(placementKey))
7676
if err != nil {
7777
if apierrors.IsNotFound(err) {
7878
klog.V(4).InfoS("Ignoring NotFound placement", "placementKey", placementKey)

pkg/controllers/rollout/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
7272
}()
7373

7474
// Get the placement object (either ClusterResourcePlacement or ResourcePlacement)
75-
placementObj, err := controller.FetchPlacementFromKey(ctx, r.Client, controller.GetObjectKeyFromRequest(req))
75+
placementObj, err := controller.FetchPlacementFromQueueKey(ctx, r.Client, controller.GetObjectKeyFromRequest(req))
7676
if err != nil {
7777
if errors.IsNotFound(err) {
7878
klog.V(4).InfoS("Ignoring NotFound placement", "placementKey", placementKey)

pkg/controllers/updaterun/controller.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -378,13 +378,15 @@ func emitUpdateRunStatusMetric(updateRun *placementv1beta1.ClusterStagedUpdateRu
378378
klog.V(2).InfoS("There's no valid status condition on updateRun, status updating failed possibly", "updateRun", klog.KObj(updateRun))
379379
}
380380

381-
func removeWaitTimeFromUpdateRunStatus(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
381+
func removeWaitTimeFromUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj) {
382382
// Remove waitTime from the updateRun status for AfterStageTask for type Approval.
383-
if updateRun.Status.UpdateStrategySnapshot != nil {
384-
for i := range updateRun.Status.UpdateStrategySnapshot.Stages {
385-
for j := range updateRun.Status.UpdateStrategySnapshot.Stages[i].AfterStageTasks {
386-
if updateRun.Status.UpdateStrategySnapshot.Stages[i].AfterStageTasks[j].Type == placementv1beta1.AfterStageTaskTypeApproval {
387-
updateRun.Status.UpdateStrategySnapshot.Stages[i].AfterStageTasks[j].WaitTime = nil
383+
// Use interface methods to access and update status
384+
updateRunStatus := updateRun.GetUpdateRunStatus()
385+
if updateRunStatus.UpdateStrategySnapshot != nil {
386+
for i := range updateRunStatus.UpdateStrategySnapshot.Stages {
387+
for j := range updateRunStatus.UpdateStrategySnapshot.Stages[i].AfterStageTasks {
388+
if updateRunStatus.UpdateStrategySnapshot.Stages[i].AfterStageTasks[j].Type == placementv1beta1.AfterStageTaskTypeApproval {
389+
updateRunStatus.UpdateStrategySnapshot.Stages[i].AfterStageTasks[j].WaitTime = nil
388390
}
389391
}
390392
}

pkg/controllers/updaterun/initialization.go

Lines changed: 246 additions & 163 deletions
Large diffs are not rendered by default.

pkg/controllers/updaterun/initialization_integration_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const (
4949
regionWestus = "westus"
5050
)
5151

52-
var _ = Describe("Updaterun initialization tests", func() {
52+
var _ = FDescribe("Updaterun initialization tests", func() {
5353
var updateRun *placementv1beta1.ClusterStagedUpdateRun
5454
var crp *placementv1beta1.ClusterResourcePlacement
5555
var policySnapshot *placementv1beta1.ClusterSchedulingPolicySnapshot
@@ -138,7 +138,7 @@ var _ = Describe("Updaterun initialization tests", func() {
138138
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
139139

140140
By("Validating the initialization failed")
141-
validateFailedInitCondition(ctx, updateRun, "parent clusterResourcePlacement not found")
141+
validateFailedInitCondition(ctx, updateRun, "parent placement not found")
142142
})
143143

144144
It("Should fail to initialize if CRP does not have external rollout strategy type", func() {
@@ -151,7 +151,7 @@ var _ = Describe("Updaterun initialization tests", func() {
151151

152152
By("Validating the initialization failed")
153153
validateFailedInitCondition(ctx, updateRun,
154-
"parent clusterResourcePlacement does not have an external rollout strategy")
154+
"parent placement does not have an external rollout strategy")
155155
})
156156

157157
It("Should copy the ApplyStrategy in the CRP to the UpdateRun", func() {
@@ -405,7 +405,7 @@ var _ = Describe("Updaterun initialization tests", func() {
405405
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
406406

407407
By("Validating the initialization failed")
408-
validateFailedInitCondition(ctx, updateRun, "no scheduled or to-be-deleted clusterResourceBindings found")
408+
validateFailedInitCondition(ctx, updateRun, "no scheduled or to-be-deleted bindings found")
409409
})
410410

411411
It("Should fail to initialize if the number of selected bindings does not match the observed cluster count", func() {
@@ -505,7 +505,7 @@ var _ = Describe("Updaterun initialization tests", func() {
505505

506506
By("Validating the initialization not failed due to no selected cluster")
507507
// it should fail due to strategy not found
508-
validateFailedInitCondition(ctx, updateRun, "referenced clusterStagedUpdateStrategy not found")
508+
validateFailedInitCondition(ctx, updateRun, "referenced stagedUpdateStrategyObj not found")
509509
})
510510

511511
It("Should update the ObservedClusterCount to the number of scheduled bindings if it's pickAll policy", func() {
@@ -517,7 +517,7 @@ var _ = Describe("Updaterun initialization tests", func() {
517517

518518
By("Validating the initialization not failed due to no selected cluster")
519519
// it should fail due to strategy not found
520-
validateFailedInitCondition(ctx, updateRun, "referenced clusterStagedUpdateStrategy not found")
520+
validateFailedInitCondition(ctx, updateRun, "referenced stagedUpdateStrategyObj not found")
521521

522522
By("Validating the ObservedClusterCount is updated")
523523
Expect(updateRun.Status.PolicyObservedClusterCount).To(Equal(1), "failed to update the updateRun PolicyObservedClusterCount status")
@@ -565,7 +565,7 @@ var _ = Describe("Updaterun initialization tests", func() {
565565
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
566566

567567
By("Validating the initialization failed")
568-
validateFailedInitCondition(ctx, updateRun, "referenced clusterStagedUpdateStrategy not found")
568+
validateFailedInitCondition(ctx, updateRun, "referenced stagedUpdateStrategyObj not found")
569569
})
570570

571571
Context("Test computeRunStageStatus", func() {
@@ -777,7 +777,7 @@ var _ = Describe("Updaterun initialization tests", func() {
777777
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
778778

779779
By("Validating the initialization failed")
780-
validateFailedInitCondition(ctx, updateRun, "no clusterResourceSnapshots with index `0` found")
780+
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")
781781

782782
By("Checking update run status metrics are emitted")
783783
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
@@ -792,7 +792,7 @@ var _ = Describe("Updaterun initialization tests", func() {
792792
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
793793

794794
By("Validating the initialization failed")
795-
validateFailedInitCondition(ctx, updateRun, "no clusterResourceSnapshots with index `0` found")
795+
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")
796796

797797
By("Checking update run status metrics are emitted")
798798
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
@@ -807,7 +807,7 @@ var _ = Describe("Updaterun initialization tests", func() {
807807
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
808808

809809
By("Validating the initialization failed")
810-
validateFailedInitCondition(ctx, updateRun, "no clusterResourceSnapshots with index `0` found")
810+
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")
811811

812812
By("Checking update run status metrics are emitted")
813813
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
@@ -822,7 +822,7 @@ var _ = Describe("Updaterun initialization tests", func() {
822822
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
823823

824824
By("Validating the initialization failed")
825-
validateFailedInitCondition(ctx, updateRun, "no master clusterResourceSnapshot found for clusterResourcePlacement")
825+
validateFailedInitCondition(ctx, updateRun, "no master resourceSnapshot found for clusterResourcePlacement")
826826

827827
By("Checking update run status metrics are emitted")
828828
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))

pkg/controllers/updaterun/validation.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (r *Reconciler) validate(
4444
klog.V(2).InfoS("Start to validate the clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
4545

4646
// Validate the ClusterResourcePlacement object referenced by the ClusterStagedUpdateRun.
47-
placementName, err := r.validateCRP(ctx, updateRunCopy)
47+
placementName, err := r.validatePlacement(ctx, updateRunCopy)
4848
if err != nil {
4949
return -1, nil, nil, err
5050
}
@@ -75,16 +75,27 @@ func (r *Reconciler) validate(
7575
}
7676

7777
// Collect the clusters by the corresponding ClusterResourcePlacement with the latest policy snapshot.
78-
scheduledBindings, toBeDeletedBindings, err := r.collectScheduledClusters(ctx, placementName, latestPolicySnapshot, updateRunCopy)
78+
scheduledBindingObjs, toBeDeletedBindingObjs, err := r.collectScheduledClusters(ctx, placementName, latestPolicySnapshot, updateRunCopy)
7979
if err != nil {
8080
return -1, nil, nil, err
8181
}
8282

8383
// Validate the stages and return the updating stage index.
84-
updatingStageIndex, err := r.validateStagesStatus(ctx, scheduledBindings, toBeDeletedBindings, updateRun, updateRunCopy)
84+
updatingStageIndex, err := r.validateStagesStatus(ctx, scheduledBindingObjs, toBeDeletedBindingObjs, updateRun, updateRunCopy)
8585
if err != nil {
8686
return -1, nil, nil, err
8787
}
88+
89+
// TODO (arvindth): remove this conversion step after refactoring other functions to use interface types.
90+
scheduledBindings, err := controller.ConvertBindingObjsToConcreteCRBArray(scheduledBindingObjs)
91+
if err != nil {
92+
return -1, nil, nil, err
93+
}
94+
toBeDeletedBindings, err := controller.ConvertBindingObjsToConcreteCRBArray(toBeDeletedBindingObjs)
95+
if err != nil {
96+
return -1, nil, nil, err
97+
}
98+
8899
return updatingStageIndex, scheduledBindings, toBeDeletedBindings, nil
89100
}
90101

@@ -95,7 +106,7 @@ func (r *Reconciler) validate(
95106
// If the updating stage index is len(updateRun.Status.StagesStatus), the next stage to be updated will be the delete stage.
96107
func (r *Reconciler) validateStagesStatus(
97108
ctx context.Context,
98-
scheduledBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding,
109+
scheduledBindingObjs, toBeDeletedBindingObjs []placementv1beta1.BindingObj,
99110
updateRun, updateRunCopy *placementv1beta1.ClusterStagedUpdateRun,
100111
) (int, error) {
101112
updateRunRef := klog.KObj(updateRun)
@@ -108,7 +119,7 @@ func (r *Reconciler) validateStagesStatus(
108119
klog.ErrorS(unexpectedErr, "Failed to find the updateStrategySnapshot in the clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
109120
return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
110121
}
111-
if err := r.computeRunStageStatus(ctx, scheduledBindings, updateRunCopy); err != nil {
122+
if err := r.computeRunStageStatus(ctx, scheduledBindingObjs, updateRunCopy); err != nil {
112123
return -1, err
113124
}
114125

@@ -124,7 +135,7 @@ func (r *Reconciler) validateStagesStatus(
124135
return -1, validateErr
125136
}
126137

127-
return validateDeleteStageStatus(updatingStageIndex, lastFinishedStageIndex, len(existingStageStatus), toBeDeletedBindings, updateRunCopy)
138+
return validateDeleteStageStatus(updatingStageIndex, lastFinishedStageIndex, len(existingStageStatus), toBeDeletedBindingObjs, updateRunCopy)
128139
}
129140

130141
// validateUpdateStagesStatus is a helper function to validate the updating stages in the clusterStagedUpdateRun.
@@ -256,7 +267,7 @@ func validateClusterUpdatingStatus(
256267
// It returns the updating stage index, or any error encountered.
257268
func validateDeleteStageStatus(
258269
updatingStageIndex, lastFinishedStageIndex, totalStages int,
259-
toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding,
270+
toBeDeletedBindingObjs []placementv1beta1.BindingObj,
260271
updateRun *placementv1beta1.ClusterStagedUpdateRun,
261272
) (int, error) {
262273
updateRunRef := klog.KObj(updateRun)
@@ -274,10 +285,11 @@ func validateDeleteStageStatus(
274285
for _, cluster := range existingDeleteStageStatus.Clusters {
275286
deletingClusterMap[cluster.ClusterName] = struct{}{}
276287
}
277-
for _, binding := range toBeDeletedBindings {
278-
if _, ok := deletingClusterMap[binding.Spec.TargetCluster]; !ok {
279-
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` to be deleted is not in the delete stage", binding.Spec.TargetCluster))
280-
klog.ErrorS(unexpectedErr, "Detect new cluster to be unscheduled", "clusterResourceBinding", klog.KObj(binding), "clusterStagedUpdateRun", updateRunRef)
288+
for _, bindingObj := range toBeDeletedBindingObjs {
289+
bindingSpec := bindingObj.GetBindingSpec()
290+
if _, ok := deletingClusterMap[bindingSpec.TargetCluster]; !ok {
291+
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` to be deleted is not in the delete stage", bindingSpec.TargetCluster))
292+
klog.ErrorS(unexpectedErr, "Detect new cluster to be unscheduled", "binding", klog.KObj(bindingObj), "clusterStagedUpdateRun", updateRunRef)
281293
return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
282294
}
283295
}

pkg/controllers/updaterun/validation_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func TestValidateDeleteStageStatus(t *testing.T) {
244244
name string
245245
updatingStageIndex int
246246
lastFinishedStageIndex int
247-
toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding
247+
toBeDeletedBindings []placementv1beta1.BindingObj
248248
deleteStageStatus *placementv1beta1.StageUpdatingStatus
249249
wantErr error
250250
wantUpdatingStageIndex int
@@ -257,12 +257,12 @@ func TestValidateDeleteStageStatus(t *testing.T) {
257257
},
258258
{
259259
name: "validateDeleteStageStatus should return error if there's new to-be-deleted bindings",
260-
toBeDeletedBindings: []*placementv1beta1.ClusterResourceBinding{
261-
{
260+
toBeDeletedBindings: []placementv1beta1.BindingObj{
261+
&placementv1beta1.ClusterResourceBinding{
262262
ObjectMeta: metav1.ObjectMeta{Name: "binding-1"},
263263
Spec: placementv1beta1.ResourceBindingSpec{TargetCluster: "cluster-1"},
264264
},
265-
{
265+
&placementv1beta1.ClusterResourceBinding{
266266
ObjectMeta: metav1.ObjectMeta{Name: "binding-2"},
267267
Spec: placementv1beta1.ResourceBindingSpec{TargetCluster: "cluster-2"},
268268
},
@@ -280,8 +280,8 @@ func TestValidateDeleteStageStatus(t *testing.T) {
280280
name: "validateDeleteStageStatus should not return error if there's fewer to-be-deleted bindings",
281281
updatingStageIndex: -1,
282282
lastFinishedStageIndex: -1,
283-
toBeDeletedBindings: []*placementv1beta1.ClusterResourceBinding{
284-
{
283+
toBeDeletedBindings: []placementv1beta1.BindingObj{
284+
&placementv1beta1.ClusterResourceBinding{
285285
ObjectMeta: metav1.ObjectMeta{Name: "binding-1"},
286286
Spec: placementv1beta1.ResourceBindingSpec{TargetCluster: "cluster-1"},
287287
},
@@ -300,8 +300,8 @@ func TestValidateDeleteStageStatus(t *testing.T) {
300300
name: "validateDeleteStageStatus should not return error if there are equal to-be-deleted bindings",
301301
updatingStageIndex: -1,
302302
lastFinishedStageIndex: -1,
303-
toBeDeletedBindings: []*placementv1beta1.ClusterResourceBinding{
304-
{
303+
toBeDeletedBindings: []placementv1beta1.BindingObj{
304+
&placementv1beta1.ClusterResourceBinding{
305305
ObjectMeta: metav1.ObjectMeta{Name: "binding-1"},
306306
Spec: placementv1beta1.ResourceBindingSpec{TargetCluster: "cluster-1"},
307307
},

pkg/scheduler/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
134134
}()
135135

136136
// Retrieve the placement object (either ClusterResourcePlacement or ResourcePlacement).
137-
placement, err := controller.FetchPlacementFromKey(ctx, s.client, placementKey)
137+
placement, err := controller.FetchPlacementFromQueueKey(ctx, s.client, placementKey)
138138
if err != nil {
139139
if apiErrors.IsNotFound(err) {
140140
// The placement has been gone before the scheduler gets a chance to

pkg/scheduler/watchers/placement/watcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
5555

5656
// Retrieve the placement object (either ClusterResourcePlacement or ResourcePlacement).
5757
placementKey := controller.GetObjectKeyFromRequest(req)
58-
placement, err := controller.FetchPlacementFromKey(ctx, r.Client, placementKey)
58+
placement, err := controller.FetchPlacementFromQueueKey(ctx, r.Client, placementKey)
5959
if err != nil {
6060
klog.ErrorS(err, "Failed to get placement", "placement", placementRef)
6161
return ctrl.Result{}, controller.NewAPIServerError(true, client.IgnoreNotFound(err))

0 commit comments

Comments
 (0)