Skip to content

Commit 85abd15

Browse files
committed
fix: copy volume content source to requested destination
1 parent fda1e78 commit 85abd15

File tree

2 files changed

+225
-58
lines changed

2 files changed

+225
-58
lines changed

pkg/blob/controllerserver.go

Lines changed: 69 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -367,9 +367,15 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
367367
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
368368
}()
369369

370+
secrets := req.GetSecrets()
371+
372+
volumeContentSourceSASToken, volumeContentSourcePath, err := d.validateVolumeContentSource(ctx, req, storageEndpointSuffix, secrets, secretName, secretNamespace, getLatestAccountKey)
373+
if err != nil {
374+
return nil, err
375+
}
376+
370377
var accountKey string
371378
accountName := account
372-
secrets := req.GetSecrets()
373379
if len(secrets) == 0 && accountName == "" {
374380
if v, ok := d.volMap.Load(volName); ok {
375381
accountName = v.(string)
@@ -435,19 +441,19 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
435441
setKeyValueInMap(parameters, containerNameField, validContainerName)
436442
}
437443

438-
if req.GetVolumeContentSource() != nil {
444+
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
445+
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
446+
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
447+
}
448+
449+
if volumeContentSourcePath != "" {
439450
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
440451
if err != nil {
441452
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
442453
}
443-
if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
454+
if err := d.copyVolumeContent(req, volumeContentSourceSASToken, volumeContentSourcePath, accountSASToken, accountName, validContainerName, authAzcopyEnv, storageEndpointSuffix); err != nil {
444455
return nil, err
445456
}
446-
} else {
447-
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
448-
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
449-
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
450-
}
451457
}
452458

453459
if storeAccountKey && len(req.GetSecrets()) == 0 {
@@ -755,68 +761,96 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
755761
})
756762
}
757763

758-
// CopyBlobContainer copies a blob container in the same storage account
759-
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
760-
var sourceVolumeID string
761-
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
762-
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
763-
764-
}
765-
resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
766-
if err != nil {
767-
return status.Error(codes.NotFound, err.Error())
764+
// copyBlobContainer copies a source path to the destination container
765+
func (d *Driver) copyBlobContainer(srcSasToken string, srcPath string, dstSasToken string, dstAccountName string, dstContainerName string, authAzcopyEnv []string, storageEndpointSuffix string) error {
766+
if srcPath == "" {
767+
return fmt.Errorf("srcPath(%s) is empty", srcPath)
768768
}
769-
if srcContainerName == "" || dstContainerName == "" {
770-
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
769+
if dstAccountName == "" || dstContainerName == "" {
770+
return fmt.Errorf("dstAccountName(%s) or dstContainerName(%s) is empty", dstAccountName, dstContainerName)
771771
}
772772

773-
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
774-
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
773+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", dstAccountName, storageEndpointSuffix, dstContainerName)
775774

776775
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
777-
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
776+
klog.V(2).Infof("copyBlobContainer(%s) status: %s, percent: %s%%, error: %v", dstContainerName, jobState, percent, err)
778777
switch jobState {
779778
case util.AzcopyJobError, util.AzcopyJobCompleted:
780779
return err
781780
case util.AzcopyJobRunning:
782-
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
781+
return fmt.Errorf("copyBlobContainer(%s) running, percent: %s%%", dstContainerName, percent)
783782
case util.AzcopyJobNotFound:
784-
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
783+
klog.V(2).Infof("copyBlobContainer(%s) begin, srcPath: %s, dstPath: %s", dstContainerName, srcPath, dstPath)
785784
execFunc := func() error {
786-
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
785+
cmd := exec.Command("azcopy", "copy", srcPath+srcSasToken, dstPath+dstSasToken, "--recursive", "--check-length=false")
787786
if len(authAzcopyEnv) > 0 {
788787
cmd.Env = append(os.Environ(), authAzcopyEnv...)
789788
}
790789
if out, err := cmd.CombinedOutput(); err != nil {
791-
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
790+
return fmt.Errorf("copyBlobContainer(%s) failed, error: %v, output: %v", dstContainerName, err, string(out))
792791
}
793792
return nil
794793
}
795794
timeoutFunc := func() error {
796795
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
797-
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
796+
return fmt.Errorf("timeout, percent: %s%%", percent)
798797
}
799798
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
800799
if copyErr != nil {
801-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
800+
klog.Warningf("copyBlobContainer(%s) failed, error: %v", dstContainerName, copyErr)
802801
} else {
803-
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
802+
klog.V(2).Infof("copyBlobContainer(%s) successful", dstContainerName)
804803
}
805804
return copyErr
806805
}
807806
return err
808807
}
809808

810-
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
811-
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
809+
// validateVolumeContentSource validates source type and returns source path
810+
func (d *Driver) validateVolumeContentSource(ctx context.Context, req *csi.CreateVolumeRequest, storageEndpointSuffix string, secrets map[string]string, secretName, secretNamespace string, getLatestAccountKey bool) (string, string, error) {
811+
vs := req.VolumeContentSource
812+
if vs == nil {
813+
return "", "", nil
814+
}
815+
switch vs.Type.(type) {
816+
case *csi.VolumeContentSource_Snapshot:
817+
return "", "", status.Errorf(codes.InvalidArgument, "volume content source is not supported: snapshot")
818+
case *csi.VolumeContentSource_Volume:
819+
var srcVolumeID string
820+
if vs.GetVolume() != nil {
821+
srcVolumeID = vs.GetVolume().GetVolumeId()
822+
}
823+
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionId, err := GetContainerInfo(srcVolumeID) //nolint:dogsled
824+
if err != nil {
825+
return "", "", status.Error(codes.NotFound, err.Error())
826+
}
827+
accountOptions := &azure.AccountOptions{
828+
Name: srcAccountName,
829+
SubscriptionID: srcSubscriptionId,
830+
ResourceGroup: srcResourceGroupName,
831+
GetLatestAccountKey: getLatestAccountKey,
832+
}
833+
srcAccountSASToken, _, err := d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
834+
if err != nil {
835+
return "", "", status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
836+
}
837+
srcPath := fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
838+
return srcAccountSASToken, srcPath, nil
839+
default:
840+
return "", "", status.Errorf(codes.InvalidArgument, "volume content source is not recognized: %v", vs)
841+
}
842+
}
843+
844+
// copyVolumeContent copies volume content from supported sources to the destination container
845+
func (d *Driver) copyVolumeContent(req *csi.CreateVolumeRequest, srcSASToken string, srcPath string, dstSASToken string, dstAccountName string, dstContainerName string, authAzcopyEnv []string, storageEndpointSuffix string) error {
812846
vs := req.VolumeContentSource
813847
switch vs.Type.(type) {
814848
case *csi.VolumeContentSource_Snapshot:
815-
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
849+
return status.Errorf(codes.InvalidArgument, "volume content source is not supported: snapshot")
816850
case *csi.VolumeContentSource_Volume:
817-
return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
851+
return d.copyBlobContainer(srcSASToken, srcPath, dstSASToken, dstAccountName, dstContainerName, authAzcopyEnv, storageEndpointSuffix)
818852
default:
819-
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
853+
return status.Errorf(codes.InvalidArgument, "volume content source is not recognized: %v", vs)
820854
}
821855
}
822856

0 commit comments

Comments
 (0)