@@ -81,19 +81,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
81
81
return nil , status .Error (codes .InvalidArgument , err .Error ())
82
82
}
83
83
84
- if acquired := d .volumeLocks .TryAcquire (volName ); ! acquired {
85
- // logging the job status if it's volume cloning
86
- if req .GetVolumeContentSource () != nil {
87
- jobState , percent , err := d .azcopy .GetAzcopyJob (volName , []string {})
88
- return nil , status .Errorf (codes .Aborted , volumeOperationAlreadyExistsWithAzcopyFmt , volName , jobState , percent , err )
89
- }
90
- return nil , status .Errorf (codes .Aborted , volumeOperationAlreadyExistsFmt , volName )
91
- }
92
- defer d .volumeLocks .Release (volName )
93
-
94
84
volSizeBytes := int64 (req .GetCapacityRange ().GetRequiredBytes ())
95
85
requestGiB := int (util .RoundUpGiB (volSizeBytes ))
96
86
87
+ volContentSource := req .GetVolumeContentSource ()
88
+ secrets := req .GetSecrets ()
89
+
97
90
parameters := req .GetParameters ()
98
91
if parameters == nil {
99
92
parameters = make (map [string ]string )
@@ -351,10 +344,31 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
351
344
GetLatestAccountKey : getLatestAccountKey ,
352
345
}
353
346
347
+ containerName = replaceWithMap (containerName , containerNameReplaceMap )
348
+ validContainerName := containerName
349
+ if validContainerName == "" {
350
+ validContainerName = volName
351
+ if containerNamePrefix != "" {
352
+ validContainerName = containerNamePrefix + "-" + volName
353
+ }
354
+ validContainerName = getValidContainerName (validContainerName , protocol )
355
+ setKeyValueInMap (parameters , containerNameField , validContainerName )
356
+ }
357
+
358
+ if acquired := d .volumeLocks .TryAcquire (volName ); ! acquired {
359
+ // logging the job status if it's volume cloning
360
+ if volContentSource != nil {
361
+ jobState , percent , err := d .azcopy .GetAzcopyJob (validContainerName , []string {})
362
+ return nil , status .Errorf (codes .Aborted , volumeOperationAlreadyExistsWithAzcopyFmt , volName , jobState , percent , err )
363
+ }
364
+ return nil , status .Errorf (codes .Aborted , volumeOperationAlreadyExistsFmt , volName )
365
+ }
366
+ defer d .volumeLocks .Release (volName )
367
+
354
368
var volumeID string
355
369
requestName := "controller_create_volume"
356
- if req . GetVolumeContentSource () != nil {
357
- switch req . VolumeContentSource .Type .(type ) {
370
+ if volContentSource != nil {
371
+ switch volContentSource .Type .(type ) {
358
372
case * csi.VolumeContentSource_Snapshot :
359
373
requestName = "controller_create_volume_from_snapshot"
360
374
case * csi.VolumeContentSource_Volume :
@@ -367,9 +381,39 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
367
381
mc .ObserveOperationWithResult (isOperationSucceeded , VolumeID , volumeID )
368
382
}()
369
383
384
+ var srcAzcopyAuthEnv []string
385
+ var srcSubscriptionID , srcResourceGroupName , srcAccountName , srcContainerName , srcPath , srcAccountSASToken string
386
+ if volContentSource != nil {
387
+ switch volContentSource .Type .(type ) {
388
+ case * csi.VolumeContentSource_Snapshot :
389
+ return nil , status .Errorf (codes .InvalidArgument , "VolumeContentSource Snapshot is not yet implemented" )
390
+ case * csi.VolumeContentSource_Volume :
391
+ var srcVolumeID string
392
+ if volContentSource .GetVolume () != nil {
393
+ srcVolumeID = volContentSource .GetVolume ().GetVolumeId ()
394
+ }
395
+ srcResourceGroupName , srcAccountName , srcContainerName , _ , srcSubscriptionID , err = GetContainerInfo (srcVolumeID )
396
+ if err != nil {
397
+ return nil , status .Error (codes .NotFound , err .Error ())
398
+ }
399
+ srcAccountOptions := & azure.AccountOptions {
400
+ Name : srcAccountName ,
401
+ SubscriptionID : srcSubscriptionID ,
402
+ ResourceGroup : srcResourceGroupName ,
403
+ GetLatestAccountKey : getLatestAccountKey ,
404
+ }
405
+ srcAccountSASToken , srcAzcopyAuthEnv , err = d .getAzcopyAuth (ctx , srcAccountName , "" , storageEndpointSuffix , srcAccountOptions , secrets , secretName , secretNamespace )
406
+ if err != nil {
407
+ return nil , status .Errorf (codes .Internal , "failed to getAzcopyAuth on account(%s) rg(%s), error: %v" , accountOptions .Name , accountOptions .ResourceGroup , err )
408
+ }
409
+ srcPath = fmt .Sprintf ("https://%s.blob.%s/%s" , srcAccountName , storageEndpointSuffix , srcContainerName )
410
+ default :
411
+ return nil , status .Errorf (codes .InvalidArgument , "VolumeContentSource is not recognized: %v" , volContentSource )
412
+ }
413
+ }
414
+
370
415
var accountKey string
371
416
accountName := account
372
- secrets := req .GetSecrets ()
373
417
if len (secrets ) == 0 && accountName == "" {
374
418
if v , ok := d .volMap .Load (volName ); ok {
375
419
accountName = v .(string )
@@ -423,31 +467,24 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
423
467
secrets = createStorageAccountSecret (accountName , accountKey )
424
468
}
425
469
426
- // replace pv/pvc name namespace metadata in subDir
427
- containerName = replaceWithMap (containerName , containerNameReplaceMap )
428
- validContainerName := containerName
429
- if validContainerName == "" {
430
- validContainerName = volName
431
- if containerNamePrefix != "" {
432
- validContainerName = containerNamePrefix + "-" + volName
433
- }
434
- validContainerName = getValidContainerName (validContainerName , protocol )
435
- setKeyValueInMap (parameters , containerNameField , validContainerName )
470
+ klog .V (2 ).Infof ("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)" , validContainerName , accountName , storageAccountType , subsID , resourceGroup , location , requestGiB )
471
+ if err := d .CreateBlobContainer (ctx , subsID , resourceGroup , accountName , validContainerName , secrets ); err != nil {
472
+ return nil , status .Errorf (codes .Internal , "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v" , validContainerName , accountName , storageAccountType , resourceGroup , location , requestGiB , err )
436
473
}
437
474
438
- if req .GetVolumeContentSource () != nil {
439
- accountSASToken , authAzcopyEnv , err := d .getAzcopyAuth (ctx , accountName , accountKey , storageEndpointSuffix , accountOptions , secrets , secretName , secretNamespace )
440
- if err != nil {
441
- return nil , status .Errorf (codes .Internal , "failed to getAzcopyAuth on account(%s) rg(%s), error: %v" , accountOptions .Name , accountOptions .ResourceGroup , err )
475
+ if volContentSource != nil {
476
+ dstAzcopyAuthEnv := srcAzcopyAuthEnv
477
+ dstAccountSASToken := srcAccountSASToken
478
+ if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName {
479
+ if dstAccountSASToken , dstAzcopyAuthEnv , err = d .getAzcopyAuth (ctx , accountName , accountKey , storageEndpointSuffix , accountOptions , secrets , secretName , secretNamespace ); err != nil {
480
+ return nil , status .Errorf (codes .Internal , "failed to getAzcopyAuth on account(%s) rg(%s), error: %v" , accountOptions .Name , accountOptions .ResourceGroup , err )
481
+ }
442
482
}
443
- if err := d .copyVolume (req , accountSASToken , authAzcopyEnv , validContainerName , storageEndpointSuffix ); err != nil {
483
+
484
+ dstPath := fmt .Sprintf ("https://%s.blob.%s/%s" , accountName , storageEndpointSuffix , validContainerName )
485
+ if err := d .copyBlobContainer (dstAzcopyAuthEnv , srcPath , srcAccountSASToken , dstPath , dstAccountSASToken , validContainerName ); err != nil {
444
486
return nil , err
445
487
}
446
- } else {
447
- klog .V (2 ).Infof ("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)" , validContainerName , accountName , storageAccountType , subsID , resourceGroup , location , requestGiB )
448
- if err := d .CreateBlobContainer (ctx , subsID , resourceGroup , accountName , validContainerName , secrets ); err != nil {
449
- return nil , status .Errorf (codes .Internal , "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v" , validContainerName , accountName , storageAccountType , resourceGroup , location , requestGiB , err )
450
- }
451
488
}
452
489
453
490
if storeAccountKey && len (req .GetSecrets ()) == 0 {
@@ -488,7 +525,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
488
525
VolumeId : volumeID ,
489
526
CapacityBytes : req .GetCapacityRange ().GetRequiredBytes (),
490
527
VolumeContext : parameters ,
491
- ContentSource : req . GetVolumeContentSource () ,
528
+ ContentSource : volContentSource ,
492
529
},
493
530
}, nil
494
531
}
@@ -755,23 +792,12 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
755
792
})
756
793
}
757
794
758
- // CopyBlobContainer copies a blob container in the same storage account
759
- func (d * Driver ) copyBlobContainer (req * csi.CreateVolumeRequest , accountSasToken string , authAzcopyEnv []string , dstContainerName , storageEndpointSuffix string ) error {
760
- var sourceVolumeID string
761
- if req .GetVolumeContentSource () != nil && req .GetVolumeContentSource ().GetVolume () != nil {
762
- sourceVolumeID = req .GetVolumeContentSource ().GetVolume ().GetVolumeId ()
795
+ // copyBlobContainer copies source volume content into a destination volume
796
+ func (d * Driver ) copyBlobContainer (authAzcopyEnv []string , srcPath string , srcAccountSASToken string , dstPath string , dstAccountSASToken string , dstContainerName string ) error {
763
797
798
+ if srcPath == "" || dstPath == "" || dstContainerName == "" {
799
+ return fmt .Errorf ("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty" , srcPath , dstPath , dstContainerName )
764
800
}
765
- resourceGroupName , accountName , srcContainerName , _ , _ , err := GetContainerInfo (sourceVolumeID ) //nolint:dogsled
766
- if err != nil {
767
- return status .Error (codes .NotFound , err .Error ())
768
- }
769
- if srcContainerName == "" || dstContainerName == "" {
770
- return fmt .Errorf ("srcContainerName(%s) or dstContainerName(%s) is empty" , srcContainerName , dstContainerName )
771
- }
772
-
773
- srcPath := fmt .Sprintf ("https://%s.blob.%s/%s%s" , accountName , storageEndpointSuffix , srcContainerName , accountSasToken )
774
- dstPath := fmt .Sprintf ("https://%s.blob.%s/%s%s" , accountName , storageEndpointSuffix , dstContainerName , accountSasToken )
775
801
776
802
jobState , percent , err := d .azcopy .GetAzcopyJob (dstContainerName , authAzcopyEnv )
777
803
klog .V (2 ).Infof ("azcopy job status: %s, copy percent: %s%%, error: %v" , jobState , percent , err )
@@ -781,9 +807,9 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
781
807
case util .AzcopyJobRunning :
782
808
return fmt .Errorf ("wait for the existing AzCopy job to complete, current copy percentage is %s%%" , percent )
783
809
case util .AzcopyJobNotFound :
784
- klog .V (2 ).Infof ("copy blob container %s to %s" , srcContainerName , dstContainerName )
810
+ klog .V (2 ).Infof ("copy blob container %s to %s" , srcPath , dstContainerName )
785
811
execFunc := func () error {
786
- cmd := exec .Command ("azcopy" , "copy" , srcPath , dstPath , "--recursive" , "--check-length=false" )
812
+ cmd := exec .Command ("azcopy" , "copy" , srcPath + srcAccountSASToken , dstPath + dstAccountSASToken , "--recursive" , "--check-length=false" )
787
813
if len (authAzcopyEnv ) > 0 {
788
814
cmd .Env = append (os .Environ (), authAzcopyEnv ... )
789
815
}
@@ -794,32 +820,19 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
794
820
}
795
821
timeoutFunc := func () error {
796
822
_ , percent , _ := d .azcopy .GetAzcopyJob (dstContainerName , authAzcopyEnv )
797
- return fmt .Errorf ("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%" , srcContainerName , dstContainerName , percent )
823
+ return fmt .Errorf ("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%" , srcPath , dstContainerName , percent )
798
824
}
799
825
copyErr := util .WaitUntilTimeout (time .Duration (d .waitForAzCopyTimeoutMinutes )* time .Minute , execFunc , timeoutFunc )
800
826
if copyErr != nil {
801
- klog .Warningf ("CopyBlobContainer(%s, %s, %s) failed with error: %v" , resourceGroupName , accountName , dstPath , copyErr )
827
+ klog .Warningf ("CopyBlobContainer(%s, %s, %s) failed with error: %v" , srcPath , dstPath , dstContainerName , copyErr )
802
828
} else {
803
- klog .V (2 ).Infof ("copied blob container %s to %s successfully" , srcContainerName , dstContainerName )
829
+ klog .V (2 ).Infof ("copied blob container %s to %s successfully" , srcPath , dstContainerName )
804
830
}
805
831
return copyErr
806
832
}
807
833
return err
808
834
}
809
835
810
- // copyVolume copies a volume form volume or snapshot, snapshot is not supported now
811
- func (d * Driver ) copyVolume (req * csi.CreateVolumeRequest , accountSASToken string , authAzcopyEnv []string , dstContainerName , storageEndpointSuffix string ) error {
812
- vs := req .VolumeContentSource
813
- switch vs .Type .(type ) {
814
- case * csi.VolumeContentSource_Snapshot :
815
- return status .Errorf (codes .InvalidArgument , "copy volume from volumeSnapshot is not supported" )
816
- case * csi.VolumeContentSource_Volume :
817
- return d .copyBlobContainer (req , accountSASToken , authAzcopyEnv , dstContainerName , storageEndpointSuffix )
818
- default :
819
- return status .Errorf (codes .InvalidArgument , "%v is not a proper volume source" , vs )
820
- }
821
- }
822
-
823
836
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
824
837
func (d * Driver ) authorizeAzcopyWithIdentity () ([]string , error ) {
825
838
azureAuthConfig := d .cloud .Config .AzureAuthConfig
0 commit comments