Skip to content

Commit 1b0f8de

Browse files
authored
Merge pull request #1532 from k8s-infra-cherrypick-robot/cherry-pick-1521-to-release-1.24
[release-1.24] cleanup: refactor volume cloning
2 parents 79501bc + bdcf778 commit 1b0f8de

File tree

3 files changed

+252
-60
lines changed

3 files changed

+252
-60
lines changed

pkg/blob/blob.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ var (
162162
supportedProtocolList = []string{Fuse, Fuse2, NFS, AZNFS}
163163
retriableErrors = []string{accountNotProvisioned, tooManyRequests, statusCodeNotFound, containerBeingDeletedDataplaneAPIError, containerBeingDeletedManagementAPIError, clientThrottled}
164164
supportedFSGroupChangePolicyList = []string{FSGroupChangeNone, string(v1.FSGroupChangeAlways), string(v1.FSGroupChangeOnRootMismatch)}
165+
166+
// azcopyCloneVolumeOptions used in volume cloning between different storage account and --check-length to false because volume data may be in changing state, copy volume is not same as current source volume,
167+
// set --s2s-preserve-access-tier=false to avoid BlobAccessTierNotSupportedForAccountType error in azcopy
168+
azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false", "--s2s-preserve-access-tier=false"}
165169
)
166170

167171
// DriverOptions defines driver parameters specified in driver deployment

pkg/blob/controllerserver.go

Lines changed: 59 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -366,36 +366,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
366366
defer d.volumeLocks.Release(volName)
367367

368368
requestName := "controller_create_volume"
369-
370-
var srcAzcopyAuthEnv []string
371-
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
372369
if volContentSource != nil {
373370
switch volContentSource.Type.(type) {
374371
case *csi.VolumeContentSource_Snapshot:
375372
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
376373
case *csi.VolumeContentSource_Volume:
377374
requestName = "controller_create_volume_from_volume"
378-
var srcVolumeID string
379-
if volContentSource.GetVolume() != nil {
380-
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
381-
}
382-
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
383-
if err != nil {
384-
return nil, status.Error(codes.NotFound, err.Error())
385-
}
386-
srcAccountOptions := &azure.AccountOptions{
387-
Name: srcAccountName,
388-
SubscriptionID: srcSubscriptionID,
389-
ResourceGroup: srcResourceGroupName,
390-
GetLatestAccountKey: getLatestAccountKey,
391-
}
392-
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
393-
if err != nil {
394-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
395-
}
396-
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
397-
default:
398-
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
399375
}
400376
}
401377

@@ -466,16 +442,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
466442
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
467443
}
468444
if volContentSource != nil {
469-
dstAzcopyAuthEnv := srcAzcopyAuthEnv
470-
dstAccountSASToken := srcAccountSASToken
471-
if srcAccountName != accountName {
472-
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
473-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
474-
}
445+
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
446+
if err != nil {
447+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
475448
}
476-
477-
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
478-
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
449+
if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil {
479450
return nil, err
480451
}
481452
}
@@ -781,11 +752,36 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
781752
}
782753

783754
// copyBlobContainer copies source volume content into a destination volume
784-
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {
755+
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
756+
var sourceVolumeID string
757+
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
758+
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
785759

786-
if srcPath == "" || dstPath == "" || dstContainerName == "" {
787-
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
788760
}
761+
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
762+
if err != nil {
763+
return status.Error(codes.NotFound, err.Error())
764+
}
765+
if dstAccountName == "" {
766+
dstAccountName = srcAccountName
767+
}
768+
if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
769+
return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
770+
}
771+
srcAccountSasToken := dstAccountSasToken
772+
if srcAccountName != dstAccountName && dstAccountSasToken != "" {
773+
srcAccountOptions := &azure.AccountOptions{
774+
Name: srcAccountName,
775+
ResourceGroup: srcResourceGroupName,
776+
SubscriptionID: srcSubscriptionID,
777+
GetLatestAccountKey: accountOptions.GetLatestAccountKey,
778+
}
779+
if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil {
780+
return err
781+
}
782+
}
783+
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
784+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
789785

790786
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
791787
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
@@ -795,13 +791,9 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
795791
case util.AzcopyJobRunning:
796792
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
797793
case util.AzcopyJobNotFound:
798-
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
794+
klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
799795
execFunc := func() error {
800-
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false")
801-
if len(authAzcopyEnv) > 0 {
802-
cmd.Env = append(os.Environ(), authAzcopyEnv...)
803-
}
804-
if out, err := cmd.CombinedOutput(); err != nil {
796+
if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
805797
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
806798
}
807799
return nil
@@ -812,15 +804,38 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
812804
}
813805
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
814806
if copyErr != nil {
815-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
807+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr)
816808
} else {
817-
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
809+
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
818810
}
819811
return copyErr
820812
}
821813
return err
822814
}
823815

816+
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
817+
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
818+
vs := req.VolumeContentSource
819+
switch vs.Type.(type) {
820+
case *csi.VolumeContentSource_Snapshot:
821+
return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
822+
case *csi.VolumeContentSource_Volume:
823+
return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
824+
default:
825+
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
826+
}
827+
}
828+
829+
// execAzcopyCopy exec azcopy copy command
830+
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
831+
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
832+
cmd.Args = append(cmd.Args, azcopyCopyOptions...)
833+
if len(authAzcopyEnv) > 0 {
834+
cmd.Env = append(os.Environ(), authAzcopyEnv...)
835+
}
836+
return cmd.CombinedOutput()
837+
}
838+
824839
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
825840
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
826841
azureAuthConfig := d.cloud.Config.AzureAuthConfig

0 commit comments

Comments
 (0)