88 "os"
99 "time"
1010
11- "github.com/konveyor/crane-lib/state_transfer"
1211 "github.com/konveyor/crane-lib/state_transfer/endpoint"
12+ "github.com/konveyor/crane-lib/state_transfer/endpoint/ingress"
1313 "github.com/konveyor/crane-lib/state_transfer/endpoint/route"
1414 "github.com/konveyor/crane-lib/state_transfer/meta"
1515 metadata "github.com/konveyor/crane-lib/state_transfer/meta"
@@ -21,9 +21,12 @@ import (
2121 "github.com/sirupsen/logrus"
2222 "github.com/spf13/cobra"
2323 corev1 "k8s.io/api/core/v1"
24+ networkingv1 "k8s.io/api/networking/v1"
2425 "k8s.io/apimachinery/pkg/api/errors"
2526 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2628 "k8s.io/apimachinery/pkg/types"
29+ errorsutil "k8s.io/apimachinery/pkg/util/errors"
2730 "k8s.io/apimachinery/pkg/util/wait"
2831 "k8s.io/cli-runtime/pkg/genericclioptions"
2932 "k8s.io/client-go/kubernetes"
@@ -42,6 +45,7 @@ type TransferPVCOptions struct {
4245 DestinationContext string
4346 PVCName string
4447 PVCNamespace string
48+ Endpoint string
4549
4650 // TODO: add more fields for PVC mapping/think of a config file to get inputs?
4751 sourceContext * clientcmdapi.Context
@@ -83,6 +87,7 @@ func addFlagsForTransferPVCOptions(t *TransferPVCOptions, cmd *cobra.Command) {
8387 cmd .Flags ().StringVar (& t .DestinationContext , "destination-context" , "" , "The name of destination context current kubeconfig" )
8488 cmd .Flags ().StringVar (& t .PVCNamespace , "pvc-namespace" , "" , "The namespace of the pvc which is to be transferred, if empty it will try to use the namespace in source-context, if both are empty it will error" )
8589 cmd .Flags ().StringVar (& t .PVCName , "pvc-name" , "" , "The pvc name which is to be transferred on the source" )
90+ cmd .Flags ().StringVar (& t .Endpoint , "endpoint" , "nignx-ingress" , "The type of networking endpoing to use to accept traffic in destination cluster. The options available are `nginx-ingress` and `route`" )
8691}
8792
8893func (t * TransferPVCOptions ) Complete (c * cobra.Command , args []string ) error {
@@ -181,12 +186,6 @@ func (t *TransferPVCOptions) run() error {
181186 log .Fatal (err , "unable to get destination client" )
182187 }
183188
184- // quiesce the applications if needed on the source side
185- err = state_transfer .QuiesceApplications (srcCfg , t .PVCNamespace )
186- if err != nil {
187- log .Fatal (err , "unable to quiesce application on source cluster" )
188- }
189-
190189 // set up the PVC on destination to receive the data
191190 pvc := & corev1.PersistentVolumeClaim {}
192191 err = srcClient .Get (context .TODO (), client.ObjectKey {Namespace : t .PVCNamespace , Name : t .PVCName }, pvc )
@@ -211,39 +210,15 @@ func (t *TransferPVCOptions) run() error {
211210 log .Fatal (err , "invalid pvc list" )
212211 }
213212
214- // create a route for data transfer
215- // TODO: pass in subdomain instead of ""
216- r := route .NewEndpoint (
217- types.NamespacedName {
218- Namespace : pvc .Namespace ,
219- Name : pvc .Name ,
220- }, route .EndpointTypePassthrough , metadata .Labels , "" )
221- e , err := endpoint .Create (r , destClient )
222- if err != nil {
223- log .Fatal (err , "unable to create route endpoint" )
224- }
225-
226- _ = wait .PollUntil (time .Second * 5 , func () (done bool , err error ) {
227- e , err := route .GetEndpointFromKubeObjects (destClient , e .NamespacedName ())
228- if err != nil {
229- log .Println (err , "unable to check route health, retrying..." )
230- return false , nil
231- }
232- ready , err := e .IsHealthy (destClient )
233- if err != nil {
234- log .Println (err , "unable to check route health, retrying..." )
235- return false , nil
236- }
237- return ready , nil
238- }, make (<- chan struct {}))
239-
240- e , err = route .GetEndpointFromKubeObjects (destClient , e .NamespacedName ())
241- if err != nil {
242- log .Fatal (err , "unable to get the route object" )
243- } else {
244- log .Println ("route endpoint is created and is healthy" )
213+ var e endpoint.Endpoint
214+ switch t .Endpoint {
215+ case "route" :
216+ e = createAndWaitForRoute (pvc , destClient )
217+ case "nignx-ingress" :
218+ e = createAndWaitForIngress (pvc , destClient )
219+ default :
220+ log .Fatalf ("unsupported endpoint type %s\n " , t .Endpoint )
245221 }
246-
247222 // create an stunnel transport to carry the data over the route
248223
249224 s := stunnel .NewTransport (meta .NewNamespacedPair (
@@ -278,7 +253,7 @@ func (t *TransferPVCOptions) run() error {
278253 rsync .Username ("root" ),
279254 }
280255
281- rsyncTransfer , err := rsync .NewTransfer (s , r , srcCfg , destCfg , pvcList , rsyncTransferOptions ... )
256+ rsyncTransfer , err := rsync .NewTransfer (s , e , srcCfg , destCfg , pvcList , rsyncTransferOptions ... )
282257 if err != nil {
283258 log .Fatal (err , "error creating rsync transfer" )
284259 } else {
@@ -312,6 +287,80 @@ func (t *TransferPVCOptions) run() error {
312287 log .Fatal (err , "error following rsync client logs" )
313288 }
314289
290+ log .Println ("followed the logs, garbage collecting created resources on both source and destination" )
291+ return garbageCollect (srcClient , destClient , map [string ]string {"app" : "crane2" }, t .Endpoint , t .PVCNamespace )
292+ }
293+
294+ func garbageCollect (srcClient client.Client , destClient client.Client , labels map [string ]string , endpoint , namespace string ) error {
295+ srcGVK := []client.Object {
296+ & corev1.Pod {},
297+ & corev1.ConfigMap {},
298+ & corev1.Secret {},
299+ }
300+ destGVK := []client.Object {
301+ & corev1.Pod {},
302+ & corev1.ConfigMap {},
303+ & corev1.Secret {},
304+ }
305+ switch endpoint {
306+ case "route" :
307+ destGVK = append (destGVK , & routev1.Route {})
308+ case "nignx-ingress" :
309+ destGVK = append (destGVK , & networkingv1.Ingress {})
310+ }
311+
312+ err := deleteResourcesForGVK (srcClient , srcGVK , labels , namespace )
313+ if err != nil {
314+ return err
315+ }
316+
317+ err = deleteResourcesForGVK (destClient , destGVK , labels , namespace )
318+ if err != nil {
319+ return err
320+ }
321+
322+ return deleteResourcesIteratively (destClient , []client.Object {
323+ & corev1.Service {
324+ TypeMeta : metav1.TypeMeta {
325+ Kind : "Service" ,
326+ APIVersion : corev1 .SchemeGroupVersion .Version ,
327+ },
328+ }}, labels , namespace )
329+ }
330+
331+ func deleteResourcesIteratively (c client.Client , iterativeTypes []client.Object , labels map [string ]string , namespace string ) error {
332+ listOptions := []client.ListOption {
333+ client .MatchingLabels (labels ),
334+ client .InNamespace (namespace ),
335+ }
336+ errs := []error {}
337+ for _ , objList := range iterativeTypes {
338+ ulist := & unstructured.UnstructuredList {}
339+ ulist .SetGroupVersionKind (objList .GetObjectKind ().GroupVersionKind ())
340+ err := c .List (context .TODO (), ulist , listOptions ... )
341+ if err != nil {
342+ // if we hit error with one api still try all others
343+ errs = append (errs , err )
344+ continue
345+ }
346+ for _ , item := range ulist .Items {
347+ err = c .Delete (context .TODO (), & item , client .PropagationPolicy (metav1 .DeletePropagationBackground ))
348+ if err != nil {
349+ // if we hit error deleting on continue delete others
350+ errs = append (errs , err )
351+ }
352+ }
353+ }
354+ return errorsutil .NewAggregate (errs )
355+ }
356+
357+ func deleteResourcesForGVK (c client.Client , gvk []client.Object , labels map [string ]string , namespace string ) error {
358+ for _ , obj := range gvk {
359+ err := c .DeleteAllOf (context .TODO (), obj , client .InNamespace (namespace ), client .MatchingLabels (labels ))
360+ if err != nil {
361+ return err
362+ }
363+ }
315364 return nil
316365}
317366
@@ -334,6 +383,10 @@ func followClientLogs(srcConfig *rest.Config, c client.Client, namespace string,
334383 clientPod = & clientPodList .Items [0 ]
335384
336385 for _ , containerStatus := range clientPod .Status .ContainerStatuses {
386+ if containerStatus .State .Terminated != nil && containerStatus .State .Terminated .ExitCode == 0 {
387+ log .Printf ("container %s in pod %s completed successfully" , containerStatus .Name , client.ObjectKey {Namespace : namespace , Name : clientPod .Name })
388+ break
389+ }
337390 if ! containerStatus .Ready {
338391 log .Println (fmt .Errorf ("container %s in pod %s is not ready" , containerStatus .Name , client.ObjectKey {Namespace : namespace , Name : clientPod .Name }))
339392 return false , nil
@@ -370,6 +423,80 @@ func followClientLogs(srcConfig *rest.Config, c client.Client, namespace string,
370423 return err
371424}
372425
426+ func createAndWaitForIngress (pvc * corev1.PersistentVolumeClaim , destClient client.Client ) endpoint.Endpoint {
427+ // create a route for data transfer
428+ // TODO: add a config flag for subdomain
429+ r := ingress .NewEndpoint (
430+ types.NamespacedName {
431+ Namespace : pvc .Namespace ,
432+ Name : pvc .Name ,
433+ }, metadata .Labels , "crane.dev" )
434+ e , err := endpoint .Create (r , destClient )
435+ if err != nil {
436+ log .Fatal (err , "unable to create endpoint" )
437+ }
438+
439+ _ = wait .PollUntil (time .Second * 5 , func () (done bool , err error ) {
440+ e , err := ingress .GetEndpointFromKubeObjects (destClient , e .NamespacedName ())
441+ if err != nil {
442+ log .Println (err , "unable to check health, retrying..." )
443+ return false , nil
444+ }
445+ ready , err := e .IsHealthy (destClient )
446+ if err != nil {
447+ log .Println (err , "unable to check health, retrying..." )
448+ return false , nil
449+ }
450+ return ready , nil
451+ }, make (<- chan struct {}))
452+
453+ e , err = ingress .GetEndpointFromKubeObjects (destClient , e .NamespacedName ())
454+ if err != nil {
455+ log .Fatal (err , "unable to get the route object" )
456+ } else {
457+ log .Println ("endpoint is created and is healthy" )
458+ }
459+
460+ return e
461+ }
462+
463+ func createAndWaitForRoute (pvc * corev1.PersistentVolumeClaim , destClient client.Client ) endpoint.Endpoint {
464+ // create a route for data transfer
465+ // TODO: pass in subdomain instead of ""
466+ r := route .NewEndpoint (
467+ types.NamespacedName {
468+ Namespace : pvc .Namespace ,
469+ Name : pvc .Name ,
470+ }, route .EndpointTypePassthrough , metadata .Labels , "" )
471+ e , err := endpoint .Create (r , destClient )
472+ if err != nil {
473+ log .Fatal (err , "unable to create route endpoint" )
474+ }
475+
476+ _ = wait .PollUntil (time .Second * 5 , func () (done bool , err error ) {
477+ e , err := route .GetEndpointFromKubeObjects (destClient , e .NamespacedName ())
478+ if err != nil {
479+ log .Println (err , "unable to check route health, retrying..." )
480+ return false , nil
481+ }
482+ ready , err := e .IsHealthy (destClient )
483+ if err != nil {
484+ log .Println (err , "unable to check route health, retrying..." )
485+ return false , nil
486+ }
487+ return ready , nil
488+ }, make (<- chan struct {}))
489+
490+ e , err = route .GetEndpointFromKubeObjects (destClient , e .NamespacedName ())
491+ if err != nil {
492+ log .Fatal (err , "unable to get the route object" )
493+ } else {
494+ log .Println ("route endpoint is created and is healthy" )
495+ }
496+
497+ return e
498+ }
499+
373500func clearDestPVC (destPVC * corev1.PersistentVolumeClaim ) {
374501 // TODO: some of this needs to be configuration option exposed to the user
375502 destPVC .ResourceVersion = ""
0 commit comments