@@ -27,6 +27,7 @@ import (
27
27
apierrors "k8s.io/apimachinery/pkg/api/errors"
28
28
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
29
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30
+ "k8s.io/apimachinery/pkg/runtime"
30
31
"k8s.io/apimachinery/pkg/types"
31
32
kerrors "k8s.io/apimachinery/pkg/util/errors"
32
33
"k8s.io/apimachinery/pkg/util/sets"
@@ -42,10 +43,13 @@ import (
42
43
"sigs.k8s.io/cluster-api/util/yaml"
43
44
)
44
45
46
+ // ResourceMutatorFunc holds the type for mutators to be applied on resources during a move operation.
47
+ type ResourceMutatorFunc func (u * unstructured.Unstructured ) error
48
+
45
49
// ObjectMover defines methods for moving Cluster API objects to another management cluster.
46
50
type ObjectMover interface {
47
51
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
48
- Move (namespace string , toCluster Client , dryRun bool ) error
52
+ Move (namespace string , toCluster Client , dryRun bool , mutators ... ResourceMutatorFunc ) error
49
53
50
54
// ToDirectory writes all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target directory.
51
55
ToDirectory (namespace string , directory string ) error
@@ -64,7 +68,7 @@ type objectMover struct {
64
68
// ensure objectMover implements the ObjectMover interface.
65
69
var _ ObjectMover = & objectMover {}
66
70
67
- func (o * objectMover ) Move (namespace string , toCluster Client , dryRun bool ) error {
71
+ func (o * objectMover ) Move (namespace string , toCluster Client , dryRun bool , mutators ... ResourceMutatorFunc ) error {
68
72
log := logf .Log
69
73
log .Info ("Performing move..." )
70
74
o .dryRun = dryRun
@@ -92,7 +96,7 @@ func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) erro
92
96
proxy = toCluster .Proxy ()
93
97
}
94
98
95
- return o .move (objectGraph , proxy )
99
+ return o .move (objectGraph , proxy , mutators ... )
96
100
}
97
101
98
102
func (o * objectMover ) ToDirectory (namespace string , directory string ) error {
@@ -309,7 +313,7 @@ func getMachineObj(proxy Proxy, machine *node, machineObj *clusterv1.Machine) er
309
313
}
310
314
311
315
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
312
- func (o * objectMover ) move (graph * objectGraph , toProxy Proxy ) error {
316
+ func (o * objectMover ) move (graph * objectGraph , toProxy Proxy , mutators ... ResourceMutatorFunc ) error {
313
317
log := logf .Log
314
318
315
319
clusters := graph .getClusters ()
@@ -329,11 +333,9 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
329
333
return errors .Wrap (err , "error pausing ClusterClasses" )
330
334
}
331
335
332
- // Ensure all the expected target namespaces are in place before creating objects.
333
- log .V (1 ).Info ("Creating target namespaces, if missing" )
334
- if err := o .ensureNamespaces (graph , toProxy ); err != nil {
335
- return err
336
- }
336
+ // Nb. DO NOT call ensureNamespaces at this point because:
337
+ // - namespace will be ensured to exist before creating the resource.
338
+ // - If it's done here, we might create a namespace that can end up unused on target cluster (due to mutators).
337
339
338
340
// Define the move sequence by processing the ownerReference chain, so we ensure that a Kubernetes object is moved only after its owners.
339
341
// The sequence is bases on object graph nodes, each one representing a Kubernetes object; nodes are grouped, so bulk of nodes can be moved in parallel. e.g.
@@ -345,11 +347,15 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
345
347
// Create all objects group by group, ensuring all the ownerReferences are re-created.
346
348
log .Info ("Creating objects in the target cluster" )
347
349
for groupIndex := 0 ; groupIndex < len (moveSequence .groups ); groupIndex ++ {
348
- if err := o .createGroup (moveSequence .getGroup (groupIndex ), toProxy ); err != nil {
350
+ if err := o .createGroup (moveSequence .getGroup (groupIndex ), toProxy , mutators ... ); err != nil {
349
351
return err
350
352
}
351
353
}
352
354
355
+ // Nb. mutators used after this point (after creating the resources on target clusters) are mainly intended for
356
+ // using the right namespace to fetch the resource from the target cluster.
357
+ // mutators affecting non metadata fields are no-op after this point.
358
+
353
359
// Delete all objects group by group in reverse order.
354
360
log .Info ("Deleting objects from the source cluster" )
355
361
for groupIndex := len (moveSequence .groups ) - 1 ; groupIndex >= 0 ; groupIndex -- {
@@ -360,13 +366,13 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
360
366
361
367
// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
362
368
log .V (1 ).Info ("Resuming the target ClusterClasses" )
363
- if err := setClusterClassPause (toProxy , clusterClasses , false , o .dryRun ); err != nil {
369
+ if err := setClusterClassPause (toProxy , clusterClasses , false , o .dryRun , mutators ... ); err != nil {
364
370
return errors .Wrap (err , "error resuming ClusterClasses" )
365
371
}
366
372
367
373
// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
368
374
log .V (1 ).Info ("Resuming the target cluster" )
369
- return setClusterPause (toProxy , clusters , false , o .dryRun )
375
+ return setClusterPause (toProxy , clusters , false , o .dryRun , mutators ... )
370
376
}
371
377
372
378
func (o * objectMover ) toDirectory (graph * objectGraph , directory string ) error {
@@ -533,7 +539,7 @@ func getMoveSequence(graph *objectGraph) *moveSequence {
533
539
}
534
540
535
541
// setClusterPause sets the paused field on nodes referring to Cluster objects.
536
- func setClusterPause (proxy Proxy , clusters []* node , value bool , dryRun bool ) error {
542
+ func setClusterPause (proxy Proxy , clusters []* node , value bool , dryRun bool , mutators ... ResourceMutatorFunc ) error {
537
543
if dryRun {
538
544
return nil
539
545
}
@@ -554,7 +560,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err
554
560
555
561
// Nb. The operation is wrapped in a retry loop to make setClusterPause more resilient to unexpected conditions.
556
562
if err := retryWithExponentialBackoff (setClusterPauseBackoff , func () error {
557
- return patchCluster (proxy , cluster , patch )
563
+ return patchCluster (proxy , cluster , patch , mutators ... )
558
564
}); err != nil {
559
565
return errors .Wrapf (err , "error setting Cluster.Spec.Paused=%t" , value )
560
566
}
@@ -563,7 +569,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err
563
569
}
564
570
565
571
// setClusterClassPause sets the paused annotation on nodes referring to ClusterClass objects.
566
- func setClusterClassPause (proxy Proxy , clusterclasses []* node , pause bool , dryRun bool ) error {
572
+ func setClusterClassPause (proxy Proxy , clusterclasses []* node , pause bool , dryRun bool , mutators ... ResourceMutatorFunc ) error {
567
573
if dryRun {
568
574
return nil
569
575
}
@@ -581,7 +587,7 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu
581
587
582
588
// Nb. The operation is wrapped in a retry loop to make setClusterClassPause more resilient to unexpected conditions.
583
589
if err := retryWithExponentialBackoff (setClusterClassPauseBackoff , func () error {
584
- return pauseClusterClass (proxy , clusterclass , pause )
590
+ return pauseClusterClass (proxy , clusterclass , pause , mutators ... )
585
591
}); err != nil {
586
592
return errors .Wrapf (err , "error updating ClusterClass %s/%s" , clusterclass .identity .Namespace , clusterclass .identity .Name )
587
593
}
@@ -590,19 +596,29 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu
590
596
}
591
597
592
598
// patchCluster applies a patch to a node referring to a Cluster object.
593
- func patchCluster (proxy Proxy , cluster * node , patch client.Patch ) error {
599
+ func patchCluster (proxy Proxy , n * node , patch client.Patch , mutators ... ResourceMutatorFunc ) error {
594
600
cFrom , err := proxy .NewClient ()
595
601
if err != nil {
596
602
return err
597
603
}
598
604
599
- clusterObj := & clusterv1.Cluster {}
600
- clusterObjKey := client.ObjectKey {
601
- Namespace : cluster .identity .Namespace ,
602
- Name : cluster .identity .Name ,
605
+ // Since the patch has been generated already in caller of this function, the ONLY affect that mutators can have
606
+ // here is on namespace of the resource.
607
+ clusterObj , err := applyMutators (& clusterv1.Cluster {
608
+ TypeMeta : metav1.TypeMeta {
609
+ Kind : clusterv1 .ClusterKind ,
610
+ APIVersion : clusterv1 .GroupVersion .String (),
611
+ },
612
+ ObjectMeta : metav1.ObjectMeta {
613
+ Name : n .identity .Name ,
614
+ Namespace : n .identity .Namespace ,
615
+ },
616
+ }, mutators ... )
617
+ if err != nil {
618
+ return err
603
619
}
604
620
605
- if err := cFrom .Get (ctx , clusterObjKey , clusterObj ); err != nil {
621
+ if err := cFrom .Get (ctx , client . ObjectKeyFromObject ( clusterObj ) , clusterObj ); err != nil {
606
622
return errors .Wrapf (err , "error reading Cluster %s/%s" ,
607
623
clusterObj .GetNamespace (), clusterObj .GetName ())
608
624
}
@@ -615,18 +631,35 @@ func patchCluster(proxy Proxy, cluster *node, patch client.Patch) error {
615
631
return nil
616
632
}
617
633
618
- func pauseClusterClass (proxy Proxy , n * node , pause bool ) error {
634
+ func pauseClusterClass (proxy Proxy , n * node , pause bool , mutators ... ResourceMutatorFunc ) error {
619
635
cFrom , err := proxy .NewClient ()
620
636
if err != nil {
621
637
return errors .Wrap (err , "error creating client" )
622
638
}
623
639
624
- // Get the ClusterClass from the server
640
+ // Get a mutated copy of the ClusterClass to identify the target namespace.
641
+ // The ClusterClass could have been moved to a different namespace after the move.
642
+ mutatedClusterClass , err := applyMutators (& clusterv1.ClusterClass {
643
+ TypeMeta : metav1.TypeMeta {
644
+ Kind : clusterv1 .ClusterClassKind ,
645
+ APIVersion : clusterv1 .GroupVersion .String (),
646
+ },
647
+ ObjectMeta : metav1.ObjectMeta {
648
+ Name : n .identity .Name ,
649
+ Namespace : n .identity .Namespace ,
650
+ }}, mutators ... )
651
+ if err != nil {
652
+ return err
653
+ }
654
+
625
655
clusterClass := & clusterv1.ClusterClass {}
656
+ // Construct an object key using the mutatedClusterClass reflecting any changes to the namespace.
626
657
clusterClassObjKey := client.ObjectKey {
627
- Name : n . identity . Name ,
628
- Namespace : n . identity . Namespace ,
658
+ Name : mutatedClusterClass . GetName () ,
659
+ Namespace : mutatedClusterClass . GetNamespace () ,
629
660
}
661
+ // Get a copy of the ClusterClass.
662
+ // This will ensure that any other changes from the mutator are ignored here as we work with a fresh copy of the cluster class.
630
663
if err := cFrom .Get (ctx , clusterClassObjKey , clusterClass ); err != nil {
631
664
return errors .Wrapf (err , "error reading ClusterClass %s/%s" , n .identity .Namespace , n .identity .Name )
632
665
}
@@ -736,7 +769,7 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
736
769
return err
737
770
}
738
771
739
- // If the namespace does not exists , create it.
772
+ // If the namespace does not exist , create it.
740
773
ns = & corev1.Namespace {
741
774
TypeMeta : metav1.TypeMeta {
742
775
APIVersion : "v1" ,
@@ -754,15 +787,18 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
754
787
}
755
788
756
789
// createGroup creates all the Kubernetes objects into the target management cluster corresponding to the object graph nodes in a moveGroup.
757
- func (o * objectMover ) createGroup (group moveGroup , toProxy Proxy ) error {
790
+ func (o * objectMover ) createGroup (group moveGroup , toProxy Proxy , mutators ... ResourceMutatorFunc ) error {
758
791
createTargetObjectBackoff := newWriteBackoff ()
759
792
errList := []error {}
760
793
794
+ // Maintain a cache of namespaces that have been verified to already exist.
795
+ // Nb. This prevents us from making repetitive (and expensive) calls in listing all namespaces to ensure a namespace exists before creating a resource.
796
+ existingNamespaces := sets .New [string ]()
761
797
for _ , nodeToCreate := range group {
762
798
// Creates the Kubernetes object corresponding to the nodeToCreate.
763
799
// Nb. The operation is wrapped in a retry loop to make move more resilient to unexpected conditions.
764
800
err := retryWithExponentialBackoff (createTargetObjectBackoff , func () error {
765
- return o .createTargetObject (nodeToCreate , toProxy )
801
+ return o .createTargetObject (nodeToCreate , toProxy , mutators , existingNamespaces )
766
802
})
767
803
if err != nil {
768
804
errList = append (errList , err )
@@ -821,7 +857,7 @@ func (o *objectMover) restoreGroup(group moveGroup, toProxy Proxy) error {
821
857
}
822
858
823
859
// createTargetObject creates the Kubernetes object in the target Management cluster corresponding to the object graph node, taking care of restoring the OwnerReference with the owner nodes, if any.
824
- func (o * objectMover ) createTargetObject (nodeToCreate * node , toProxy Proxy ) error {
860
+ func (o * objectMover ) createTargetObject (nodeToCreate * node , toProxy Proxy , mutators [] ResourceMutatorFunc , existingNamespaces sets. Set [ string ] ) error {
825
861
log := logf .Log
826
862
log .V (1 ).Info ("Creating" , nodeToCreate .identity .Kind , nodeToCreate .identity .Name , "Namespace" , nodeToCreate .identity .Namespace )
827
863
@@ -854,7 +890,7 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
854
890
// Removes current OwnerReferences
855
891
obj .SetOwnerReferences (nil )
856
892
857
- // Rebuild the owne reference chain
893
+ // Rebuild the owner reference chain
858
894
o .buildOwnerChain (obj , nodeToCreate )
859
895
860
896
// FIXME Workaround for https://github.com/kubernetes/kubernetes/issues/32220. Remove when the issue is fixed.
@@ -869,6 +905,17 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
869
905
return err
870
906
}
871
907
908
+ obj , err = applyMutators (obj , mutators ... )
909
+ if err != nil {
910
+ return err
911
+ }
912
+ // Applying mutators MAY change the namespace, so ensure the namespace exists before creating the resource.
913
+ if ! nodeToCreate .isGlobal && ! existingNamespaces .Has (obj .GetNamespace ()) {
914
+ if err = o .ensureNamespace (toProxy , obj .GetNamespace ()); err != nil {
915
+ return err
916
+ }
917
+ existingNamespaces .Insert (obj .GetNamespace ())
918
+ }
872
919
oldManagedFields := obj .GetManagedFields ()
873
920
if err := cTo .Create (ctx , obj ); err != nil {
874
921
if ! apierrors .IsAlreadyExists (err ) {
@@ -1195,3 +1242,22 @@ func patchTopologyManagedFields(ctx context.Context, oldManagedFields []metav1.M
1195
1242
}
1196
1243
return nil
1197
1244
}
1245
+
1246
+ func applyMutators (object client.Object , mutators ... ResourceMutatorFunc ) (* unstructured.Unstructured , error ) {
1247
+ if object == nil {
1248
+ return nil , nil
1249
+ }
1250
+ u := & unstructured.Unstructured {}
1251
+ to , err := runtime .DefaultUnstructuredConverter .ToUnstructured (object )
1252
+ if err != nil {
1253
+ return nil , err
1254
+ }
1255
+ u .SetUnstructuredContent (to )
1256
+ for _ , mutator := range mutators {
1257
+ if err := mutator (u ); err != nil {
1258
+ return nil , errors .Wrapf (err , "error applying resource mutator to %q %s/%s" ,
1259
+ u .GroupVersionKind (), object .GetNamespace (), object .GetName ())
1260
+ }
1261
+ }
1262
+ return u , nil
1263
+ }
0 commit comments