Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 180 additions & 41 deletions cmd/clusterctl/client/cluster/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ func (o *objectMover) move(ctx context.Context, graph *objectGraph, toProxy Prox

// Sets the pause field on the Cluster object in the source management cluster, so the controllers stop reconciling it.
log.V(1).Info("Pausing the source cluster")
if err := setClusterPause(ctx, o.fromProxy, clusters, true, o.dryRun); err != nil {
if err := setClustersPause(ctx, o.fromProxy, clusters, true, o.dryRun); err != nil {
return err
}

log.V(1).Info("Pausing the source ClusterClasses")
if err := setClusterClassPause(ctx, o.fromProxy, clusterClasses, true, o.dryRun); err != nil {
if err := setClusterClassesPause(ctx, o.fromProxy, clusterClasses, true, o.dryRun); err != nil {
return errors.Wrap(err, "error pausing ClusterClasses")
}

Expand Down Expand Up @@ -382,13 +382,13 @@ func (o *objectMover) move(ctx context.Context, graph *objectGraph, toProxy Prox

// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target ClusterClasses")
if err := setClusterClassPause(ctx, toProxy, clusterClasses, false, o.dryRun, mutators...); err != nil {
if err := setClusterClassesPause(ctx, toProxy, clusterClasses, false, o.dryRun, mutators...); err != nil {
return errors.Wrap(err, "error resuming ClusterClasses")
}

// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target cluster")
return setClusterPause(ctx, toProxy, clusters, false, o.dryRun, mutators...)
return setClustersPause(ctx, toProxy, clusters, false, o.dryRun, mutators...)
}

func (o *objectMover) toDirectory(ctx context.Context, graph *objectGraph, directory string) error {
Expand All @@ -400,14 +400,24 @@ func (o *objectMover) toDirectory(ctx context.Context, graph *objectGraph, direc
clusterClasses := graph.getClusterClasses()
log.Info("Moving Cluster API objects", "ClusterClasses", len(clusterClasses))

origClustersPauseState, err := getClustersPauseState(ctx, o.fromProxy, clusters)
if err != nil {
return errors.Wrap(err, "error retrieving Clusters pause state")
}

origClusterClassesPauseState, err := getClusterClassesPauseState(ctx, o.fromProxy, clusterClasses)
if err != nil {
return errors.Wrap(err, "error retrieving ClusterClasses pause state")
}

// Sets the pause field on the Cluster object in the source management cluster, so the controllers stop reconciling it.
log.V(1).Info("Pausing the source cluster")
if err := setClusterPause(ctx, o.fromProxy, clusters, true, o.dryRun); err != nil {
return err
log.V(1).Info("Pausing the source Clusters")
if err := setClustersPause(ctx, o.fromProxy, clusters, true, o.dryRun); err != nil {
return errors.Wrap(err, "error pausing Clusters")
}

log.V(1).Info("Pausing the source ClusterClasses")
if err := setClusterClassPause(ctx, o.fromProxy, clusterClasses, true, o.dryRun); err != nil {
if err := setClusterClassesPause(ctx, o.fromProxy, clusterClasses, true, o.dryRun); err != nil {
return errors.Wrap(err, "error pausing ClusterClasses")
}

Expand All @@ -426,15 +436,15 @@ func (o *objectMover) toDirectory(ctx context.Context, graph *objectGraph, direc
}
}

// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target ClusterClasses")
if err := setClusterClassPause(ctx, o.fromProxy, clusterClasses, false, o.dryRun); err != nil {
return errors.Wrap(err, "error resuming ClusterClasses")
// Restore ClusterClasses' pause fields to their original values..
log.V(1).Info("Restoring ClusterClasses paused annotations to original state")
if err := restoreClusterClassesPause(ctx, o.fromProxy, clusterClasses, origClusterClassesPauseState, o.dryRun); err != nil {
return errors.Wrap(err, "error restoring ClusterClass paused annotations")
}

// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the source cluster")
return setClusterPause(ctx, o.fromProxy, clusters, false, o.dryRun)
// Restore Clusters' pause fields to their original values.
log.V(1).Info("Restoring Clusters .Spec.Paused to original states")
return restoreClustersPause(ctx, o.fromProxy, clusters, origClustersPauseState, o.dryRun)
}

func (o *objectMover) fromDirectory(ctx context.Context, graph *objectGraph, toProxy Proxy) error {
Expand Down Expand Up @@ -469,14 +479,14 @@ func (o *objectMover) fromDirectory(ctx context.Context, graph *objectGraph, toP
// Resume reconciling the ClusterClasses after being restored from a backup.
// By default, during backup, ClusterClasses are paused so they must be unpaused to be used again
log.V(1).Info("Resuming the target ClusterClasses")
if err := setClusterClassPause(ctx, toProxy, clusterClasses, false, o.dryRun); err != nil {
if err := setClusterClassesPause(ctx, toProxy, clusterClasses, false, o.dryRun); err != nil {
return errors.Wrap(err, "error resuming ClusterClasses")
}

// Resume reconciling the Clusters after being restored from a directory.
// By default, when moved to a directory, Clusters are paused, so they must be unpaused to be used again.
log.V(1).Info("Resuming the target cluster")
return setClusterPause(ctx, toProxy, clusters, false, o.dryRun)
return setClustersPause(ctx, toProxy, clusters, false, o.dryRun)
}

// moveSequence defines a list of group of moveGroups.
Expand Down Expand Up @@ -554,8 +564,32 @@ func getMoveSequence(graph *objectGraph) *moveSequence {
return moveSequence
}

// setClusterPause sets the paused field on nodes referring to Cluster objects.
func setClusterPause(ctx context.Context, proxy Proxy, clusters []*node, value bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
// stateKey returns the "<namespace>/<name>" identifier used as the map key
// when tracking per-object pause state across a move operation.
func stateKey(n *node) string {
	return n.identity.Namespace + "/" + n.identity.Name
}

// setClustersPause sets the paused field on nodes referring to Cluster objects.
// It stops at the first error encountered.
func setClustersPause(ctx context.Context, proxy Proxy, clusters []*node, value bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
	for _, cluster := range clusters {
		if err := setClusterPause(ctx, proxy, cluster, value, dryRun, mutators...); err != nil {
			return err
		}
	}
	return nil
}

// setClusterClassesPause sets the paused annotation on nodes referring to
// ClusterClass objects. It stops at the first error encountered.
func setClusterClassesPause(ctx context.Context, proxy Proxy, clusterclasses []*node, pause bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
	for _, clusterclass := range clusterclasses {
		if err := setClusterClassPause(ctx, proxy, clusterclass, pause, dryRun, mutators...); err != nil {
			return err
		}
	}
	return nil
}

// setClusterPause sets the paused field on node referring to a Cluster object.
func setClusterPause(ctx context.Context, proxy Proxy, cluster *node, value bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
if dryRun {
return nil
}
Expand All @@ -570,44 +604,149 @@ func setClusterPause(ctx context.Context, proxy Proxy, clusters []*node, value b
patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf("{\"spec\":{\"paused\":%s}}", patchValue)))

setClusterPauseBackoff := newWriteBackoff()
for i := range clusters {
cluster := clusters[i]
log.V(5).Info("Set Cluster.Spec.Paused", "paused", value, "Cluster", klog.KRef(cluster.identity.Namespace, cluster.identity.Name))
log.V(5).Info("Set Cluster.Spec.Paused", "paused", value, "Cluster", klog.KRef(cluster.identity.Namespace, cluster.identity.Name))

// Nb. The operation is wrapped in a retry loop to make setClusterPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(ctx, setClusterPauseBackoff, func(ctx context.Context) error {
return patchCluster(ctx, proxy, cluster, patch, mutators...)
}); err != nil {
return errors.Wrapf(err, "error setting Cluster.Spec.Paused=%t", value)
}
// Nb. The operation is wrapped in a retry loop to make setClusterPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(ctx, setClusterPauseBackoff, func(ctx context.Context) error {
return patchCluster(ctx, proxy, cluster, patch, mutators...)
}); err != nil {
return errors.Wrapf(err, "error setting Cluster.Spec.Paused=%t", value)
}

return nil
}

// setClusterClassPause sets the paused annotation on nodes referring to ClusterClass objects.
func setClusterClassPause(ctx context.Context, proxy Proxy, clusterclasses []*node, pause bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
// setClusterClassPause sets or removes the paused annotation on the node
// referring to a single ClusterClass object. It is a no-op when dryRun is set.
func setClusterClassPause(ctx context.Context, proxy Proxy, clusterclass *node, pause bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
	if dryRun {
		return nil
	}

	log := logf.Log

	action := "Remove"
	if pause {
		action = "Set"
	}
	log.V(5).Info(action+" Paused annotation", "ClusterClass", clusterclass.identity.Name, "Namespace", clusterclass.identity.Namespace)

	// Nb. The operation is wrapped in a retry loop so transient apiserver
	// failures do not abort the whole move.
	backoff := newWriteBackoff()
	if err := retryWithExponentialBackoff(ctx, backoff, func(ctx context.Context) error {
		return pauseClusterClass(ctx, proxy, clusterclass, pause, mutators...)
	}); err != nil {
		return errors.Wrapf(err, "error updating ClusterClass %s/%s", clusterclass.identity.Namespace, clusterclass.identity.Name)
	}

	return nil
}

// getClustersPauseState returns the pause state of all given nodes referring
// to Cluster objects, keyed by stateKey (namespace/name). A Cluster with an
// unset Spec.Paused is reported as unpaused.
func getClustersPauseState(ctx context.Context, proxy Proxy, clusters []*node) (map[string]bool, error) {
	cFrom, err := proxy.NewClient(ctx)
	if err != nil {
		return nil, err
	}

	states := make(map[string]bool, len(clusters))
	for _, cluster := range clusters {
		obj := &clusterv1.Cluster{
			TypeMeta: metav1.TypeMeta{
				Kind:       clusterv1.ClusterKind,
				APIVersion: clusterv1.GroupVersion.String(),
			},
			ObjectMeta: metav1.ObjectMeta{
				Name:      cluster.identity.Name,
				Namespace: cluster.identity.Namespace,
			},
		}

		if err := cFrom.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
			return nil, errors.Wrapf(err, "error reading Cluster %s/%s", obj.GetNamespace(), obj.GetName())
		}

		// nil Spec.Paused defaults to false (unpaused).
		states[stateKey(cluster)] = ptr.Deref(obj.Spec.Paused, false)
	}

	return states, nil
}

// getClusterClassesPauseState returns the pause state of all given nodes referring to ClusterClass objects.
func getClusterClassesPauseState(ctx context.Context, proxy Proxy, clusterclasses []*node) (map[string]bool, error) {
cFrom, err := proxy.NewClient(ctx)
if err != nil {
return nil, err
}

states := make(map[string]bool, len(clusterclasses))
for i := range clusterclasses {
clusterclass := clusterclasses[i]
if pause {
log.V(5).Info("Set Paused annotation", "ClusterClass", clusterclass.identity.Name, "Namespace", clusterclass.identity.Namespace)

clusterClassObj := &clusterv1.ClusterClass{
TypeMeta: metav1.TypeMeta{
Kind: clusterv1.ClusterClassKind,
APIVersion: clusterv1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: clusterclass.identity.Name,
Namespace: clusterclass.identity.Namespace,
},
}

clusterClassKey := client.ObjectKeyFromObject(clusterClassObj)

if err := cFrom.Get(ctx, clusterClassKey, clusterClassObj); err != nil {
return nil, errors.Wrapf(err, "error reading ClusterClass %s/%s", clusterClassObj.GetNamespace(), clusterClassObj.GetName())
}

annotations := clusterClassObj.GetAnnotations()
if annotations == nil {
states[stateKey(clusterclass)] = false
} else {
log.V(5).Info("Remove Paused annotation", "ClusterClass", clusterclass.identity.Name, "Namespace", clusterclass.identity.Namespace)
// adjust annotation key if your implementation uses a different one
_, ok := annotations[clusterv1.PausedAnnotation]
states[stateKey(clusterclass)] = ok
}
}

// Nb. The operation is wrapped in a retry loop to make setClusterClassPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(ctx, setClusterClassPauseBackoff, func(ctx context.Context) error {
return pauseClusterClass(ctx, proxy, clusterclass, pause, mutators...)
}); err != nil {
return errors.Wrapf(err, "error updating ClusterClass %s/%s", clusterclass.identity.Namespace, clusterclass.identity.Name)
return states, nil
}

// restoreClustersPause restores Cluster.Spec.Paused on each node referring to
// a Cluster object to the value previously captured by getClustersPauseState.
// Clusters missing from states are restored to unpaused. It is a no-op when
// dryRun is set (delegated to setClusterPause).
func restoreClustersPause(ctx context.Context, proxy Proxy, clusters []*node, states map[string]bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
	log := logf.Log
	for i := range clusters {
		cluster := clusters[i]
		// A missing map entry yields the zero value false (unpaused), so no
		// explicit ok-check is needed.
		paused := states[stateKey(cluster)]

		log.V(5).Info("Restoring Cluster.Spec.Paused", "paused", paused, "Cluster", klog.KRef(cluster.identity.Namespace, cluster.identity.Name))

		if err := setClusterPause(ctx, proxy, cluster, paused, dryRun, mutators...); err != nil {
			return err
		}
	}

	return nil
}

// restoreClusterClassesPause restores the paused annotation on each node
// referring to a ClusterClass object to the state previously captured by
// getClusterClassesPauseState. ClusterClasses missing from states are
// restored to unpaused. It is a no-op when dryRun is set (delegated to
// setClusterClassPause).
func restoreClusterClassesPause(ctx context.Context, proxy Proxy, clusterclasses []*node, states map[string]bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
	for i := range clusterclasses {
		clusterclass := clusterclasses[i]
		// A missing map entry yields the zero value false (unpaused), so no
		// explicit ok-check is needed.
		paused := states[stateKey(clusterclass)]

		if err := setClusterClassPause(ctx, proxy, clusterclass, paused, dryRun, mutators...); err != nil {
			return err
		}
	}

	return nil
}

Expand Down Expand Up @@ -723,7 +862,8 @@ func pauseClusterClass(ctx context.Context, proxy Proxy, n *node, pause bool, mu
ObjectMeta: metav1.ObjectMeta{
Name: n.identity.Name,
Namespace: n.identity.Namespace,
}}, mutators...)
},
}, mutators...)
if err != nil {
return err
}
Expand Down Expand Up @@ -1072,7 +1212,7 @@ func (o *objectMover) backupTargetObject(ctx context.Context, nodeToCreate *node
}
}

err = os.WriteFile(objectFile, byObj, 0600)
err = os.WriteFile(objectFile, byObj, 0o600)
if err != nil {
return err
}
Expand Down Expand Up @@ -1173,7 +1313,6 @@ func (o *objectMover) deleteGroup(ctx context.Context, group moveGroup) error {
err := retryWithExponentialBackoff(ctx, deleteSourceObjectBackoff, func(ctx context.Context) error {
return o.deleteSourceObject(ctx, nodeToDelete)
})

if err != nil {
errList = append(errList, err)
}
Expand Down
Loading