Skip to content

Commit 6544a40

Browse files
committed
fix: copy volume content source to requested volume
1 parent ef62ad2 commit 6544a40

File tree

2 files changed

+93
-199
lines changed

2 files changed

+93
-199
lines changed

pkg/blob/controllerserver.go

Lines changed: 80 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ const (
6767

6868
// CreateVolume provisions a volume
6969
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
70+
71+
// Request
72+
7073
if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
7174
klog.Errorf("invalid create volume req: %v", req)
7275
return nil, err
@@ -81,9 +84,10 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
8184
return nil, status.Error(codes.InvalidArgument, err.Error())
8285
}
8386

87+
volContentSource := req.GetVolumeContentSource()
8488
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
8589
// logging the job status if it's volume cloning
86-
if req.GetVolumeContentSource() != nil {
90+
if volContentSource != nil {
8791
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
8892
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
8993
}
@@ -94,6 +98,10 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
9498
volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
9599
requestGiB := int(util.RoundUpGiB(volSizeBytes))
96100

101+
secrets := req.GetSecrets()
102+
103+
// Parameters
104+
97105
parameters := req.GetParameters()
98106
if parameters == nil {
99107
parameters = make(map[string]string)
@@ -351,10 +359,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
351359
GetLatestAccountKey: getLatestAccountKey,
352360
}
353361

362+
// Telemetry
363+
354364
var volumeID string
355365
requestName := "controller_create_volume"
356-
if req.GetVolumeContentSource() != nil {
357-
switch req.VolumeContentSource.Type.(type) {
366+
if volContentSource != nil {
367+
switch volContentSource.Type.(type) {
358368
case *csi.VolumeContentSource_Snapshot:
359369
requestName = "controller_create_volume_from_snapshot"
360370
case *csi.VolumeContentSource_Volume:
@@ -367,9 +377,43 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
367377
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
368378
}()
369379

380+
// Volume Source
381+
382+
var srcAzcopyAuthEnv []string
383+
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
384+
if volContentSource != nil {
385+
switch volContentSource.Type.(type) {
386+
case *csi.VolumeContentSource_Snapshot:
387+
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
388+
case *csi.VolumeContentSource_Volume:
389+
var srcVolumeID string
390+
if volContentSource.GetVolume() != nil {
391+
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
392+
}
393+
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
394+
if err != nil {
395+
return nil, status.Error(codes.NotFound, err.Error())
396+
}
397+
srcAccountOptions := &azure.AccountOptions{
398+
Name: srcAccountName,
399+
SubscriptionID: srcSubscriptionID,
400+
ResourceGroup: srcResourceGroupName,
401+
GetLatestAccountKey: getLatestAccountKey,
402+
}
403+
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
404+
if err != nil {
405+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
406+
}
407+
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
408+
default:
409+
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
410+
}
411+
}
412+
413+
// Storage Account
414+
370415
var accountKey string
371416
accountName := account
372-
secrets := req.GetSecrets()
373417
if len(secrets) == 0 && accountName == "" {
374418
if v, ok := d.volMap.Load(volName); ok {
375419
accountName = v.(string)
@@ -423,6 +467,8 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
423467
secrets = createStorageAccountSecret(accountName, accountKey)
424468
}
425469

470+
// Blob Container
471+
426472
// replace pv/pvc name namespace metadata in subDir
427473
containerName = replaceWithMap(containerName, containerNameReplaceMap)
428474
validContainerName := containerName
@@ -434,22 +480,30 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
434480
validContainerName = getValidContainerName(validContainerName, protocol)
435481
setKeyValueInMap(parameters, containerNameField, validContainerName)
436482
}
483+
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
484+
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
485+
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
486+
}
437487

438-
if req.GetVolumeContentSource() != nil {
439-
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
440-
if err != nil {
441-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
488+
// Volume Content
489+
490+
if volContentSource != nil {
491+
dstAzcopyAuthEnv := srcAzcopyAuthEnv
492+
dstAccountSASToken := srcAccountSASToken
493+
if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName {
494+
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
495+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
496+
}
442497
}
443-
if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
498+
499+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
500+
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
444501
return nil, err
445502
}
446-
} else {
447-
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
448-
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
449-
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
450-
}
451503
}
452504

505+
// Secrets
506+
453507
if storeAccountKey && len(req.GetSecrets()) == 0 {
454508
if accountKey == "" {
455509
if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
@@ -466,6 +520,8 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
466520
}
467521
}
468522

523+
// Response
524+
469525
var uuid string
470526
if containerName != "" {
471527
// add volume name as suffix to differentiate volumeID since "containerName" is specified
@@ -488,7 +544,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
488544
VolumeId: volumeID,
489545
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
490546
VolumeContext: parameters,
491-
ContentSource: req.GetVolumeContentSource(),
547+
ContentSource: volContentSource,
492548
},
493549
}, nil
494550
}
@@ -755,24 +811,13 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
755811
})
756812
}
757813

758-
// CopyBlobContainer copies a blob container in the same storage account
759-
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
760-
var sourceVolumeID string
761-
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
762-
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
814+
// copyBlobContainer copies source volume content into a destination volume
815+
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {
763816

764-
}
765-
resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
766-
if err != nil {
767-
return status.Error(codes.NotFound, err.Error())
768-
}
769-
if srcContainerName == "" || dstContainerName == "" {
770-
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
817+
if srcPath == "" || dstPath == "" || dstContainerName == "" {
818+
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
771819
}
772820

773-
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
774-
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
775-
776821
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
777822
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
778823
switch jobState {
@@ -781,9 +826,9 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
781826
case util.AzcopyJobRunning:
782827
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
783828
case util.AzcopyJobNotFound:
784-
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
829+
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
785830
execFunc := func() error {
786-
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
831+
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false")
787832
if len(authAzcopyEnv) > 0 {
788833
cmd.Env = append(os.Environ(), authAzcopyEnv...)
789834
}
@@ -794,32 +839,19 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
794839
}
795840
timeoutFunc := func() error {
796841
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
797-
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
842+
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstContainerName, percent)
798843
}
799844
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
800845
if copyErr != nil {
801-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
846+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
802847
} else {
803-
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
848+
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
804849
}
805850
return copyErr
806851
}
807852
return err
808853
}
809854

810-
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
811-
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
812-
vs := req.VolumeContentSource
813-
switch vs.Type.(type) {
814-
case *csi.VolumeContentSource_Snapshot:
815-
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
816-
case *csi.VolumeContentSource_Volume:
817-
return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
818-
default:
819-
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
820-
}
821-
}
822-
823855
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
824856
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
825857
azureAuthConfig := d.cloud.Config.AzureAuthConfig

0 commit comments

Comments
 (0)