@@ -81,19 +81,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
81
81
return nil , status .Error (codes .InvalidArgument , err .Error ())
82
82
}
83
83
84
- if acquired := d .volumeLocks .TryAcquire (volName ); ! acquired {
85
- // logging the job status if it's volume cloning
86
- if req .GetVolumeContentSource () != nil {
87
- jobState , percent , err := d .azcopy .GetAzcopyJob (volName , []string {})
88
- return nil , status .Errorf (codes .Aborted , volumeOperationAlreadyExistsWithAzcopyFmt , volName , jobState , percent , err )
89
- }
90
- return nil , status .Errorf (codes .Aborted , volumeOperationAlreadyExistsFmt , volName )
91
- }
92
- defer d .volumeLocks .Release (volName )
93
-
94
84
volSizeBytes := int64 (req .GetCapacityRange ().GetRequiredBytes ())
95
85
requestGiB := int (util .RoundUpGiB (volSizeBytes ))
96
86
87
+ volContentSource := req .GetVolumeContentSource ()
88
+ secrets := req .GetSecrets ()
89
+
97
90
parameters := req .GetParameters ()
98
91
if parameters == nil {
99
92
parameters = make (map [string ]string )
@@ -351,10 +344,31 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
351
344
GetLatestAccountKey : getLatestAccountKey ,
352
345
}
353
346
347
+ containerName = replaceWithMap (containerName , containerNameReplaceMap )
348
+ validContainerName := containerName
349
+ if validContainerName == "" {
350
+ validContainerName = volName
351
+ if containerNamePrefix != "" {
352
+ validContainerName = containerNamePrefix + "-" + volName
353
+ }
354
+ validContainerName = getValidContainerName (validContainerName , protocol )
355
+ setKeyValueInMap (parameters , containerNameField , validContainerName )
356
+ }
357
+
358
+ if acquired := d .volumeLocks .TryAcquire (volName ); ! acquired {
359
+ // logging the job status if it's volume cloning
360
+ if volContentSource != nil {
361
+ jobState , percent , err := d .azcopy .GetAzcopyJob (validContainerName , []string {})
362
+ return nil , status .Errorf (codes .Aborted , volumeOperationAlreadyExistsWithAzcopyFmt , volName , jobState , percent , err )
363
+ }
364
+ return nil , status .Errorf (codes .Aborted , volumeOperationAlreadyExistsFmt , volName )
365
+ }
366
+ defer d .volumeLocks .Release (volName )
367
+
354
368
var volumeID string
355
369
requestName := "controller_create_volume"
356
- if req . GetVolumeContentSource () != nil {
357
- switch req . VolumeContentSource .Type .(type ) {
370
+ if volContentSource != nil {
371
+ switch volContentSource .Type .(type ) {
358
372
case * csi.VolumeContentSource_Snapshot :
359
373
requestName = "controller_create_volume_from_snapshot"
360
374
case * csi.VolumeContentSource_Volume :
@@ -367,9 +381,39 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
367
381
mc .ObserveOperationWithResult (isOperationSucceeded , VolumeID , volumeID )
368
382
}()
369
383
384
+ var srcAzcopyAuthEnv []string
385
+ var srcSubscriptionID , srcResourceGroupName , srcAccountName , srcContainerName , srcPath , srcAccountSASToken string
386
+ if volContentSource != nil {
387
+ switch volContentSource .Type .(type ) {
388
+ case * csi.VolumeContentSource_Snapshot :
389
+ return nil , status .Errorf (codes .InvalidArgument , "VolumeContentSource Snapshot is not yet implemented" )
390
+ case * csi.VolumeContentSource_Volume :
391
+ var srcVolumeID string
392
+ if volContentSource .GetVolume () != nil {
393
+ srcVolumeID = volContentSource .GetVolume ().GetVolumeId ()
394
+ }
395
+ srcResourceGroupName , srcAccountName , srcContainerName , _ , srcSubscriptionID , err = GetContainerInfo (srcVolumeID )
396
+ if err != nil {
397
+ return nil , status .Error (codes .NotFound , err .Error ())
398
+ }
399
+ srcAccountOptions := & azure.AccountOptions {
400
+ Name : srcAccountName ,
401
+ SubscriptionID : srcSubscriptionID ,
402
+ ResourceGroup : srcResourceGroupName ,
403
+ GetLatestAccountKey : getLatestAccountKey ,
404
+ }
405
+ srcAccountSASToken , srcAzcopyAuthEnv , err = d .getAzcopyAuth (ctx , srcAccountName , "" , storageEndpointSuffix , srcAccountOptions , secrets , secretName , secretNamespace )
406
+ if err != nil {
407
+ return nil , status .Errorf (codes .Internal , "failed to getAzcopyAuth on account(%s) rg(%s), error: %v" , accountOptions .Name , accountOptions .ResourceGroup , err )
408
+ }
409
+ srcPath = fmt .Sprintf ("https://%s.blob.%s/%s" , srcAccountName , storageEndpointSuffix , srcContainerName )
410
+ default :
411
+ return nil , status .Errorf (codes .InvalidArgument , "VolumeContentSource is not recognized: %v" , volContentSource )
412
+ }
413
+ }
414
+
370
415
var accountKey string
371
416
accountName := account
372
- secrets := req .GetSecrets ()
373
417
if len (secrets ) == 0 && accountName == "" {
374
418
if v , ok := d .volMap .Load (volName ); ok {
375
419
accountName = v .(string )
@@ -423,31 +467,26 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
423
467
secrets = createStorageAccountSecret (accountName , accountKey )
424
468
}
425
469
426
- // replace pv/pvc name namespace metadata in subDir
427
- containerName = replaceWithMap ( containerName , containerNameReplaceMap )
428
- validContainerName := containerName
429
- if validContainerName == "" {
430
- validContainerName = volName
431
- if containerNamePrefix != "" {
432
- validContainerName = containerNamePrefix + "-" + volName
470
+ dstAzcopyAuthEnv := srcAzcopyAuthEnv
471
+ dstAccountSASToken := srcAccountSASToken
472
+ if volContentSource != nil {
473
+ if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName {
474
+ if dstAccountSASToken , dstAzcopyAuthEnv , err = d . getAzcopyAuth ( ctx , accountName , accountKey , storageEndpointSuffix , accountOptions , secrets , secretName , secretNamespace ); err != nil {
475
+ return nil , status . Errorf ( codes . Internal , "failed to getAzcopyAuth on account(%s) rg(%s), error: %v" , accountOptions . Name , accountOptions . ResourceGroup , err )
476
+ }
433
477
}
434
- validContainerName = getValidContainerName (validContainerName , protocol )
435
- setKeyValueInMap (parameters , containerNameField , validContainerName )
436
478
}
437
479
438
- if req .GetVolumeContentSource () != nil {
439
- accountSASToken , authAzcopyEnv , err := d .getAzcopyAuth (ctx , accountName , accountKey , storageEndpointSuffix , accountOptions , secrets , secretName , secretNamespace )
440
- if err != nil {
441
- return nil , status .Errorf (codes .Internal , "failed to getAzcopyAuth on account(%s) rg(%s), error: %v" , accountOptions .Name , accountOptions .ResourceGroup , err )
442
- }
443
- if err := d .copyVolume (req , accountSASToken , authAzcopyEnv , validContainerName , storageEndpointSuffix ); err != nil {
480
+ klog .V (2 ).Infof ("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)" , validContainerName , accountName , storageAccountType , subsID , resourceGroup , location , requestGiB )
481
+ if err := d .CreateBlobContainer (ctx , subsID , resourceGroup , accountName , validContainerName , secrets ); err != nil {
482
+ return nil , status .Errorf (codes .Internal , "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v" , validContainerName , accountName , storageAccountType , resourceGroup , location , requestGiB , err )
483
+ }
484
+
485
+ if volContentSource != nil {
486
+ dstPath := fmt .Sprintf ("https://%s.blob.%s/%s" , accountName , storageEndpointSuffix , validContainerName )
487
+ if err := d .copyBlobContainer (dstAzcopyAuthEnv , srcPath , srcAccountSASToken , dstPath , dstAccountSASToken , validContainerName ); err != nil {
444
488
return nil , err
445
489
}
446
- } else {
447
- klog .V (2 ).Infof ("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)" , validContainerName , accountName , storageAccountType , subsID , resourceGroup , location , requestGiB )
448
- if err := d .CreateBlobContainer (ctx , subsID , resourceGroup , accountName , validContainerName , secrets ); err != nil {
449
- return nil , status .Errorf (codes .Internal , "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v" , validContainerName , accountName , storageAccountType , resourceGroup , location , requestGiB , err )
450
- }
451
490
}
452
491
453
492
if storeAccountKey && len (req .GetSecrets ()) == 0 {
@@ -488,7 +527,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
488
527
VolumeId : volumeID ,
489
528
CapacityBytes : req .GetCapacityRange ().GetRequiredBytes (),
490
529
VolumeContext : parameters ,
491
- ContentSource : req . GetVolumeContentSource () ,
530
+ ContentSource : volContentSource ,
492
531
},
493
532
}, nil
494
533
}
@@ -755,24 +794,13 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
755
794
})
756
795
}
757
796
758
- // CopyBlobContainer copies a blob container in the same storage account
759
- func (d * Driver ) copyBlobContainer (req * csi.CreateVolumeRequest , accountSasToken string , authAzcopyEnv []string , dstContainerName , storageEndpointSuffix string ) error {
760
- var sourceVolumeID string
761
- if req .GetVolumeContentSource () != nil && req .GetVolumeContentSource ().GetVolume () != nil {
762
- sourceVolumeID = req .GetVolumeContentSource ().GetVolume ().GetVolumeId ()
797
+ // copyBlobContainer copies source volume content into a destination volume
798
+ func (d * Driver ) copyBlobContainer (authAzcopyEnv []string , srcPath string , srcAccountSASToken string , dstPath string , dstAccountSASToken string , dstContainerName string ) error {
763
799
764
- }
765
- resourceGroupName , accountName , srcContainerName , _ , _ , err := GetContainerInfo (sourceVolumeID ) //nolint:dogsled
766
- if err != nil {
767
- return status .Error (codes .NotFound , err .Error ())
768
- }
769
- if srcContainerName == "" || dstContainerName == "" {
770
- return fmt .Errorf ("srcContainerName(%s) or dstContainerName(%s) is empty" , srcContainerName , dstContainerName )
800
+ if srcPath == "" || dstPath == "" || dstContainerName == "" {
801
+ return fmt .Errorf ("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty" , srcPath , dstPath , dstContainerName )
771
802
}
772
803
773
- srcPath := fmt .Sprintf ("https://%s.blob.%s/%s%s" , accountName , storageEndpointSuffix , srcContainerName , accountSasToken )
774
- dstPath := fmt .Sprintf ("https://%s.blob.%s/%s%s" , accountName , storageEndpointSuffix , dstContainerName , accountSasToken )
775
-
776
804
jobState , percent , err := d .azcopy .GetAzcopyJob (dstContainerName , authAzcopyEnv )
777
805
klog .V (2 ).Infof ("azcopy job status: %s, copy percent: %s%%, error: %v" , jobState , percent , err )
778
806
switch jobState {
@@ -781,9 +809,9 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
781
809
case util .AzcopyJobRunning :
782
810
return fmt .Errorf ("wait for the existing AzCopy job to complete, current copy percentage is %s%%" , percent )
783
811
case util .AzcopyJobNotFound :
784
- klog .V (2 ).Infof ("copy blob container %s to %s" , srcContainerName , dstContainerName )
812
+ klog .V (2 ).Infof ("copy blob container %s to %s" , srcPath , dstContainerName )
785
813
execFunc := func () error {
786
- cmd := exec .Command ("azcopy" , "copy" , srcPath , dstPath , "--recursive" , "--check-length=false" )
814
+ cmd := exec .Command ("azcopy" , "copy" , srcPath + srcAccountSASToken , dstPath + dstAccountSASToken , "--recursive" , "--check-length=false" )
787
815
if len (authAzcopyEnv ) > 0 {
788
816
cmd .Env = append (os .Environ (), authAzcopyEnv ... )
789
817
}
@@ -794,32 +822,19 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
794
822
}
795
823
timeoutFunc := func () error {
796
824
_ , percent , _ := d .azcopy .GetAzcopyJob (dstContainerName , authAzcopyEnv )
797
- return fmt .Errorf ("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%" , srcContainerName , dstContainerName , percent )
825
+ return fmt .Errorf ("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%" , srcPath , dstContainerName , percent )
798
826
}
799
827
copyErr := util .WaitUntilTimeout (time .Duration (d .waitForAzCopyTimeoutMinutes )* time .Minute , execFunc , timeoutFunc )
800
828
if copyErr != nil {
801
- klog .Warningf ("CopyBlobContainer(%s, %s, %s) failed with error: %v" , resourceGroupName , accountName , dstPath , copyErr )
829
+ klog .Warningf ("CopyBlobContainer(%s, %s, %s) failed with error: %v" , srcPath , dstPath , dstContainerName , copyErr )
802
830
} else {
803
- klog .V (2 ).Infof ("copied blob container %s to %s successfully" , srcContainerName , dstContainerName )
831
+ klog .V (2 ).Infof ("copied blob container %s to %s successfully" , srcPath , dstContainerName )
804
832
}
805
833
return copyErr
806
834
}
807
835
return err
808
836
}
809
837
810
- // copyVolume copies a volume form volume or snapshot, snapshot is not supported now
811
- func (d * Driver ) copyVolume (req * csi.CreateVolumeRequest , accountSASToken string , authAzcopyEnv []string , dstContainerName , storageEndpointSuffix string ) error {
812
- vs := req .VolumeContentSource
813
- switch vs .Type .(type ) {
814
- case * csi.VolumeContentSource_Snapshot :
815
- return status .Errorf (codes .InvalidArgument , "copy volume from volumeSnapshot is not supported" )
816
- case * csi.VolumeContentSource_Volume :
817
- return d .copyBlobContainer (req , accountSASToken , authAzcopyEnv , dstContainerName , storageEndpointSuffix )
818
- default :
819
- return status .Errorf (codes .InvalidArgument , "%v is not a proper volume source" , vs )
820
- }
821
- }
822
-
823
838
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
824
839
func (d * Driver ) authorizeAzcopyWithIdentity () ([]string , error ) {
825
840
azureAuthConfig := d .cloud .Config .AzureAuthConfig
0 commit comments