@@ -42,10 +42,13 @@ import (
42
42
"sigs.k8s.io/cluster-api/util/yaml"
43
43
)
44
44
45
+ // ResourceMutatorFunc holds the type for mutators to be applied on resources during a move operation.
46
+ type ResourceMutatorFunc func (u * unstructured.Unstructured )
47
+
45
48
// ObjectMover defines methods for moving Cluster API objects to another management cluster.
46
49
type ObjectMover interface {
47
50
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
48
- Move (namespace string , toCluster Client , dryRun bool ) error
51
+ Move (namespace string , mutators [] ResourceMutatorFunc , toCluster Client , dryRun bool ) error
49
52
50
53
// ToDirectory writes all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target directory.
51
54
ToDirectory (namespace string , directory string ) error
@@ -64,7 +67,7 @@ type objectMover struct {
64
67
// ensure objectMover implements the ObjectMover interface.
65
68
var _ ObjectMover = & objectMover {}
66
69
67
- func (o * objectMover ) Move (namespace string , toCluster Client , dryRun bool ) error {
70
+ func (o * objectMover ) Move (namespace string , mutators [] ResourceMutatorFunc , toCluster Client , dryRun bool ) error {
68
71
log := logf .Log
69
72
log .Info ("Performing move..." )
70
73
o .dryRun = dryRun
@@ -92,7 +95,7 @@ func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) erro
92
95
proxy = toCluster .Proxy ()
93
96
}
94
97
95
- return o .move (objectGraph , proxy )
98
+ return o .move (objectGraph , proxy , mutators )
96
99
}
97
100
98
101
func (o * objectMover ) ToDirectory (namespace string , directory string ) error {
@@ -309,7 +312,7 @@ func getMachineObj(proxy Proxy, machine *node, machineObj *clusterv1.Machine) er
309
312
}
310
313
311
314
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
312
- func (o * objectMover ) move (graph * objectGraph , toProxy Proxy ) error {
315
+ func (o * objectMover ) move (graph * objectGraph , toProxy Proxy , mutators [] ResourceMutatorFunc ) error {
313
316
log := logf .Log
314
317
315
318
clusters := graph .getClusters ()
@@ -320,12 +323,12 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
320
323
321
324
// Sets the pause field on the Cluster object in the source management cluster, so the controllers stop reconciling it.
322
325
log .V (1 ).Info ("Pausing the source cluster" )
323
- if err := setClusterPause (o .fromProxy , clusters , true , o .dryRun ); err != nil {
326
+ if err := setClusterPause (o .fromProxy , clusters , nil , true , o .dryRun ); err != nil {
324
327
return err
325
328
}
326
329
327
330
log .V (1 ).Info ("Pausing the source ClusterClasses" )
328
- if err := setClusterClassPause (o .fromProxy , clusterClasses , true , o .dryRun ); err != nil {
331
+ if err := setClusterClassPause (o .fromProxy , clusterClasses , nil , true , o .dryRun ); err != nil {
329
332
return errors .Wrap (err , "error pausing ClusterClasses" )
330
333
}
331
334
@@ -345,7 +348,7 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
345
348
// Create all objects group by group, ensuring all the ownerReferences are re-created.
346
349
log .Info ("Creating objects in the target cluster" )
347
350
for groupIndex := 0 ; groupIndex < len (moveSequence .groups ); groupIndex ++ {
348
- if err := o .createGroup (moveSequence .getGroup (groupIndex ), toProxy ); err != nil {
351
+ if err := o .createGroup (moveSequence .getGroup (groupIndex ), toProxy , mutators ); err != nil {
349
352
return err
350
353
}
351
354
}
@@ -360,13 +363,13 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
360
363
361
364
// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
362
365
log .V (1 ).Info ("Resuming the target ClusterClasses" )
363
- if err := setClusterClassPause (toProxy , clusterClasses , false , o .dryRun ); err != nil {
366
+ if err := setClusterClassPause (toProxy , clusterClasses , mutators , false , o .dryRun ); err != nil {
364
367
return errors .Wrap (err , "error resuming ClusterClasses" )
365
368
}
366
369
367
370
// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
368
371
log .V (1 ).Info ("Resuming the target cluster" )
369
- return setClusterPause (toProxy , clusters , false , o .dryRun )
372
+ return setClusterPause (toProxy , clusters , mutators , false , o .dryRun )
370
373
}
371
374
372
375
func (o * objectMover ) toDirectory (graph * objectGraph , directory string ) error {
@@ -380,12 +383,12 @@ func (o *objectMover) toDirectory(graph *objectGraph, directory string) error {
380
383
381
384
// Sets the pause field on the Cluster object in the source management cluster, so the controllers stop reconciling it.
382
385
log .V (1 ).Info ("Pausing the source cluster" )
383
- if err := setClusterPause (o .fromProxy , clusters , true , o .dryRun ); err != nil {
386
+ if err := setClusterPause (o .fromProxy , clusters , nil , true , o .dryRun ); err != nil {
384
387
return err
385
388
}
386
389
387
390
log .V (1 ).Info ("Pausing the source ClusterClasses" )
388
- if err := setClusterClassPause (o .fromProxy , clusterClasses , true , o .dryRun ); err != nil {
391
+ if err := setClusterClassPause (o .fromProxy , clusterClasses , nil , true , o .dryRun ); err != nil {
389
392
return errors .Wrap (err , "error pausing ClusterClasses" )
390
393
}
391
394
@@ -406,13 +409,13 @@ func (o *objectMover) toDirectory(graph *objectGraph, directory string) error {
406
409
407
410
// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
408
411
log .V (1 ).Info ("Resuming the target ClusterClasses" )
409
- if err := setClusterClassPause (o .fromProxy , clusterClasses , false , o .dryRun ); err != nil {
412
+ if err := setClusterClassPause (o .fromProxy , clusterClasses , nil , false , o .dryRun ); err != nil {
410
413
return errors .Wrap (err , "error resuming ClusterClasses" )
411
414
}
412
415
413
416
// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
414
417
log .V (1 ).Info ("Resuming the source cluster" )
415
- return setClusterPause (o .fromProxy , clusters , false , o .dryRun )
418
+ return setClusterPause (o .fromProxy , clusters , nil , false , o .dryRun )
416
419
}
417
420
418
421
func (o * objectMover ) fromDirectory (graph * objectGraph , toProxy Proxy ) error {
@@ -447,14 +450,14 @@ func (o *objectMover) fromDirectory(graph *objectGraph, toProxy Proxy) error {
447
450
// Resume reconciling the ClusterClasses after being restored from a backup.
448
451
// By default, during backup, ClusterClasses are paused so they must be unpaused to be used again
449
452
log .V (1 ).Info ("Resuming the target ClusterClasses" )
450
- if err := setClusterClassPause (toProxy , clusterClasses , false , o .dryRun ); err != nil {
453
+ if err := setClusterClassPause (toProxy , clusterClasses , nil , false , o .dryRun ); err != nil {
451
454
return errors .Wrap (err , "error resuming ClusterClasses" )
452
455
}
453
456
454
457
// Resume reconciling the Clusters after being restored from a directory.
455
458
// By default, when moved to a directory, Clusters are paused, so they must be unpaused to be used again.
456
459
log .V (1 ).Info ("Resuming the target cluster" )
457
- return setClusterPause (toProxy , clusters , false , o .dryRun )
460
+ return setClusterPause (toProxy , clusters , nil , false , o .dryRun )
458
461
}
459
462
460
463
// moveSequence defines a list of group of moveGroups.
@@ -533,7 +536,7 @@ func getMoveSequence(graph *objectGraph) *moveSequence {
533
536
}
534
537
535
538
// setClusterPause sets the paused field on nodes referring to Cluster objects.
536
- func setClusterPause (proxy Proxy , clusters []* node , value bool , dryRun bool ) error {
539
+ func setClusterPause (proxy Proxy , clusters []* node , mutators [] ResourceMutatorFunc , value bool , dryRun bool ) error {
537
540
if dryRun {
538
541
return nil
539
542
}
@@ -554,7 +557,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err
554
557
555
558
// Nb. The operation is wrapped in a retry loop to make setClusterPause more resilient to unexpected conditions.
556
559
if err := retryWithExponentialBackoff (setClusterPauseBackoff , func () error {
557
- return patchCluster (proxy , cluster , patch )
560
+ return patchCluster (proxy , cluster , patch , mutators )
558
561
}); err != nil {
559
562
return errors .Wrapf (err , "error setting Cluster.Spec.Paused=%t" , value )
560
563
}
@@ -563,7 +566,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err
563
566
}
564
567
565
568
// setClusterClassPause sets the paused annotation on nodes referring to ClusterClass objects.
566
- func setClusterClassPause (proxy Proxy , clusterclasses []* node , pause bool , dryRun bool ) error {
569
+ func setClusterClassPause (proxy Proxy , clusterclasses []* node , mutators [] ResourceMutatorFunc , pause bool , dryRun bool ) error {
567
570
if dryRun {
568
571
return nil
569
572
}
@@ -581,7 +584,7 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu
581
584
582
585
// Nb. The operation is wrapped in a retry loop to make setClusterClassPause more resilient to unexpected conditions.
583
586
if err := retryWithExponentialBackoff (setClusterClassPauseBackoff , func () error {
584
- return pauseClusterClass (proxy , clusterclass , pause )
587
+ return pauseClusterClass (proxy , clusterclass , pause , mutators )
585
588
}); err != nil {
586
589
return errors .Wrapf (err , "error updating ClusterClass %s/%s" , clusterclass .identity .Namespace , clusterclass .identity .Name )
587
590
}
@@ -590,19 +593,23 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu
590
593
}
591
594
592
595
// patchCluster applies a patch to a node referring to a Cluster object.
593
- func patchCluster (proxy Proxy , cluster * node , patch client.Patch ) error {
596
+ func patchCluster (proxy Proxy , n * node , patch client.Patch , mutators [] ResourceMutatorFunc ) error {
594
597
cFrom , err := proxy .NewClient ()
595
598
if err != nil {
596
599
return err
597
600
}
598
601
599
- clusterObj := & clusterv1.Cluster {}
600
- clusterObjKey := client.ObjectKey {
601
- Namespace : cluster .identity .Namespace ,
602
- Name : cluster .identity .Name ,
602
+ // Get the ClusterClass from the server
603
+ clusterObj := & unstructured.Unstructured {}
604
+ clusterObj .SetAPIVersion (clusterv1 .GroupVersion .String ())
605
+ clusterObj .SetKind (clusterv1 .KindCluster )
606
+ clusterObj .SetName (n .identity .Name )
607
+ clusterObj .SetNamespace (n .identity .Namespace )
608
+ for _ , mutator := range mutators {
609
+ mutator (clusterObj )
603
610
}
604
611
605
- if err := cFrom .Get (ctx , clusterObjKey , clusterObj ); err != nil {
612
+ if err := cFrom .Get (ctx , client . ObjectKeyFromObject ( clusterObj ) , clusterObj ); err != nil {
606
613
return errors .Wrapf (err , "error reading Cluster %s/%s" ,
607
614
clusterObj .GetNamespace (), clusterObj .GetName ())
608
615
}
@@ -615,19 +622,22 @@ func patchCluster(proxy Proxy, cluster *node, patch client.Patch) error {
615
622
return nil
616
623
}
617
624
618
- func pauseClusterClass (proxy Proxy , n * node , pause bool ) error {
625
+ func pauseClusterClass (proxy Proxy , n * node , pause bool , mutators [] ResourceMutatorFunc ) error {
619
626
cFrom , err := proxy .NewClient ()
620
627
if err != nil {
621
628
return errors .Wrap (err , "error creating client" )
622
629
}
623
630
624
631
// Get the ClusterClass from the server
625
- clusterClass := & clusterv1.ClusterClass {}
626
- clusterClassObjKey := client.ObjectKey {
627
- Name : n .identity .Name ,
628
- Namespace : n .identity .Namespace ,
629
- }
630
- if err := cFrom .Get (ctx , clusterClassObjKey , clusterClass ); err != nil {
632
+ clusterClass := & unstructured.Unstructured {}
633
+ clusterClass .SetAPIVersion (clusterv1 .GroupVersion .String ())
634
+ clusterClass .SetKind (clusterv1 .KindClusterClass )
635
+ clusterClass .SetName (n .identity .Name )
636
+ clusterClass .SetNamespace (n .identity .Namespace )
637
+ for _ , mutator := range mutators {
638
+ mutator (clusterClass )
639
+ }
640
+ if err := cFrom .Get (ctx , client .ObjectKeyFromObject (clusterClass ), clusterClass ); err != nil {
631
641
return errors .Wrapf (err , "error reading ClusterClass %s/%s" , n .identity .Namespace , n .identity .Name )
632
642
}
633
643
@@ -740,7 +750,7 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
740
750
return err
741
751
}
742
752
743
- // If the namespace does not exists , create it.
753
+ // If the namespace does not exist , create it.
744
754
ns = & corev1.Namespace {
745
755
TypeMeta : metav1.TypeMeta {
746
756
APIVersion : "v1" ,
@@ -758,15 +768,18 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
758
768
}
759
769
760
770
// createGroup creates all the Kubernetes objects into the target management cluster corresponding to the object graph nodes in a moveGroup.
761
- func (o * objectMover ) createGroup (group moveGroup , toProxy Proxy ) error {
771
+ func (o * objectMover ) createGroup (group moveGroup , toProxy Proxy , mutators [] ResourceMutatorFunc ) error {
762
772
createTargetObjectBackoff := newWriteBackoff ()
763
773
errList := []error {}
764
774
775
+ // Maintain a cache of namespaces that have been verified to already exist.
776
+ // Nb. This prevents us from making repetitive (and expensive) calls in listing all namespaces to ensure a namespace exists before creating a resource.
777
+ ensuredNamespaces := sets .New [string ]()
765
778
for _ , nodeToCreate := range group {
766
779
// Creates the Kubernetes object corresponding to the nodeToCreate.
767
780
// Nb. The operation is wrapped in a retry loop to make move more resilient to unexpected conditions.
768
781
err := retryWithExponentialBackoff (createTargetObjectBackoff , func () error {
769
- return o .createTargetObject (nodeToCreate , toProxy )
782
+ return o .createTargetObject (nodeToCreate , toProxy , mutators , ensuredNamespaces )
770
783
})
771
784
if err != nil {
772
785
errList = append (errList , err )
@@ -825,7 +838,7 @@ func (o *objectMover) restoreGroup(group moveGroup, toProxy Proxy) error {
825
838
}
826
839
827
840
// createTargetObject creates the Kubernetes object in the target Management cluster corresponding to the object graph node, taking care of restoring the OwnerReference with the owner nodes, if any.
828
- func (o * objectMover ) createTargetObject (nodeToCreate * node , toProxy Proxy ) error {
841
+ func (o * objectMover ) createTargetObject (nodeToCreate * node , toProxy Proxy , mutators [] ResourceMutatorFunc , ensuredNamespaces sets. Set [ string ] ) error {
829
842
log := logf .Log
830
843
log .V (1 ).Info ("Creating" , nodeToCreate .identity .Kind , nodeToCreate .identity .Name , "Namespace" , nodeToCreate .identity .Namespace )
831
844
@@ -858,7 +871,7 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
858
871
// Removes current OwnerReferences
859
872
obj .SetOwnerReferences (nil )
860
873
861
- // Rebuild the owne reference chain
874
+ // Rebuild the owner reference chain
862
875
o .buildOwnerChain (obj , nodeToCreate )
863
876
864
877
// FIXME Workaround for https://github.com/kubernetes/kubernetes/issues/32220. Remove when the issue is fixed.
@@ -873,6 +886,15 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
873
886
return err
874
887
}
875
888
889
+ for _ , mutator := range mutators {
890
+ mutator (obj )
891
+ }
892
+ // Applying mutators MAY change the namespace, so ensure the namespace exists before creating the resource.
893
+ if ! nodeToCreate .isGlobal && ! ensuredNamespaces .Has (obj .GetNamespace ()) {
894
+ if err = o .ensureNamespace (toProxy , obj .GetNamespace ()); err != nil {
895
+ return err
896
+ }
897
+ }
876
898
oldManagedFields := obj .GetManagedFields ()
877
899
if err := cTo .Create (ctx , obj ); err != nil {
878
900
if ! apierrors .IsAlreadyExists (err ) {
0 commit comments