Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/hubagent/workload/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
UncachedReader: mgr.GetAPIReader(),
MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported)/30) * math.Ceil(float64(opts.MaxConcurrentClusterPlacement)/10)),
InformerManager: dynamicInformerManager,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(ctx, mgr); err != nil {
klog.ErrorS(err, "Unable to set up rollout controller")
return err
}
Expand Down
119 changes: 116 additions & 3 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@
// SetupWithManager sets up the rollout controller with the Manager.
// The rollout controller watches resource snapshots and resource bindings.
// It reconciles on the CRP when a new resource resourceBinding is created or an existing resource binding is created/updated.
func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
func (r *Reconciler) SetupWithManager(ctx context.Context, mgr runtime.Manager) error {

Check failure on line 623 in pkg/controllers/rollout/controller.go

View workflow job for this annotation

GitHub Actions / Lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
r.recorder = mgr.GetEventRecorderFor("rollout-controller")
return runtime.NewControllerManagedBy(mgr).Named("rollout-controller").
WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). // set the max number of concurrent reconciles
Expand Down Expand Up @@ -648,9 +648,122 @@
handleResourceBinding(e.Object, q)
},
}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&fleetv1alpha1.ClusterResourceOverrideSnapshot{}, handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a clusterResourceOverrideSnapshot create event", "clusterResourceOverrideSnapshot", klog.KObj(e.Object))
r.handleClusterResourceOverrideSnapshot(ctx, e.Object, q)
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a clusterResourceOverrideSnapshot generic event", "clusterResourceOverrideSnapshot", klog.KObj(e.Object))
r.handleClusterResourceOverrideSnapshot(ctx, e.Object, q)
},
}).
Watches(&fleetv1alpha1.ResourceOverrideSnapshot{}, handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceOverrideSnapshot create event", "resourceOverrideSnapshot", klog.KObj(e.Object))
r.handleResourceOverrideSnapshot(ctx, e.Object, q)
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceOverrideSnapshot generic event", "resourceOverrideSnapshot", klog.KObj(e.Object))
r.handleResourceOverrideSnapshot(ctx, e.Object, q)
},
}).
Complete(r)
}

// findMatchedPlacementForOverrides lists all the latest master clusterResourceSnapshots and finds the matched placements.
func (r *Reconciler) findMatchedPlacementsForOverrides(ctx context.Context, cro *fleetv1alpha1.ClusterResourceOverrideSnapshot, ro *fleetv1alpha1.ResourceOverrideSnapshot, q workqueue.RateLimitingInterface) {
latestResourceLabelMatcher := client.MatchingLabels{
fleetv1beta1.IsLatestSnapshotLabel: "true",
}
resourceSnapshotList := &fleetv1beta1.ClusterResourceSnapshotList{}
if err := r.Client.List(ctx, resourceSnapshotList, latestResourceLabelMatcher); err != nil {
klog.ErrorS(controller.NewAPIServerError(true, err), "Failed to list the latest clusterResourceSnapshot associated with the clusterResourcePlacement")
return
}
// try to find the master clusterResourceSnapshot.
for _, resourceSnapshot := range resourceSnapshotList.Items {
clusterResourceSnapshotKObj := klog.KObj(&resourceSnapshot)

Check failure on line 686 in pkg/controllers/rollout/controller.go

View workflow job for this annotation

GitHub Actions / Lint

G601: Implicit memory aliasing in for loop. (gosec)
// only master has this annotation
if len(resourceSnapshot.Annotations[fleetv1beta1.ResourceGroupHashAnnotation]) == 0 {
err := fmt.Errorf("find a master clusterResourceSnapshot %s without the resourceGroupHash annotation", resourceSnapshot.Name)
klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "The master resource snapshot is missing the resourceGroupHash annotation", "clusterResourceSnapshot", clusterResourceSnapshotKObj)
continue
}
crpName := resourceSnapshot.GetLabels()[fleetv1beta1.CRPTrackingLabel]
if len(crpName) == 0 {
err := fmt.Errorf("find a master clusterResourceSnapshot %s without the CRPTrackingLabel label", resourceSnapshot.Name)
klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "The master resource snapshot is missing the CRPTrackingLabel label", "clusterResourceSnapshot", clusterResourceSnapshotKObj)
continue
}
var croList []fleetv1alpha1.ClusterResourceOverrideSnapshot
var roList []fleetv1alpha1.ResourceOverrideSnapshot
if cro != nil {
croList = append(croList, *cro)
}
if ro != nil {
roList = append(roList, *ro)
}
filteredCRO, filteredRO, err := r.findMatchedOverrides(ctx, crpName, &resourceSnapshot, croList, roList)

Check failure on line 707 in pkg/controllers/rollout/controller.go

View workflow job for this annotation

GitHub Actions / Lint

G601: Implicit memory aliasing in for loop. (gosec)
if err != nil {
klog.ErrorS(err, "Failed to find the matched overrides for the master clusterResourceSnapshot", "clusterResourcePlacement", crpName, "clusterResourceSnapshot", clusterResourceSnapshotKObj)
return
}
if len(filteredCRO) != 0 || len(filteredRO) != 0 {
klog.V(2).InfoS("Found the matched master clusterResourceSnapshot", "clusterResourcePlacement", crpName, "clusterResourceSnapshot", clusterResourceSnapshotKObj, "matchedCRO", len(filteredCRO) != 0, "matchedRO", len(filteredRO) != 0)
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{Name: crpName},
})
}
}
}

func (r *Reconciler) handleClusterResourceOverrideSnapshot(ctx context.Context, o client.Object, q workqueue.RateLimitingInterface) {
snapshot, ok := o.(*fleetv1alpha1.ClusterResourceOverrideSnapshot)
if !ok {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("expected a clusterResourceOverrideSnapshot, got a %T", o)), "Invalid object type", "object", klog.KObj(o))
return
}
snapshotKRef := klog.KObj(snapshot)
isLatest, err := strconv.ParseBool(snapshot.GetLabels()[fleetv1beta1.IsLatestSnapshotLabel])
if err != nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid label value %s : %w", fleetv1beta1.IsLatestSnapshotLabel, err)),
"Resource clusterResourceOverrideSnapshot has does not have a valid islatest label", "clusterResourceOverrideSnapshot", snapshotKRef)
return
}
if !isLatest {
// All newly created resource snapshots should start with the latest label to be true.
// However, this can happen if the label is removed fast by the time this reconcile loop is triggered.
klog.V(2).InfoS("Newly created resource clusterResourceOverrideSnapshot %s is not the latest", "clusterResourceOverrideSnapshot", snapshotKRef)
return
}
klog.V(2).InfoS("Finding matched placements for clusterResourceOverrideSnapshot", "clusterResourceOverrideSnapshot", snapshotKRef)
r.findMatchedPlacementsForOverrides(ctx, snapshot, nil, q)
}

func (r *Reconciler) handleResourceOverrideSnapshot(ctx context.Context, o client.Object, q workqueue.RateLimitingInterface) {
snapshot, ok := o.(*fleetv1alpha1.ResourceOverrideSnapshot)
if !ok {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("expected a resourceOverrideSnapshot, got a %T", o)), "Invalid object type", "object", klog.KObj(o))
return
}
snapshotKRef := klog.KObj(snapshot)
isLatest, err := strconv.ParseBool(snapshot.GetLabels()[fleetv1beta1.IsLatestSnapshotLabel])
if err != nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid label value %s : %w", fleetv1beta1.IsLatestSnapshotLabel, err)),
"Resource resourceOverrideSnapshot has does not have a valid islatest label", "clusterResourceOverrideSnapshot", snapshotKRef)
return
}
if !isLatest {
// All newly created resource snapshots should start with the latest label to be true.
// However, this can happen if the label is removed fast by the time this reconcile loop is triggered.
klog.V(2).InfoS("Newly created resource resourceOverrideSnapshot %s is not the latest", "resourceOverrideSnapshot", snapshotKRef)
return
}
klog.V(2).InfoS("Finding matched placements for resourceOverrideSnapshot", "resourceOverrideSnapshot", snapshotKRef)
r.findMatchedPlacementsForOverrides(ctx, nil, snapshot, q)
}

// handleResourceSnapshot parse the resourceBinding label and annotation and enqueue the CRP name associated with the resource resourceBinding
func handleResourceSnapshot(snapshot client.Object, q workqueue.RateLimitingInterface) {
snapshotKRef := klog.KObj(snapshot)
Expand All @@ -664,8 +777,8 @@
// check if it is the latest resource resourceBinding
isLatest, err := strconv.ParseBool(snapshot.GetLabels()[fleetv1beta1.IsLatestSnapshotLabel])
if err != nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid annotation value %s : %w", fleetv1beta1.IsLatestSnapshotLabel, err)),
"Resource resourceBinding has does not have a valid islatest annotation", "clusterResourceSnapshot", snapshotKRef)
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid label value %s : %w", fleetv1beta1.IsLatestSnapshotLabel, err)),
"Resource resourceBinding has does not have a valid islatest label", "clusterResourceSnapshot", snapshotKRef)
return
}
if !isLatest {
Expand Down
28 changes: 19 additions & 9 deletions pkg/controllers/rollout/override.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,20 @@ func (r *Reconciler) fetchAllMatchingOverridesForResourceSnapshot(ctx context.Co
if len(croList.Items) == 0 && len(roList.Items) == 0 {
return nil, nil, nil // no overrides and nothing to do
}
return r.findMatchedOverrides(ctx, crp, masterResourceSnapshot, croList.Items, roList.Items)
}

func (r *Reconciler) findMatchedOverrides(ctx context.Context, crp string, masterResourceSnapshot *placementv1beta1.ClusterResourceSnapshot, croList []placementv1alpha1.ClusterResourceOverrideSnapshot, roList []placementv1alpha1.ResourceOverrideSnapshot) ([]*placementv1alpha1.ClusterResourceOverrideSnapshot, []*placementv1alpha1.ResourceOverrideSnapshot, error) {
if len(croList) == 0 && len(roList) == 0 {
klog.V(2).InfoS("Empty overrides", "clusterResourcePlacement", crp)
return nil, nil, nil
}

resourceSnapshots, err := controller.FetchAllClusterResourceSnapshots(ctx, r.Client, crp, masterResourceSnapshot)
if err != nil {
return nil, nil, err
}
klog.V(2).InfoS("Found resourceSnapshots", "clusterResourcePlacement", crp, "resourceSnapshotCount", len(resourceSnapshots))

possibleCROs := make(map[placementv1beta1.ResourceIdentifier]bool)
possibleROs := make(map[placementv1beta1.ResourceIdentifier]bool)
Expand Down Expand Up @@ -91,37 +100,38 @@ func (r *Reconciler) fetchAllMatchingOverridesForResourceSnapshot(ctx context.Co
}
}

filteredCRO := make([]*placementv1alpha1.ClusterResourceOverrideSnapshot, 0, len(croList.Items))
filteredRO := make([]*placementv1alpha1.ResourceOverrideSnapshot, 0, len(roList.Items))
for i := range croList.Items {
for _, selector := range croList.Items[i].Spec.OverrideSpec.ClusterResourceSelectors {
filteredCRO := make([]*placementv1alpha1.ClusterResourceOverrideSnapshot, 0, len(croList))
filteredRO := make([]*placementv1alpha1.ResourceOverrideSnapshot, 0, len(roList))
for i := range croList {
for _, selector := range croList[i].Spec.OverrideSpec.ClusterResourceSelectors {
croKey := placementv1beta1.ResourceIdentifier{
Group: selector.Group,
Version: selector.Version,
Kind: selector.Kind,
Name: selector.Name,
}
if possibleCROs[croKey] {
filteredCRO = append(filteredCRO, &croList.Items[i])
filteredCRO = append(filteredCRO, &croList[i])
break
}
}
}
for i := range roList.Items {
for _, selector := range roList.Items[i].Spec.OverrideSpec.ResourceSelectors {
for i := range roList {
for _, selector := range roList[i].Spec.OverrideSpec.ResourceSelectors {
roKey := placementv1beta1.ResourceIdentifier{
Group: selector.Group,
Version: selector.Version,
Kind: selector.Kind,
Namespace: roList.Items[i].Namespace,
Namespace: roList[i].Namespace,
Name: selector.Name,
}
if possibleROs[roKey] {
filteredRO = append(filteredRO, &roList.Items[i])
filteredRO = append(filteredRO, &roList[i])
break
}
}
}
klog.V(2).InfoS("Found matched overrides", "clusterResourcePlacement", crp, "matchedCROCount", len(filteredCRO), "matchedROCount", len(filteredRO))
return filteredCRO, filteredRO, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/rollout/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var _ = BeforeSuite(func() {
err = (&Reconciler{
Client: k8sClient,
UncachedReader: mgr.GetAPIReader(),
}).SetupWithManager(mgr)
}).SetupWithManager(ctx, mgr)
Expect(err).Should(Succeed())

go func() {
Expand Down
Loading