Skip to content

Commit 0e9003a

Browse files
author
Ryan Zhang
committed
fix the code that UT catches
Signed-off-by: Ryan Zhang <[email protected]>
1 parent 87dddec commit 0e9003a

File tree

4 files changed

+131
-79
lines changed

4 files changed

+131
-79
lines changed

pkg/controllers/clusterresourceplacement/controller.go

Lines changed: 97 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -279,9 +279,8 @@ func (r *Reconciler) getOrCreateSchedulingPolicySnapshot(ctx context.Context, pl
279279
return nil, controller.NewUnexpectedBehaviorError(err)
280280
}
281281

282-
// Use the unified helper function to fetch the latest policy snapshot
283-
placementKey := types.NamespacedName{Name: placementObj.GetName(), Namespace: placementObj.GetNamespace()}
284-
latestPolicySnapshot, latestPolicySnapshotIndex, err := controller.FetchLatestPolicySnapshot(ctx, r.Client, placementKey)
282+
// latestPolicySnapshotIndex should be -1 when there is no snapshot.
283+
latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestSchedulingPolicySnapshot(ctx, placementObj)
285284
if err != nil {
286285
return nil, err
287286
}
@@ -338,8 +337,102 @@ func (r *Reconciler) getOrCreateSchedulingPolicySnapshot(ctx context.Context, pl
338337
return newPolicySnapshot, nil
339338
}
340339

340+
// lookupLatestSchedulingPolicySnapshot finds the latest snapshots and its policy index.
341+
// There will be only one active policy snapshot if exists.
342+
// It first checks whether there is an active policy snapshot.
343+
// If not, it finds the one whose policyIndex label is the largest.
344+
// The policy index will always start from 0.
345+
// Return error when 1) cannot list the snapshots 2) there are more than one active policy snapshots 3) snapshot has the
346+
// invalid label value.
347+
// 2 & 3 should never happen.
348+
func (r *Reconciler) lookupLatestSchedulingPolicySnapshot(ctx context.Context, placement fleetv1beta1.PlacementObj) (fleetv1beta1.PolicySnapshotObj, int, error) {
349+
placementKey := types.NamespacedName{Name: placement.GetName(), Namespace: placement.GetNamespace()}
350+
snapshotList, err := controller.FetchLatestPolicySnapshot(ctx, r.Client, placementKey)
351+
if err != nil {
352+
return nil, -1, err
353+
}
354+
placementKObj := klog.KObj(placement)
355+
if err != nil {
356+
klog.ErrorS(err, "Failed to list active schedulingPolicySnapshots", "placement", placementKObj)
357+
// Placement controller needs a scheduling policy snapshot watcher to enqueue the placement request.
358+
// So the snapshots should be read from cache.
359+
return nil, -1, controller.NewAPIServerError(true, err)
360+
}
361+
policySnapshotItems := snapshotList.GetPolicySnapshotObjs()
362+
if len(policySnapshotItems) == 1 {
363+
policyIndex, err := labels.ParsePolicyIndexFromLabel(policySnapshotItems[0])
364+
if err != nil {
365+
klog.ErrorS(err, "Failed to parse the policy index label", "placement", placementKObj, "policySnapshot", klog.KObj(policySnapshotItems[0]))
366+
return nil, -1, controller.NewUnexpectedBehaviorError(err)
367+
}
368+
return policySnapshotItems[0], policyIndex, nil
369+
} else if len(policySnapshotItems) > 1 {
370+
// It means there are multiple active snapshots and should never happen.
371+
err := fmt.Errorf("there are %d active schedulingPolicySnapshots owned by placement %v", len(policySnapshotItems), placementKey)
372+
klog.ErrorS(err, "Invalid schedulingPolicySnapshots", "placement", placementKObj)
373+
return nil, -1, controller.NewUnexpectedBehaviorError(err)
374+
}
375+
// When there are no active snapshots, find the one who has the largest policy index.
376+
// It should be rare only when placement controller is crashed before creating the new active snapshot.
377+
sortedList, err := r.listSortedSchedulingPolicySnapshots(ctx, placement)
378+
if err != nil {
379+
return nil, -1, err
380+
}
381+
382+
if len(sortedList.GetPolicySnapshotObjs()) == 0 {
383+
// The policy index of the first snapshot will start from 0.
384+
return nil, -1, nil
385+
}
386+
latestSnapshot := sortedList.GetPolicySnapshotObjs()[len(sortedList.GetPolicySnapshotObjs())-1]
387+
policyIndex, err := labels.ParsePolicyIndexFromLabel(latestSnapshot)
388+
if err != nil {
389+
klog.ErrorS(err, "Failed to parse the policy index label", "placement", placementKObj, "policySnapshot", klog.KObj(latestSnapshot))
390+
return nil, -1, controller.NewUnexpectedBehaviorError(err)
391+
}
392+
return latestSnapshot, policyIndex, nil
393+
}
394+
395+
// listSortedSchedulingPolicySnapshots returns the policy snapshots sorted by the policy index.
396+
// Now works with both cluster-scoped and namespaced policy snapshots using interface types.
397+
func (r *Reconciler) listSortedSchedulingPolicySnapshots(ctx context.Context, placementObj fleetv1beta1.PlacementObj) (fleetv1beta1.PolicySnapshotList, error) {
398+
placementKey := types.NamespacedName{
399+
Namespace: placementObj.GetNamespace(),
400+
Name: placementObj.GetName(),
401+
}
402+
403+
snapshotList, err := controller.ListPolicySnapshots(ctx, r.Client, placementKey)
404+
if err != nil {
405+
klog.ErrorS(err, "Failed to list all policySnapshots", "placement", klog.KObj(placementObj))
406+
// CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request.
407+
// So the snapshots should be read from cache.
408+
return nil, controller.NewAPIServerError(true, err)
409+
}
410+
411+
items := snapshotList.GetPolicySnapshotObjs()
412+
var errs []error
413+
sort.Slice(items, func(i, j int) bool {
414+
ii, err := labels.ParsePolicyIndexFromLabel(items[i])
415+
if err != nil {
416+
klog.ErrorS(err, "Failed to parse the policy index label", "placement", klog.KObj(placementObj), "policySnapshot", klog.KObj(items[i]))
417+
errs = append(errs, err)
418+
}
419+
ji, err := labels.ParsePolicyIndexFromLabel(items[j])
420+
if err != nil {
421+
klog.ErrorS(err, "Failed to parse the policy index label", "placement", klog.KObj(placementObj), "policySnapshot", klog.KObj(items[j]))
422+
errs = append(errs, err)
423+
}
424+
return ii < ji
425+
})
426+
427+
if len(errs) > 0 {
428+
return nil, controller.NewUnexpectedBehaviorError(utilerrors.NewAggregate(errs))
429+
}
430+
431+
return snapshotList, nil
432+
}
433+
341434
func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, placementObj fleetv1beta1.PlacementObj, revisionHistoryLimit int) error {
342-
sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, placementObj)
435+
sortedList, err := r.listSortedSchedulingPolicySnapshots(ctx, placementObj)
343436
if err != nil {
344437
return err
345438
}
@@ -756,45 +849,6 @@ func (r *Reconciler) ensureLatestResourceSnapshot(ctx context.Context, latest fl
756849
return nil
757850
}
758851

759-
// listSortedClusterSchedulingPolicySnapshots returns the policy snapshots sorted by the policy index.
760-
// Now works with both cluster-scoped and namespaced policy snapshots using interface types.
761-
func (r *Reconciler) listSortedClusterSchedulingPolicySnapshots(ctx context.Context, placementObj fleetv1beta1.PlacementObj) (fleetv1beta1.PolicySnapshotList, error) {
762-
placementKey := types.NamespacedName{
763-
Namespace: placementObj.GetNamespace(),
764-
Name: placementObj.GetName(),
765-
}
766-
767-
snapshotList, err := controller.ListPolicySnapshots(ctx, r.Client, placementKey)
768-
if err != nil {
769-
klog.ErrorS(err, "Failed to list all policySnapshots", "placement", klog.KObj(placementObj))
770-
// CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request.
771-
// So the snapshots should be read from cache.
772-
return nil, controller.NewAPIServerError(true, err)
773-
}
774-
775-
items := snapshotList.GetPolicySnapshotObjs()
776-
var errs []error
777-
sort.Slice(items, func(i, j int) bool {
778-
ii, err := labels.ParsePolicyIndexFromLabel(items[i])
779-
if err != nil {
780-
klog.ErrorS(err, "Failed to parse the policy index label", "placement", klog.KObj(placementObj), "policySnapshot", klog.KObj(items[i]))
781-
errs = append(errs, err)
782-
}
783-
ji, err := labels.ParsePolicyIndexFromLabel(items[j])
784-
if err != nil {
785-
klog.ErrorS(err, "Failed to parse the policy index label", "placement", klog.KObj(placementObj), "policySnapshot", klog.KObj(items[j]))
786-
errs = append(errs, err)
787-
}
788-
return ii < ji
789-
})
790-
791-
if len(errs) > 0 {
792-
return nil, controller.NewUnexpectedBehaviorError(utilerrors.NewAggregate(errs))
793-
}
794-
795-
return snapshotList, nil
796-
}
797-
798852
// lookupLatestResourceSnapshot finds the latest resource snapshot.
799853
// There will be only one active resource snapshot if exists.
800854
// It first checks whether there is an active resource snapshot.

pkg/controllers/clusterresourceplacement/controller_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,9 @@ func clusterResourcePlacementForTest() *fleetv1beta1.ClusterResourcePlacement {
121121

122122
func TestGetOrCreateClusterSchedulingPolicySnapshot(t *testing.T) {
123123
testPolicy := placementPolicyForTest()
124-
testPolicy.NumberOfClusters = nil
125-
jsonBytes, err := json.Marshal(testPolicy)
124+
testPolicyHash := testPolicy.DeepCopy()
125+
testPolicyHash.NumberOfClusters = nil
126+
jsonBytes, err := json.Marshal(testPolicyHash)
126127
if err != nil {
127128
t.Fatalf("failed to create the policy hash: %v", err)
128129
}
@@ -749,10 +750,17 @@ func TestGetOrCreateClusterSchedulingPolicySnapshot(t *testing.T) {
749750
if err != nil {
750751
t.Fatalf("failed to getOrCreateClusterSchedulingPolicySnapshot: %v", err)
751752
}
753+
754+
// Convert interface to concrete type for comparison
755+
gotSnapshot, ok := got.(*fleetv1beta1.ClusterSchedulingPolicySnapshot)
756+
if !ok {
757+
t.Fatalf("expected *ClusterSchedulingPolicySnapshot, got %T", got)
758+
}
759+
752760
options := []cmp.Option{
753761
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"),
754762
}
755-
if diff := cmp.Diff(tc.wantPolicySnapshots[tc.wantLatestSnapshotIndex], got, options...); diff != "" {
763+
if diff := cmp.Diff(tc.wantPolicySnapshots[tc.wantLatestSnapshotIndex], *gotSnapshot, options...); diff != "" {
756764
t.Errorf("getOrCreateClusterSchedulingPolicySnapshot() mismatch (-want, +got):\n%s", diff)
757765
}
758766
clusterPolicySnapshotList := &fleetv1beta1.ClusterSchedulingPolicySnapshotList{}
@@ -2785,7 +2793,7 @@ func TestGetOrCreateClusterResourceSnapshot_failure(t *testing.T) {
27852793
},
27862794
{
27872795
// Should never hit this case unless there is a bug in the controller or customers manually modify the clusterResourceSnapshot.
2788-
name: "existing active policy snapshot does not have hash annotation",
2796+
name: "existing active resource snapshot does not have hash annotation",
27892797
resourceSnapshots: []fleetv1beta1.ClusterResourceSnapshot{
27902798
{
27912799
ObjectMeta: metav1.ObjectMeta{

pkg/utils/controller/policy_snapshot_resolver.go

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -119,41 +119,31 @@ func BuildPolicySnapshot(placementObj fleetv1beta1.PlacementObj, policySnapshotI
119119
// FetchLatestPolicySnapshot lists the policy snapshots that are labeled as the latest for a given placement.
120120
// For cluster-scoped placements, it fetches ClusterSchedulingPolicySnapshot.
121121
// For namespaced placements, it fetches SchedulingPolicySnapshot.
122-
func FetchLatestPolicySnapshot(ctx context.Context, k8Client client.Reader, placementKey types.NamespacedName) (fleetv1beta1.PolicySnapshotObj, int, error) {
123-
policySnapshotList, err := ListPolicySnapshots(ctx, k8Client, placementKey)
124-
if err != nil {
125-
return nil, -1, err
126-
}
122+
func FetchLatestPolicySnapshot(ctx context.Context, k8Client client.Reader, placementKey types.NamespacedName) (fleetv1beta1.PolicySnapshotList, error) {
123+
namespace := placementKey.Namespace
124+
name := placementKey.Name
127125

128-
items := policySnapshotList.GetPolicySnapshotObjs()
129-
if len(items) == 0 {
130-
klog.V(2).InfoS("No policySnapshots found for the placement", "placement", placementKey)
131-
return nil, -1, nil
132-
}
126+
var policySnapshotList fleetv1beta1.PolicySnapshotList
127+
var listOptions []client.ListOption
128+
listOptions = append(listOptions, client.MatchingLabels{
129+
fleetv1beta1.PlacementTrackingLabel: name,
130+
fleetv1beta1.IsLatestSnapshotLabel: strconv.FormatBool(true),
131+
})
133132

134-
// Find the latest policy snapshot by comparing policy index
135-
var latestPolicySnapshot fleetv1beta1.PolicySnapshotObj
136-
latestIndex := -1
137-
138-
for _, policySnapshot := range items {
139-
labels := policySnapshot.GetLabels()
140-
if indexStr, exists := labels[fleetv1beta1.PolicyIndexLabel]; exists {
141-
if index, err := strconv.Atoi(indexStr); err == nil {
142-
if index > latestIndex {
143-
latestIndex = index
144-
latestPolicySnapshot = policySnapshot
145-
}
146-
}
147-
}
133+
if namespace != "" {
134+
// This is a namespaced SchedulingPolicySnapshotList
135+
policySnapshotList = &fleetv1beta1.SchedulingPolicySnapshotList{}
136+
listOptions = append(listOptions, client.InNamespace(namespace))
137+
} else {
138+
// This is a cluster-scoped ClusterSchedulingPolicySnapshotList
139+
policySnapshotList = &fleetv1beta1.ClusterSchedulingPolicySnapshotList{}
148140
}
149141

150-
if latestPolicySnapshot == nil {
151-
klog.V(2).InfoS("No valid policySnapshot found for the placement", "placement", placementKey)
152-
return nil, -1, nil
142+
if err := k8Client.List(ctx, policySnapshotList, listOptions...); err != nil {
143+
klog.ErrorS(err, "Failed to list the policySnapshots associated with the placement", "placement", placementKey)
144+
return nil, err
153145
}
154-
155-
klog.V(2).InfoS("Found the latest policySnapshot", "placement", placementKey, "policySnapshot", klog.KObj(latestPolicySnapshot))
156-
return latestPolicySnapshot, latestIndex, nil
146+
return policySnapshotList, nil
157147
}
158148

159149
// ListPolicySnapshots lists all policy snapshots associated with a placement key.

pkg/utils/controller/resource_snapshot_resolver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ func FetchLatestMasterResourceSnapshot(ctx context.Context, k8Client client.Read
5151
break
5252
}
5353
}
54-
// It is possible that no master resourceSnapshot is found, e.g., when the new resourceSnapshot is created but not yet marked as the latest.
54+
// The master is always the first to be created in the resourcegroup so it should be found.
5555
if masterResourceSnapshot == nil {
56-
return nil, fmt.Errorf("no masterResourceSnapshot found for the placement %v", placementKey)
56+
return nil, NewUnexpectedBehaviorError(fmt.Errorf("no masterResourceSnapshot found for the placement %v", placementKey))
5757
}
5858
klog.V(2).InfoS("Found the latest associated masterResourceSnapshot", "placement", placementKey, "masterResourceSnapshot", klog.KObj(masterResourceSnapshot))
5959
return masterResourceSnapshot, nil

0 commit comments

Comments
 (0)