Skip to content

Commit d685531

Browse files
authored
Merge pull request #1563 from umagnus/release-1.23-sas
[release-1.23] cleanup: refactor volume cloning
2 parents c686b81 + ab8a1fd commit d685531

File tree

3 files changed

+250
-71
lines changed

3 files changed

+250
-71
lines changed

pkg/blob/blob.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ const (
155155
var (
156156
supportedProtocolList = []string{Fuse, Fuse2, NFS, AZNFS}
157157
retriableErrors = []string{accountNotProvisioned, tooManyRequests, statusCodeNotFound, containerBeingDeletedDataplaneAPIError, containerBeingDeletedManagementAPIError, clientThrottled}
158+
159+
// azcopyCloneVolumeOptions used in volume cloning between different storage account and --check-length to false because volume data may be in changing state, copy volume is not same as current source volume,
160+
// set --s2s-preserve-access-tier=false to avoid BlobAccessTierNotSupportedForAccountType error in azcopy
161+
azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false", "--s2s-preserve-access-tier=false"}
158162
)
159163

160164
// DriverOptions defines driver parameters specified in driver deployment

pkg/blob/controllerserver.go

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -358,48 +358,18 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
358358
if volContentSource != nil {
359359
switch volContentSource.Type.(type) {
360360
case *csi.VolumeContentSource_Snapshot:
361-
requestName = "controller_create_volume_from_snapshot"
361+
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
362362
case *csi.VolumeContentSource_Volume:
363363
requestName = "controller_create_volume_from_volume"
364364
}
365365
}
366+
366367
mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
367368
isOperationSucceeded := false
368369
defer func() {
369370
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
370371
}()
371372

372-
var srcAzcopyAuthEnv []string
373-
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
374-
if volContentSource != nil {
375-
switch volContentSource.Type.(type) {
376-
case *csi.VolumeContentSource_Snapshot:
377-
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
378-
case *csi.VolumeContentSource_Volume:
379-
var srcVolumeID string
380-
if volContentSource.GetVolume() != nil {
381-
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
382-
}
383-
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
384-
if err != nil {
385-
return nil, status.Error(codes.NotFound, err.Error())
386-
}
387-
srcAccountOptions := &azure.AccountOptions{
388-
Name: srcAccountName,
389-
SubscriptionID: srcSubscriptionID,
390-
ResourceGroup: srcResourceGroupName,
391-
GetLatestAccountKey: getLatestAccountKey,
392-
}
393-
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
394-
if err != nil {
395-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
396-
}
397-
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
398-
default:
399-
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
400-
}
401-
}
402-
403373
var accountKey string
404374
accountName := account
405375
if len(secrets) == 0 && accountName == "" {
@@ -455,24 +425,16 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
455425
secrets = createStorageAccountSecret(accountName, accountKey)
456426
}
457427

458-
dstAzcopyAuthEnv := srcAzcopyAuthEnv
459-
dstAccountSASToken := srcAccountSASToken
460-
if volContentSource != nil {
461-
if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName {
462-
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
463-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
464-
}
465-
}
466-
}
467-
468428
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
469429
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
470430
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
471431
}
472-
473432
if volContentSource != nil {
474-
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
475-
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
433+
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
434+
if err != nil {
435+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
436+
}
437+
if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil {
476438
return nil, err
477439
}
478440
}
@@ -762,11 +724,36 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
762724
}
763725

764726
// copyBlobContainer copies source volume content into a destination volume
765-
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {
727+
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
728+
var sourceVolumeID string
729+
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
730+
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
766731

767-
if srcPath == "" || dstPath == "" || dstContainerName == "" {
768-
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
769732
}
733+
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
734+
if err != nil {
735+
return status.Error(codes.NotFound, err.Error())
736+
}
737+
if dstAccountName == "" {
738+
dstAccountName = srcAccountName
739+
}
740+
if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
741+
return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
742+
}
743+
srcAccountSasToken := dstAccountSasToken
744+
if srcAccountName != dstAccountName && dstAccountSasToken != "" {
745+
srcAccountOptions := &azure.AccountOptions{
746+
Name: srcAccountName,
747+
ResourceGroup: srcResourceGroupName,
748+
SubscriptionID: srcSubscriptionID,
749+
GetLatestAccountKey: accountOptions.GetLatestAccountKey,
750+
}
751+
if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil {
752+
return err
753+
}
754+
}
755+
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
756+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
770757

771758
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
772759
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
@@ -776,32 +763,51 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
776763
case util.AzcopyJobRunning:
777764
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
778765
case util.AzcopyJobNotFound:
779-
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
766+
klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
780767
execFunc := func() error {
781-
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false")
782-
if len(authAzcopyEnv) > 0 {
783-
cmd.Env = append(os.Environ(), authAzcopyEnv...)
784-
}
785-
if out, err := cmd.CombinedOutput(); err != nil {
768+
if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
786769
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
787770
}
788771
return nil
789772
}
790773
timeoutFunc := func() error {
791774
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
792-
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstContainerName, percent)
775+
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
793776
}
794777
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
795778
if copyErr != nil {
796-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
779+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr)
797780
} else {
798-
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
781+
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
799782
}
800783
return copyErr
801784
}
802785
return err
803786
}
804787

788+
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
789+
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
790+
vs := req.VolumeContentSource
791+
switch vs.Type.(type) {
792+
case *csi.VolumeContentSource_Snapshot:
793+
return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
794+
case *csi.VolumeContentSource_Volume:
795+
return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
796+
default:
797+
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
798+
}
799+
}
800+
801+
// execAzcopyCopy exec azcopy copy command
802+
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
803+
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
804+
cmd.Args = append(cmd.Args, azcopyCopyOptions...)
805+
if len(authAzcopyEnv) > 0 {
806+
cmd.Env = append(os.Environ(), authAzcopyEnv...)
807+
}
808+
return cmd.CombinedOutput()
809+
}
810+
805811
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
806812
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
807813
azureAuthConfig := d.cloud.Config.AzureAuthConfig

0 commit comments

Comments
 (0)