@@ -29,6 +29,7 @@ import (
29
29
vmoperatortypes "github.com/vmware-tanzu/vm-operator/api/v1alpha5"
30
30
cnstypes "github.com/vmware/govmomi/cns/types"
31
31
"go.uber.org/zap"
32
+ syncgroup "golang.org/x/sync/errgroup"
32
33
corev1 "k8s.io/api/core/v1"
33
34
apierrors "k8s.io/apimachinery/pkg/api/errors"
34
35
"k8s.io/apimachinery/pkg/runtime"
@@ -292,7 +293,7 @@ func (r *ReconcileVirtualMachineSnapshot) reconcileNormal(ctx context.Context, l
292
293
// if found fetch vmsnapshot and pvcs and pvs
293
294
vmKey := apitypes.NamespacedName {
294
295
Namespace : vmsnapshot .Namespace ,
295
- Name : vmsnapshot .Spec .VMRef . Name ,
296
+ Name : vmsnapshot .Spec .VMName ,
296
297
}
297
298
log .Infof ("reconcileNormal: get virtulal machine %s/%s" , vmKey .Namespace , vmKey .Name )
298
299
virtualMachine , _ , err := utils .GetVirtualMachineAllApiVersions (ctx , vmKey ,
@@ -426,6 +427,7 @@ func (r *ReconcileVirtualMachineSnapshot) syncVolumesAndUpdateCNSVolumeInfo(ctx
426
427
var err error
427
428
cnsVolumeIds := []cnstypes.CnsVolumeId {}
428
429
syncMode := []string {string (cnstypes .CnsSyncVolumeModeSPACE_USAGE )}
430
+ syncgrp , sgctx := syncgroup .WithContext (ctx )
429
431
for _ , vmVolume := range vm .Spec .Volumes {
430
432
if vmVolume .VirtualMachineVolumeSource .PersistentVolumeClaim == nil {
431
433
continue
@@ -461,15 +463,10 @@ func (r *ReconcileVirtualMachineSnapshot) syncVolumesAndUpdateCNSVolumeInfo(ctx
461
463
SyncMode : syncMode ,
462
464
},
463
465
}
464
- // Trigger CNS VolumeSync API for identified volume-lds and Fetch Latest Aggregated snapshot size
465
- log .Infof ("syncVolumesAndUpdateCNSVolumeInfo: Trigger CNS VolumeSync API for volume %s" ,
466
- cnsVolId )
467
- syncVolumeFaultType , err := r .volumeManager .SyncVolume (ctx , syncVolumeSpecs )
468
- if err != nil {
469
- log .Errorf ("syncVolumesAndUpdateCNSVolumeInfo: error while sync volume %s " +
470
- "cnsfault %s. error: %v" , cnsVolId , syncVolumeFaultType , err )
471
- return err
472
- }
466
+ // parallelize sync volume api calls
467
+ syncgrp .Go (func () error {
468
+ return r .invokeSyncVolume (sgctx , log , syncVolumeSpecs )
469
+ })
473
470
}
474
471
} else {
475
472
err = fmt .Errorf ("could not find the PV associated with PVC %s/%s" ,
@@ -478,10 +475,19 @@ func (r *ReconcileVirtualMachineSnapshot) syncVolumesAndUpdateCNSVolumeInfo(ctx
478
475
return err
479
476
}
480
477
}
478
+ // if syncvolume api is not invoked for any volume, no need to update CNSVolumeInfos
481
479
if len (cnsVolumeIds ) == 0 {
482
- log .Infof ("syncVolumesAndUpdateCNSVolumeInfo: no volumes found to sync, skipping volume sync" )
480
+ log .Info ("syncVolumesAndUpdateCNSVolumeInfo: no volumes found to sync, skipped volume sync" )
483
481
return nil
484
482
}
483
+ log .Info ("syncVolumesAndUpdateCNSVolumeInfo: wait for syncvolume operation to be completed" )
484
+ if err := syncgrp .Wait (); err != nil {
485
+ log .Errorf ("syncVolumesAndUpdateCNSVolumeInfo: error while sync volume " +
486
+ "error: %v" , err )
487
+ return err
488
+ }
489
+ log .Info ("syncVolumesAndUpdateCNSVolumeInfo: volumes are synced successfully. " +
490
+ "will update related CNSVolumeInfos" )
485
491
// fetch updated cns volumes
486
492
queryFilter := cnstypes.CnsQueryFilter {
487
493
VolumeIds : cnsVolumeIds ,
@@ -500,7 +506,7 @@ func (r *ReconcileVirtualMachineSnapshot) syncVolumesAndUpdateCNSVolumeInfo(ctx
500
506
501
507
// Update CNSVolumeInfo with latest aggregated Size and Update SPU used value.
502
508
patch , err := common .GetCNSVolumeInfoPatch (ctx , val .AggregatedSnapshotCapacityInMb ,
503
- cnsvolume .VolumeId .Id ) // TODO: UDPATE to value returned
509
+ cnsvolume .VolumeId .Id )
504
510
if err != nil {
505
511
log .Errorf ("syncVolumesAndUpdateCNSVolumeInfo: failed to get cnsvolumeinfo patch for " +
506
512
"volume %s, error: %v" , cnsvolume .VolumeId .Id , err )
@@ -529,3 +535,24 @@ func (r *ReconcileVirtualMachineSnapshot) syncVolumesAndUpdateCNSVolumeInfo(ctx
529
535
}
530
536
return nil
531
537
}
538
+
539
+ func (r * ReconcileVirtualMachineSnapshot ) invokeSyncVolume (ctx context.Context , log * zap.SugaredLogger ,
540
+ syncVolumeSpecs []cnstypes.CnsSyncVolumeSpec ) error {
541
+ select {
542
+ case <- ctx .Done ():
543
+ log .Infof ("invokeSyncVolume: Sync Volume Operation cancelled for volume %s" ,
544
+ syncVolumeSpecs [0 ])
545
+ return ctx .Err ()
546
+ default :
547
+ // Trigger CNS VolumeSync API for identified volume-lds and Fetch Latest Aggregated snapshot size
548
+ log .Infof ("invokeSyncVolume: Trigger CNS SyncVolume API for volume %s" ,
549
+ syncVolumeSpecs [0 ])
550
+ syncVolumeFaultType , err := r .volumeManager .SyncVolume (ctx , syncVolumeSpecs )
551
+ if err != nil {
552
+ log .Errorf ("invokeSyncVolume: error while sync volume %s " +
553
+ "cnsfault %s. error: %v" , syncVolumeSpecs [0 ], syncVolumeFaultType , err )
554
+ return err
555
+ }
556
+ return nil
557
+ }
558
+ }
0 commit comments