Skip to content

Commit d68e7a2

Browse files
authored
Merge pull request #1509 from k8s-infra-cherrypick-robot/cherry-pick-1504-to-release-1.24
[release-1.24] fix: clone volume content to requested volume
2 parents e618177 + 54820ea commit d68e7a2

File tree

2 files changed

+95
-218
lines changed

2 files changed

+95
-218
lines changed

pkg/blob/controllerserver.go

Lines changed: 82 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
8181
return nil, status.Error(codes.InvalidArgument, err.Error())
8282
}
8383

84-
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
85-
// logging the job status if it's volume cloning
86-
if req.GetVolumeContentSource() != nil {
87-
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
88-
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
89-
}
90-
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
91-
}
92-
defer d.volumeLocks.Release(volName)
93-
9484
volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
9585
requestGiB := int(util.RoundUpGiB(volSizeBytes))
9686

87+
volContentSource := req.GetVolumeContentSource()
88+
secrets := req.GetSecrets()
89+
9790
parameters := req.GetParameters()
9891
if parameters == nil {
9992
parameters = make(map[string]string)
@@ -351,10 +344,31 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
351344
GetLatestAccountKey: getLatestAccountKey,
352345
}
353346

347+
containerName = replaceWithMap(containerName, containerNameReplaceMap)
348+
validContainerName := containerName
349+
if validContainerName == "" {
350+
validContainerName = volName
351+
if containerNamePrefix != "" {
352+
validContainerName = containerNamePrefix + "-" + volName
353+
}
354+
validContainerName = getValidContainerName(validContainerName, protocol)
355+
setKeyValueInMap(parameters, containerNameField, validContainerName)
356+
}
357+
358+
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
359+
// logging the job status if it's volume cloning
360+
if volContentSource != nil {
361+
jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
362+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
363+
}
364+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
365+
}
366+
defer d.volumeLocks.Release(volName)
367+
354368
var volumeID string
355369
requestName := "controller_create_volume"
356-
if req.GetVolumeContentSource() != nil {
357-
switch req.VolumeContentSource.Type.(type) {
370+
if volContentSource != nil {
371+
switch volContentSource.Type.(type) {
358372
case *csi.VolumeContentSource_Snapshot:
359373
requestName = "controller_create_volume_from_snapshot"
360374
case *csi.VolumeContentSource_Volume:
@@ -367,9 +381,39 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
367381
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
368382
}()
369383

384+
var srcAzcopyAuthEnv []string
385+
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
386+
if volContentSource != nil {
387+
switch volContentSource.Type.(type) {
388+
case *csi.VolumeContentSource_Snapshot:
389+
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
390+
case *csi.VolumeContentSource_Volume:
391+
var srcVolumeID string
392+
if volContentSource.GetVolume() != nil {
393+
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
394+
}
395+
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
396+
if err != nil {
397+
return nil, status.Error(codes.NotFound, err.Error())
398+
}
399+
srcAccountOptions := &azure.AccountOptions{
400+
Name: srcAccountName,
401+
SubscriptionID: srcSubscriptionID,
402+
ResourceGroup: srcResourceGroupName,
403+
GetLatestAccountKey: getLatestAccountKey,
404+
}
405+
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
406+
if err != nil {
407+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
408+
}
409+
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
410+
default:
411+
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
412+
}
413+
}
414+
370415
var accountKey string
371416
accountName := account
372-
secrets := req.GetSecrets()
373417
if len(secrets) == 0 && accountName == "" {
374418
if v, ok := d.volMap.Load(volName); ok {
375419
accountName = v.(string)
@@ -423,31 +467,26 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
423467
secrets = createStorageAccountSecret(accountName, accountKey)
424468
}
425469

426-
// replace pv/pvc name namespace metadata in subDir
427-
containerName = replaceWithMap(containerName, containerNameReplaceMap)
428-
validContainerName := containerName
429-
if validContainerName == "" {
430-
validContainerName = volName
431-
if containerNamePrefix != "" {
432-
validContainerName = containerNamePrefix + "-" + volName
470+
dstAzcopyAuthEnv := srcAzcopyAuthEnv
471+
dstAccountSASToken := srcAccountSASToken
472+
if volContentSource != nil {
473+
if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName {
474+
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
475+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
476+
}
433477
}
434-
validContainerName = getValidContainerName(validContainerName, protocol)
435-
setKeyValueInMap(parameters, containerNameField, validContainerName)
436478
}
437479

438-
if req.GetVolumeContentSource() != nil {
439-
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
440-
if err != nil {
441-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
442-
}
443-
if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
480+
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
481+
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
482+
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
483+
}
484+
485+
if volContentSource != nil {
486+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
487+
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
444488
return nil, err
445489
}
446-
} else {
447-
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
448-
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
449-
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
450-
}
451490
}
452491

453492
if storeAccountKey && len(req.GetSecrets()) == 0 {
@@ -488,7 +527,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
488527
VolumeId: volumeID,
489528
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
490529
VolumeContext: parameters,
491-
ContentSource: req.GetVolumeContentSource(),
530+
ContentSource: volContentSource,
492531
},
493532
}, nil
494533
}
@@ -750,24 +789,13 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
750789
})
751790
}
752791

753-
// CopyBlobContainer copies a blob container in the same storage account
754-
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
755-
var sourceVolumeID string
756-
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
757-
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
792+
// copyBlobContainer copies source volume content into a destination volume
793+
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {
758794

759-
}
760-
resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
761-
if err != nil {
762-
return status.Error(codes.NotFound, err.Error())
763-
}
764-
if srcContainerName == "" || dstContainerName == "" {
765-
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
795+
if srcPath == "" || dstPath == "" || dstContainerName == "" {
796+
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
766797
}
767798

768-
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
769-
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
770-
771799
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
772800
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
773801
switch jobState {
@@ -776,9 +804,9 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
776804
case util.AzcopyJobRunning:
777805
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
778806
case util.AzcopyJobNotFound:
779-
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
807+
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
780808
execFunc := func() error {
781-
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
809+
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false")
782810
if len(authAzcopyEnv) > 0 {
783811
cmd.Env = append(os.Environ(), authAzcopyEnv...)
784812
}
@@ -789,32 +817,19 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
789817
}
790818
timeoutFunc := func() error {
791819
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
792-
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
820+
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstContainerName, percent)
793821
}
794822
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
795823
if copyErr != nil {
796-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
824+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
797825
} else {
798-
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
826+
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
799827
}
800828
return copyErr
801829
}
802830
return err
803831
}
804832

805-
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
806-
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
807-
vs := req.VolumeContentSource
808-
switch vs.Type.(type) {
809-
case *csi.VolumeContentSource_Snapshot:
810-
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
811-
case *csi.VolumeContentSource_Volume:
812-
return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
813-
default:
814-
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
815-
}
816-
}
817-
818833
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
819834
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
820835
azureAuthConfig := d.cloud.Config.AzureAuthConfig

0 commit comments

Comments
 (0)