Skip to content

Commit 89a10e7

Browse files
committed
fix: clone volume content to requested volume
1 parent ef62ad2 commit 89a10e7

File tree

2 files changed

+95
-218
lines changed

2 files changed

+95
-218
lines changed

pkg/blob/controllerserver.go

Lines changed: 82 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
8181
return nil, status.Error(codes.InvalidArgument, err.Error())
8282
}
8383

84-
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
85-
// logging the job status if it's volume cloning
86-
if req.GetVolumeContentSource() != nil {
87-
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
88-
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
89-
}
90-
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
91-
}
92-
defer d.volumeLocks.Release(volName)
93-
9484
volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
9585
requestGiB := int(util.RoundUpGiB(volSizeBytes))
9686

87+
volContentSource := req.GetVolumeContentSource()
88+
secrets := req.GetSecrets()
89+
9790
parameters := req.GetParameters()
9891
if parameters == nil {
9992
parameters = make(map[string]string)
@@ -351,10 +344,31 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
351344
GetLatestAccountKey: getLatestAccountKey,
352345
}
353346

347+
containerName = replaceWithMap(containerName, containerNameReplaceMap)
348+
validContainerName := containerName
349+
if validContainerName == "" {
350+
validContainerName = volName
351+
if containerNamePrefix != "" {
352+
validContainerName = containerNamePrefix + "-" + volName
353+
}
354+
validContainerName = getValidContainerName(validContainerName, protocol)
355+
setKeyValueInMap(parameters, containerNameField, validContainerName)
356+
}
357+
358+
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
359+
// logging the job status if it's volume cloning
360+
if volContentSource != nil {
361+
jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
362+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
363+
}
364+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
365+
}
366+
defer d.volumeLocks.Release(volName)
367+
354368
var volumeID string
355369
requestName := "controller_create_volume"
356-
if req.GetVolumeContentSource() != nil {
357-
switch req.VolumeContentSource.Type.(type) {
370+
if volContentSource != nil {
371+
switch volContentSource.Type.(type) {
358372
case *csi.VolumeContentSource_Snapshot:
359373
requestName = "controller_create_volume_from_snapshot"
360374
case *csi.VolumeContentSource_Volume:
@@ -367,9 +381,39 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
367381
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
368382
}()
369383

384+
var srcAzcopyAuthEnv []string
385+
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
386+
if volContentSource != nil {
387+
switch volContentSource.Type.(type) {
388+
case *csi.VolumeContentSource_Snapshot:
389+
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
390+
case *csi.VolumeContentSource_Volume:
391+
var srcVolumeID string
392+
if volContentSource.GetVolume() != nil {
393+
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
394+
}
395+
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
396+
if err != nil {
397+
return nil, status.Error(codes.NotFound, err.Error())
398+
}
399+
srcAccountOptions := &azure.AccountOptions{
400+
Name: srcAccountName,
401+
SubscriptionID: srcSubscriptionID,
402+
ResourceGroup: srcResourceGroupName,
403+
GetLatestAccountKey: getLatestAccountKey,
404+
}
405+
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
406+
if err != nil {
407+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
408+
}
409+
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
410+
default:
411+
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
412+
}
413+
}
414+
370415
var accountKey string
371416
accountName := account
372-
secrets := req.GetSecrets()
373417
if len(secrets) == 0 && accountName == "" {
374418
if v, ok := d.volMap.Load(volName); ok {
375419
accountName = v.(string)
@@ -423,31 +467,26 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
423467
secrets = createStorageAccountSecret(accountName, accountKey)
424468
}
425469

426-
// replace pv/pvc name namespace metadata in subDir
427-
containerName = replaceWithMap(containerName, containerNameReplaceMap)
428-
validContainerName := containerName
429-
if validContainerName == "" {
430-
validContainerName = volName
431-
if containerNamePrefix != "" {
432-
validContainerName = containerNamePrefix + "-" + volName
470+
dstAzcopyAuthEnv := srcAzcopyAuthEnv
471+
dstAccountSASToken := srcAccountSASToken
472+
if volContentSource != nil {
473+
if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName {
474+
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
475+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
476+
}
433477
}
434-
validContainerName = getValidContainerName(validContainerName, protocol)
435-
setKeyValueInMap(parameters, containerNameField, validContainerName)
436478
}
437479

438-
if req.GetVolumeContentSource() != nil {
439-
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
440-
if err != nil {
441-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
442-
}
443-
if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
480+
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
481+
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
482+
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
483+
}
484+
485+
if volContentSource != nil {
486+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
487+
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
444488
return nil, err
445489
}
446-
} else {
447-
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
448-
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
449-
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
450-
}
451490
}
452491

453492
if storeAccountKey && len(req.GetSecrets()) == 0 {
@@ -488,7 +527,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
488527
VolumeId: volumeID,
489528
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
490529
VolumeContext: parameters,
491-
ContentSource: req.GetVolumeContentSource(),
530+
ContentSource: volContentSource,
492531
},
493532
}, nil
494533
}
@@ -755,24 +794,13 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
755794
})
756795
}
757796

758-
// CopyBlobContainer copies a blob container in the same storage account
759-
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
760-
var sourceVolumeID string
761-
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
762-
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
797+
// copyBlobContainer copies source volume content into a destination volume
798+
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {
763799

764-
}
765-
resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
766-
if err != nil {
767-
return status.Error(codes.NotFound, err.Error())
768-
}
769-
if srcContainerName == "" || dstContainerName == "" {
770-
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
800+
if srcPath == "" || dstPath == "" || dstContainerName == "" {
801+
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
771802
}
772803

773-
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
774-
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
775-
776804
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
777805
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
778806
switch jobState {
@@ -781,9 +809,9 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
781809
case util.AzcopyJobRunning:
782810
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
783811
case util.AzcopyJobNotFound:
784-
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
812+
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
785813
execFunc := func() error {
786-
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
814+
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false")
787815
if len(authAzcopyEnv) > 0 {
788816
cmd.Env = append(os.Environ(), authAzcopyEnv...)
789817
}
@@ -794,32 +822,19 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
794822
}
795823
timeoutFunc := func() error {
796824
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
797-
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
825+
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstContainerName, percent)
798826
}
799827
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
800828
if copyErr != nil {
801-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
829+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
802830
} else {
803-
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
831+
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
804832
}
805833
return copyErr
806834
}
807835
return err
808836
}
809837

810-
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
811-
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
812-
vs := req.VolumeContentSource
813-
switch vs.Type.(type) {
814-
case *csi.VolumeContentSource_Snapshot:
815-
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
816-
case *csi.VolumeContentSource_Volume:
817-
return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
818-
default:
819-
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
820-
}
821-
}
822-
823838
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
824839
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
825840
azureAuthConfig := d.cloud.Config.AzureAuthConfig

0 commit comments

Comments
 (0)