diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go
index 8a582f561..173af0701 100644
--- a/cmd/hubagent/workload/setup.go
+++ b/cmd/hubagent/workload/setup.go
@@ -201,7 +201,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 		UncachedReader:          mgr.GetAPIReader(),
 		MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported)/30) * math.Ceil(float64(opts.MaxConcurrentClusterPlacement)/10)),
 		InformerManager:         dynamicInformerManager,
-	}).SetupWithManager(mgr); err != nil {
+	}).SetupWithManager(ctx, mgr); err != nil {
 		klog.ErrorS(err, "Unable to set up rollout controller")
 		return err
 	}
diff --git a/pkg/controllers/rollout/controller.go b/pkg/controllers/rollout/controller.go
index d2b1674d0..5c3fb0468 100644
--- a/pkg/controllers/rollout/controller.go
+++ b/pkg/controllers/rollout/controller.go
@@ -620,7 +620,7 @@ func (r *Reconciler) updateBindings(ctx context.Context, bindings []toBeUpdatedB
 // SetupWithManager sets up the rollout controller with the Manager.
 // The rollout controller watches resource snapshots and resource bindings.
 // It reconciles on the CRP when a new resource resourceBinding is created or an existing resource binding is created/updated.
-func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
+func (r *Reconciler) SetupWithManager(ctx context.Context, mgr runtime.Manager) error {
 	r.recorder = mgr.GetEventRecorderFor("rollout-controller")
 	return runtime.NewControllerManagedBy(mgr).Named("rollout-controller").
 		WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). // set the max number of concurrent reconciles
@@ -648,9 +648,128 @@ func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
 				handleResourceBinding(e.Object, q)
 			},
 		}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
+		Watches(&fleetv1alpha1.ClusterResourceOverrideSnapshot{}, handler.Funcs{
+			CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
+				klog.V(2).InfoS("Handling a clusterResourceOverrideSnapshot create event", "clusterResourceOverrideSnapshot", klog.KObj(e.Object))
+				r.handleClusterResourceOverrideSnapshot(ctx, e.Object, q)
+			},
+			GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
+				klog.V(2).InfoS("Handling a clusterResourceOverrideSnapshot generic event", "clusterResourceOverrideSnapshot", klog.KObj(e.Object))
+				r.handleClusterResourceOverrideSnapshot(ctx, e.Object, q)
+			},
+		}).
+		Watches(&fleetv1alpha1.ResourceOverrideSnapshot{}, handler.Funcs{
+			CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
+				klog.V(2).InfoS("Handling a resourceOverrideSnapshot create event", "resourceOverrideSnapshot", klog.KObj(e.Object))
+				r.handleResourceOverrideSnapshot(ctx, e.Object, q)
+			},
+			GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
+				klog.V(2).InfoS("Handling a resourceOverrideSnapshot generic event", "resourceOverrideSnapshot", klog.KObj(e.Object))
+				r.handleResourceOverrideSnapshot(ctx, e.Object, q)
+			},
+		}).
 		Complete(r)
 }
 
+// findMatchedPlacementsForOverrides lists all the clusterResourceSnapshots labelled as the latest and enqueues the
+// clusterResourcePlacement of every snapshot for which findMatchedOverrides reports that the given override matches.
+// Either cro or ro may be nil; a nil override is not considered. Failures are logged and never returned.
+func (r *Reconciler) findMatchedPlacementsForOverrides(ctx context.Context, cro *fleetv1alpha1.ClusterResourceOverrideSnapshot, ro *fleetv1alpha1.ResourceOverrideSnapshot, q workqueue.RateLimitingInterface) {
+	latestResourceLabelMatcher := client.MatchingLabels{
+		fleetv1beta1.IsLatestSnapshotLabel: "true",
+	}
+	resourceSnapshotList := &fleetv1beta1.ClusterResourceSnapshotList{}
+	if err := r.Client.List(ctx, resourceSnapshotList, latestResourceLabelMatcher); err != nil {
+		klog.ErrorS(controller.NewAPIServerError(true, err), "Failed to list the latest clusterResourceSnapshot associated with the clusterResourcePlacement")
+		return
+	}
+	// try to find the master clusterResourceSnapshot.
+	for _, resourceSnapshot := range resourceSnapshotList.Items {
+		clusterResourceSnapshotKObj := klog.KObj(&resourceSnapshot)
+		// only master has this annotation
+		if len(resourceSnapshot.Annotations[fleetv1beta1.ResourceGroupHashAnnotation]) == 0 {
+			err := fmt.Errorf("find a master clusterResourceSnapshot %s without the resourceGroupHash annotation", resourceSnapshot.Name)
+			klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "The master resource snapshot is missing the resourceGroupHash annotation", "clusterResourceSnapshot", clusterResourceSnapshotKObj)
+			continue
+		}
+		crpName := resourceSnapshot.GetLabels()[fleetv1beta1.CRPTrackingLabel]
+		if len(crpName) == 0 {
+			err := fmt.Errorf("find a master clusterResourceSnapshot %s without the CRPTrackingLabel label", resourceSnapshot.Name)
+			klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "The master resource snapshot is missing the CRPTrackingLabel label", "clusterResourceSnapshot", clusterResourceSnapshotKObj)
+			continue
+		}
+		var croList []fleetv1alpha1.ClusterResourceOverrideSnapshot
+		var roList []fleetv1alpha1.ResourceOverrideSnapshot
+		if cro != nil {
+			croList = append(croList, *cro)
+		}
+		if ro != nil {
+			roList = append(roList, *ro)
+		}
+		filteredCRO, filteredRO, err := r.findMatchedOverrides(ctx, crpName, &resourceSnapshot, croList, roList)
+		if err != nil {
+			klog.ErrorS(err, "Failed to find the matched overrides for the master clusterResourceSnapshot", "clusterResourcePlacement", crpName, "clusterResourceSnapshot", clusterResourceSnapshotKObj)
+			continue // a failure for one placement must not stop the remaining placements from being enqueued
+		}
+		if len(filteredCRO) != 0 || len(filteredRO) != 0 {
+			klog.V(2).InfoS("Found the matched master clusterResourceSnapshot", "clusterResourcePlacement", crpName, "clusterResourceSnapshot", clusterResourceSnapshotKObj, "matchedCRO", len(filteredCRO) != 0, "matchedRO", len(filteredRO) != 0)
+			q.Add(reconcile.Request{
+				NamespacedName: types.NamespacedName{Name: crpName},
+			})
+		}
+	}
+}
+
+// handleClusterResourceOverrideSnapshot enqueues the placements matched by the given clusterResourceOverrideSnapshot.
+// Objects of another type, or snapshots whose isLatest label is invalid or not true, are logged and ignored.
+func (r *Reconciler) handleClusterResourceOverrideSnapshot(ctx context.Context, o client.Object, q workqueue.RateLimitingInterface) {
+	snapshot, ok := o.(*fleetv1alpha1.ClusterResourceOverrideSnapshot)
+	if !ok {
+		klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("expected a clusterResourceOverrideSnapshot, got a %T", o)), "Invalid object type", "object", klog.KObj(o))
+		return
+	}
+	snapshotKRef := klog.KObj(snapshot)
+	isLatest, err := strconv.ParseBool(snapshot.GetLabels()[fleetv1beta1.IsLatestSnapshotLabel])
+	if err != nil {
+		klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid label value %s : %w", fleetv1beta1.IsLatestSnapshotLabel, err)),
+			"Resource clusterResourceOverrideSnapshot does not have a valid islatest label", "clusterResourceOverrideSnapshot", snapshotKRef)
+		return
+	}
+	if !isLatest {
+		// All newly created resource snapshots should start with the latest label to be true.
+		// However, this can happen if the label is removed fast by the time this reconcile loop is triggered.
+		klog.V(2).InfoS("Newly created clusterResourceOverrideSnapshot is not the latest", "clusterResourceOverrideSnapshot", snapshotKRef)
+		return
+	}
+	klog.V(2).InfoS("Finding matched placements for clusterResourceOverrideSnapshot", "clusterResourceOverrideSnapshot", snapshotKRef)
+	r.findMatchedPlacementsForOverrides(ctx, snapshot, nil, q)
+}
+
+// handleResourceOverrideSnapshot enqueues the placements matched by the given resourceOverrideSnapshot.
+// Objects of another type, or snapshots whose isLatest label is invalid or not true, are logged and ignored.
+func (r *Reconciler) handleResourceOverrideSnapshot(ctx context.Context, o client.Object, q workqueue.RateLimitingInterface) {
+	snapshot, ok := o.(*fleetv1alpha1.ResourceOverrideSnapshot)
+	if !ok {
+		klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("expected a resourceOverrideSnapshot, got a %T", o)), "Invalid object type", "object", klog.KObj(o))
+		return
+	}
+	snapshotKRef := klog.KObj(snapshot)
+	isLatest, err := strconv.ParseBool(snapshot.GetLabels()[fleetv1beta1.IsLatestSnapshotLabel])
+	if err != nil {
+		klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid label value %s : %w", fleetv1beta1.IsLatestSnapshotLabel, err)),
+			"Resource resourceOverrideSnapshot does not have a valid islatest label", "resourceOverrideSnapshot", snapshotKRef)
+		return
+	}
+	if !isLatest {
+		// All newly created resource snapshots should start with the latest label to be true.
+		// However, this can happen if the label is removed fast by the time this reconcile loop is triggered.
+		klog.V(2).InfoS("Newly created resourceOverrideSnapshot is not the latest", "resourceOverrideSnapshot", snapshotKRef)
+		return
+	}
+	klog.V(2).InfoS("Finding matched placements for resourceOverrideSnapshot", "resourceOverrideSnapshot", snapshotKRef)
+	r.findMatchedPlacementsForOverrides(ctx, nil, snapshot, q)
+}
+
 // handleResourceSnapshot parse the resourceBinding label and annotation and enqueue the CRP name associated with the resource resourceBinding
 func handleResourceSnapshot(snapshot client.Object, q workqueue.RateLimitingInterface) {
 	snapshotKRef := klog.KObj(snapshot)
@@ -664,8 +783,8 @@ func handleResourceSnapshot(snapshot client.Object, q workqueue.RateLimitingInte
 	// check if it is the latest resource resourceBinding
 	isLatest, err := strconv.ParseBool(snapshot.GetLabels()[fleetv1beta1.IsLatestSnapshotLabel])
 	if err != nil {
-		klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid annotation value %s : %w", fleetv1beta1.IsLatestSnapshotLabel, err)),
-			"Resource resourceBinding has does not have a valid islatest annotation", "clusterResourceSnapshot", snapshotKRef)
+		klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid label value %s : %w", fleetv1beta1.IsLatestSnapshotLabel, err)),
+			"Resource resourceBinding does not have a valid islatest label", "clusterResourceSnapshot", snapshotKRef)
 		return
 	}
 	if !isLatest {
diff --git a/pkg/controllers/rollout/override.go b/pkg/controllers/rollout/override.go
index fed2af39c..55945890b 100644
--- a/pkg/controllers/rollout/override.go
+++ b/pkg/controllers/rollout/override.go
@@ -45,11 +45,22 @@ func (r *Reconciler) fetchAllMatchingOverridesForResourceSnapshot(ctx context.Co
 	if len(croList.Items) == 0 && len(roList.Items) == 0 {
 		return nil, nil, nil // no overrides and nothing to do
 	}
+	return r.findMatchedOverrides(ctx, crp, masterResourceSnapshot, croList.Items, roList.Items)
+}
+
+// findMatchedOverrides returns pointers to the entries of croList and roList that have at least one selector whose
+// resource identifier was collected from the resource snapshots fetched for crp; empty inputs yield nil results.
+func (r *Reconciler) findMatchedOverrides(ctx context.Context, crp string, masterResourceSnapshot *placementv1beta1.ClusterResourceSnapshot, croList []placementv1alpha1.ClusterResourceOverrideSnapshot, roList []placementv1alpha1.ResourceOverrideSnapshot) ([]*placementv1alpha1.ClusterResourceOverrideSnapshot, []*placementv1alpha1.ResourceOverrideSnapshot, error) {
+	if len(croList) == 0 && len(roList) == 0 {
+		klog.V(2).InfoS("Empty overrides", "clusterResourcePlacement", crp)
+		return nil, nil, nil
+	}
 
 	resourceSnapshots, err := controller.FetchAllClusterResourceSnapshots(ctx, r.Client, crp, masterResourceSnapshot)
 	if err != nil {
 		return nil, nil, err
 	}
+	klog.V(2).InfoS("Found resourceSnapshots", "clusterResourcePlacement", crp, "resourceSnapshotCount", len(resourceSnapshots))
 
 	possibleCROs := make(map[placementv1beta1.ResourceIdentifier]bool)
 	possibleROs := make(map[placementv1beta1.ResourceIdentifier]bool)
@@ -91,10 +102,10 @@ func (r *Reconciler) fetchAllMatchingOverridesForResourceSnapshot(ctx context.Co
 		}
 	}
 
-	filteredCRO := make([]*placementv1alpha1.ClusterResourceOverrideSnapshot, 0, len(croList.Items))
-	filteredRO := make([]*placementv1alpha1.ResourceOverrideSnapshot, 0, len(roList.Items))
-	for i := range croList.Items {
-		for _, selector := range croList.Items[i].Spec.OverrideSpec.ClusterResourceSelectors {
+	filteredCRO := make([]*placementv1alpha1.ClusterResourceOverrideSnapshot, 0, len(croList))
+	filteredRO := make([]*placementv1alpha1.ResourceOverrideSnapshot, 0, len(roList))
+	for i := range croList {
+		for _, selector := range croList[i].Spec.OverrideSpec.ClusterResourceSelectors {
 			croKey := placementv1beta1.ResourceIdentifier{
 				Group:   selector.Group,
 				Version: selector.Version,
@@ -102,26 +113,27 @@ func (r *Reconciler) fetchAllMatchingOverridesForResourceSnapshot(ctx context.Co
 				Name:    selector.Name,
 			}
 			if possibleCROs[croKey] {
-				filteredCRO = append(filteredCRO, &croList.Items[i])
+				filteredCRO = append(filteredCRO, &croList[i])
 				break
 			}
 		}
 	}
-	for i := range roList.Items {
-		for _, selector := range roList.Items[i].Spec.OverrideSpec.ResourceSelectors {
+	for i := range roList {
+		for _, selector := range roList[i].Spec.OverrideSpec.ResourceSelectors {
 			roKey := placementv1beta1.ResourceIdentifier{
 				Group:     selector.Group,
 				Version:   selector.Version,
 				Kind:      selector.Kind,
-				Namespace: roList.Items[i].Namespace,
+				Namespace: roList[i].Namespace,
 				Name:      selector.Name,
 			}
 			if possibleROs[roKey] {
-				filteredRO = append(filteredRO, &roList.Items[i])
+				filteredRO = append(filteredRO, &roList[i])
 				break
 			}
 		}
 	}
+	klog.V(2).InfoS("Found matched overrides", "clusterResourcePlacement", crp, "matchedCROCount", len(filteredCRO), "matchedROCount", len(filteredRO))
 	return filteredCRO, filteredRO, nil
 }
 
diff --git a/pkg/controllers/rollout/suite_test.go b/pkg/controllers/rollout/suite_test.go
index f8b2fdcfe..13068ba92 100644
--- a/pkg/controllers/rollout/suite_test.go
+++ b/pkg/controllers/rollout/suite_test.go
@@ -97,7 +97,7 @@ var _ = BeforeSuite(func() {
 	err = (&Reconciler{
 		Client:         k8sClient,
 		UncachedReader: mgr.GetAPIReader(),
-	}).SetupWithManager(mgr)
+	}).SetupWithManager(ctx, mgr)
 	Expect(err).Should(Succeed())
 
 	go func() {