Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 116 additions & 5 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
allBindings = append(allBindings, binding.DeepCopy())
}

// Process apply strategy updates (if any). This runs independently of the rollout process.
//
// Apply strategy changes will be immediately applied to all bindings that have not been
// marked for deletion yet. Note that even unscheduled bindings will receive this update;
// as apply strategy changes might have an effect on its Applied and Available status, and
// consequently on the rollout progress.
if err := r.processApplyStrategyUpdates(ctx, &crp, allBindings); err != nil {
klog.ErrorS(err, "Failed to process apply strategy updates", "clusterResourcePlacement", crpName)
return runtime.Result{}, err
}

// handle the case that a cluster was unselected by the scheduler and then selected again but the unselected binding is not completely deleted yet
wait, err := waitForResourcesToCleanUp(allBindings, &crp)
if err != nil {
Expand Down Expand Up @@ -276,13 +287,14 @@ type toBeUpdatedBinding struct {
desiredBinding *fleetv1beta1.ClusterResourceBinding // only valid for scheduled or bound binding
}

func createUpdateInfo(binding *fleetv1beta1.ClusterResourceBinding, crp *fleetv1beta1.ClusterResourcePlacement,
func createUpdateInfo(binding *fleetv1beta1.ClusterResourceBinding,
latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, cro []string, ro []fleetv1beta1.NamespacedName) toBeUpdatedBinding {
desiredBinding := binding.DeepCopy()
desiredBinding.Spec.State = fleetv1beta1.BindingStateBound
desiredBinding.Spec.ResourceSnapshotName = latestResourceSnapshot.Name
// update the resource apply strategy when controller rolls out the new changes
desiredBinding.Spec.ApplyStrategy = crp.Spec.Strategy.ApplyStrategy

// Apply strategy is updated separately for all bindings.

// TODO: check the size of the cro and ro to not exceed the limit
desiredBinding.Spec.ClusterResourceOverrideSnapshots = cro
desiredBinding.Spec.ResourceOverrideSnapshots = ro
Expand Down Expand Up @@ -377,7 +389,7 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
if err != nil {
return nil, nil, false, minWaitTime, err
}
boundingCandidates = append(boundingCandidates, createUpdateInfo(binding, crp, latestResourceSnapshot, cro, ro))
boundingCandidates = append(boundingCandidates, createUpdateInfo(binding, latestResourceSnapshot, cro, ro))
case fleetv1beta1.BindingStateBound:
bindingFailed := false
schedulerTargetedBinds = append(schedulerTargetedBinds, binding)
Expand Down Expand Up @@ -408,7 +420,7 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
}
// The binding needs update if it's not pointing to the latest resource resourceBinding or the overrides.
if binding.Spec.ResourceSnapshotName != latestResourceSnapshot.Name || !equality.Semantic.DeepEqual(binding.Spec.ClusterResourceOverrideSnapshots, cro) || !equality.Semantic.DeepEqual(binding.Spec.ResourceOverrideSnapshots, ro) {
updateInfo := createUpdateInfo(binding, crp, latestResourceSnapshot, cro, ro)
updateInfo := createUpdateInfo(binding, latestResourceSnapshot, cro, ro)
if bindingFailed {
// the binding has been applied but failed to apply, we can safely update it to latest resources without affecting max unavailable count
applyFailedUpdateCandidates = append(applyFailedUpdateCandidates, updateInfo)
Expand Down Expand Up @@ -668,6 +680,15 @@ func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
handleResourceBinding(e.Object, q)
},
}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
// Aside from ClusterResourceSnapshot and ClusterResourceBinding objects, the rollout
// controller also watches ClusterResourcePlacement objects, so that it can push apply
// strategy updates to all bindings right away.
Watches(&fleetv1beta1.ClusterResourcePlacement{}, handler.Funcs{
// Ignore all Create, Delete, and Generic events; these do not concern the rollout controller.
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
handleCRP(e.ObjectNew, e.ObjectOld, q)
},
}).
Complete(r)
}

Expand Down Expand Up @@ -839,3 +860,93 @@ func (r *Reconciler) updateBindingStatus(ctx context.Context, binding *fleetv1be
klog.V(2).InfoS("Updated the status of a binding", "clusterResourceBinding", klog.KObj(binding), "condition", cond)
return nil
}

// processApplyStrategyUpdates processes apply strategy updates on the CRP end; specifically
// it will push the update to all applicable bindings.
func (r *Reconciler) processApplyStrategyUpdates(
ctx context.Context,
crp *fleetv1beta1.ClusterResourcePlacement,
allBindings []*fleetv1beta1.ClusterResourceBinding,
) error {
applyStrategy := crp.Spec.Strategy.ApplyStrategy
if applyStrategy == nil {
// Initialize the apply strategy with default values; normally this would not happen
// as default values have been set up in the definitions.
//
// Note (chenyu1): at this moment, due to the fact that Fleet offers both v1 and v1beta1
// APIs at the same time with Kubernetes favoring the v1 API by default, should the
// user chooses to use the v1 API, default values for v1beta1 exclusive fields
// might not be handled correctly, hence the default value resetting logic added here.
applyStrategy = &fleetv1beta1.ApplyStrategy{}
defaulter.SetDefaultsApplyStrategy(applyStrategy)
}

errs, childCtx := errgroup.WithContext(ctx)
for idx := range allBindings {
binding := allBindings[idx]
if !binding.DeletionTimestamp.IsZero() {
// The binding has been marked for deletion; no need to push the apply strategy
// update there.
continue
}

// Verify if the binding has the latest apply strategy set.
if equality.Semantic.DeepEqual(binding.Spec.ApplyStrategy, applyStrategy) {
// The binding already has the latest apply strategy set; no need to push the update.
klog.V(2).InfoS("The binding already has the latest apply strategy; skip the apply strategy update", "clusterResourceBinding", klog.KObj(binding))
continue
}

// Push the new apply strategy to the binding.
//
// The ApplyStrategy field on binding objects are managed exclusively by the rollout
// controller; to avoid unnecessary conflicts, Fleet will patch the field directly.
updatedBinding := binding.DeepCopy()
updatedBinding.Spec.ApplyStrategy = applyStrategy

errs.Go(func() error {
if err := r.Client.Patch(childCtx, updatedBinding, client.MergeFrom(binding)); err != nil {
klog.ErrorS(err, "Failed to update binding with new apply strategy", "clusterResourceBinding", klog.KObj(binding))
return controller.NewAPIServerError(false, err)
}
klog.V(2).InfoS("Updated binding with new apply strategy", "clusterResourceBinding", klog.KObj(binding))
return nil
})
}

// The patches are issued in parallel; wait for all of them to complete (or the first error
// to return).
return errs.Wait()
}

// handleCRP handles the update event of a ClusterResourcePlacement, which the rollout controller
// watches.
func handleCRP(newCRPObj, oldCRPObj client.Object, q workqueue.RateLimitingInterface) {
// Do some sanity checks. Normally these checks would never fail.
if newCRPObj == nil || oldCRPObj == nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("CRP object is nil")), "Received an unexpected nil object in the CRP Update event", "CRP (new)", klog.KObj(newCRPObj), "CRP (old)", klog.KObj(oldCRPObj))
}
newCRP, newOK := newCRPObj.(*fleetv1beta1.ClusterResourcePlacement)
oldCRP, oldOK := oldCRPObj.(*fleetv1beta1.ClusterResourcePlacement)
if !newOK || !oldOK {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("object is not an CRP object")), "Failed to cast the new object in the CRP Update event to a CRP object", "CRP (new)", klog.KObj(newCRPObj), "CRP (old)", klog.KObj(oldCRPObj), "canCastNewObj", newOK, "canCastOldObj", oldOK)
}

// Check if the CRP has been deleted.
if newCRPObj.GetDeletionTimestamp() != nil {
// No need to process a CRP that has been marked for deletion.
return
}

// Check if the apply strategy has been updated.
newApplyStrategy := newCRP.Spec.Strategy.ApplyStrategy
oldApplyStrategy := oldCRP.Spec.Strategy.ApplyStrategy
if !equality.Semantic.DeepEqual(newApplyStrategy, oldApplyStrategy) {
klog.V(2).InfoS("Detected an update to the apply strategy on the CRP", "clusterResourcePlacement", klog.KObj(newCRP))
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{Name: newCRP.GetName()},
})
}

klog.V(2).InfoS("No update to apply strategy detected; ignore the CRP Update event", "clusterResourcePlacement", klog.KObj(newCRP))
}
137 changes: 137 additions & 0 deletions pkg/controllers/rollout/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strconv"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -32,6 +34,11 @@ const (
customBindingFinalizer = "custom-binding-finalizer"
)

var (
ignoreCRBTypeMetaAndStatusFields = cmpopts.IgnoreFields(fleetv1beta1.ClusterResourceBinding{}, "TypeMeta", "Status")
ignoreObjectMetaAutoGenFields = cmpopts.IgnoreFields(metav1.ObjectMeta{}, "CreationTimestamp", "Generation", "ResourceVersion", "SelfLink", "UID", "ManagedFields")
)

var testCRPName string

var _ = Describe("Test the rollout Controller", func() {
Expand Down Expand Up @@ -96,6 +103,136 @@ var _ = Describe("Test the rollout Controller", func() {
}, timeout, interval).Should(BeTrue(), "rollout controller should roll all the bindings to Bound state")
})

It("should push apply strategy changes to all the bindings (if applicable)", func() {
// Create a CRP.
targetClusterCount := int32(3)
rolloutCRP = clusterResourcePlacementForTest(
testCRPName,
createPlacementPolicyForTest(fleetv1beta1.PickNPlacementType, targetClusterCount),
createPlacementRolloutStrategyForTest(fleetv1beta1.RollingUpdateRolloutStrategyType, generateDefaultRollingUpdateConfig(), nil))
Expect(k8sClient.Create(ctx, rolloutCRP)).Should(Succeed(), "Failed to create CRP")

// Create a master cluster resource snapshot.
resourceSnapshot := generateResourceSnapshot(rolloutCRP.Name, 0, true)
Expect(k8sClient.Create(ctx, resourceSnapshot)).Should(Succeed(), "Failed to create cluster resource snapshot")

// Create all the bindings.
clusters := make([]string, targetClusterCount)
for i := 0; i < int(targetClusterCount); i++ {
clusters[i] = "cluster-" + utils.RandStr()

// Prepare bindings of various states.
var binding *fleetv1beta1.ClusterResourceBinding
switch {
case i%3 == 0:
binding = generateClusterResourceBinding(fleetv1beta1.BindingStateScheduled, resourceSnapshot.Name, clusters[i])
case i%3 == 1:
binding = generateClusterResourceBinding(fleetv1beta1.BindingStateBound, resourceSnapshot.Name, clusters[i])
default:
binding = generateClusterResourceBinding(fleetv1beta1.BindingStateUnscheduled, resourceSnapshot.Name, clusters[i])
}
Expect(k8sClient.Create(ctx, binding)).Should(Succeed(), "Failed to create cluster resource binding")
bindings = append(bindings, binding)
}

// Verify that all the bindings are updated per rollout strategy.
Eventually(func() error {
for _, binding := range bindings {
gotBinding := &fleetv1beta1.ClusterResourceBinding{}
if err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, gotBinding); err != nil {
return fmt.Errorf("failed to get binding %s: %w", binding.Name, err)
}

wantBinding := &fleetv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: binding.Name,
},
Spec: fleetv1beta1.ResourceBindingSpec{
State: binding.Spec.State,
TargetCluster: binding.Spec.TargetCluster,
ApplyStrategy: &fleetv1beta1.ApplyStrategy{
ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison,
WhenToApply: fleetv1beta1.WhenToApplyTypeAlways,
WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeAlways,
Type: fleetv1beta1.ApplyStrategyTypeClientSideApply,
},
},
}
// The bound binding will have no changes; the scheduled binding, per given
// rollout strategy, will be bound with the resource snapshot.
if binding.Spec.State == fleetv1beta1.BindingStateBound || binding.Spec.State == fleetv1beta1.BindingStateScheduled {
wantBinding.Spec.State = fleetv1beta1.BindingStateBound
wantBinding.Spec.ResourceSnapshotName = resourceSnapshot.Name
}
if diff := cmp.Diff(
gotBinding, wantBinding,
ignoreCRBTypeMetaAndStatusFields, ignoreObjectMetaAutoGenFields,
// For this spec, labels and annotations are irrelevant.
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Labels", "Annotations"),
); diff != "" {
return fmt.Errorf("binding diff (-got, +want):\n%s", diff)
}
}
return nil
}, timeout, interval).Should(Succeed(), "Failed to verify that all the bindings are bound")

// Update the CRP with a new apply strategy.
rolloutCRP.Spec.Strategy.ApplyStrategy = &fleetv1beta1.ApplyStrategy{
ComparisonOption: fleetv1beta1.ComparisonOptionTypeFullComparison,
WhenToApply: fleetv1beta1.WhenToApplyTypeIfNotDrifted,
WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeIfNoDiff,
Type: fleetv1beta1.ApplyStrategyTypeServerSideApply,
ServerSideApplyConfig: &fleetv1beta1.ServerSideApplyConfig{
ForceConflicts: true,
},
}
Expect(k8sClient.Update(ctx, rolloutCRP)).Should(Succeed(), "Failed to update CRP")

// Verify that all the bindings are updated with the new apply strategy.
Eventually(func() error {
for _, binding := range bindings {
gotBinding := &fleetv1beta1.ClusterResourceBinding{}
if err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, gotBinding); err != nil {
return fmt.Errorf("failed to get binding %s: %w", binding.Name, err)
}

wantBinding := &fleetv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: binding.Name,
},
Spec: fleetv1beta1.ResourceBindingSpec{
State: binding.Spec.State,
TargetCluster: binding.Spec.TargetCluster,
ApplyStrategy: &fleetv1beta1.ApplyStrategy{
ComparisonOption: fleetv1beta1.ComparisonOptionTypeFullComparison,
WhenToApply: fleetv1beta1.WhenToApplyTypeIfNotDrifted,
WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeIfNoDiff,
Type: fleetv1beta1.ApplyStrategyTypeServerSideApply,
ServerSideApplyConfig: &fleetv1beta1.ServerSideApplyConfig{
ForceConflicts: true,
},
},
},
}
// The bound binding will have no changes; the scheduled binding, per given
// rollout strategy, will be bound with the resource snapshot.
if binding.Spec.State == fleetv1beta1.BindingStateBound || binding.Spec.State == fleetv1beta1.BindingStateScheduled {
wantBinding.Spec.State = fleetv1beta1.BindingStateBound
wantBinding.Spec.ResourceSnapshotName = resourceSnapshot.Name
}
if diff := cmp.Diff(
gotBinding, wantBinding,
ignoreCRBTypeMetaAndStatusFields, ignoreObjectMetaAutoGenFields,
// For this spec, labels and annotations are irrelevant.
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Labels", "Annotations"),
); diff != "" {
return fmt.Errorf("binding diff (-got, +want):\n%s", diff)
}
}
return nil
}, timeout, interval).Should(Succeed(), "Failed to update all bindings with the new apply strategy")
})

It("Should rollout all the selected bindings when the rollout strategy is not set", func() {
// create CRP
var targetCluster int32 = 11
Expand Down
Loading
Loading