diff --git a/pkg/blob/blob.go b/pkg/blob/blob.go index a0dcba8e3..0343a9505 100644 --- a/pkg/blob/blob.go +++ b/pkg/blob/blob.go @@ -155,6 +155,10 @@ const ( var ( supportedProtocolList = []string{Fuse, Fuse2, NFS, AZNFS} retriableErrors = []string{accountNotProvisioned, tooManyRequests, statusCodeNotFound, containerBeingDeletedDataplaneAPIError, containerBeingDeletedManagementAPIError, clientThrottled} + + // azcopyCloneVolumeOptions used in volume cloning between different storage account and --check-length to false because volume data may be in changing state, copy volume is not same as current source volume, + // set --s2s-preserve-access-tier=false to avoid BlobAccessTierNotSupportedForAccountType error in azcopy + azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false", "--s2s-preserve-access-tier=false"} ) // DriverOptions defines driver parameters specified in driver deployment diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index 8174b2206..69d9e48c2 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -358,48 +358,18 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) if volContentSource != nil { switch volContentSource.Type.(type) { case *csi.VolumeContentSource_Snapshot: - requestName = "controller_create_volume_from_snapshot" + return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") case *csi.VolumeContentSource_Volume: requestName = "controller_create_volume_from_volume" } } + mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name) isOperationSucceeded := false defer func() { mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID) }() - var srcAzcopyAuthEnv []string - var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string - if volContentSource != nil { - switch volContentSource.Type.(type) { - case *csi.VolumeContentSource_Snapshot: - return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") - case *csi.VolumeContentSource_Volume: - var srcVolumeID string - if volContentSource.GetVolume() != nil { - srcVolumeID = volContentSource.GetVolume().GetVolumeId() - } - srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID) - if err != nil { - return nil, status.Error(codes.NotFound, err.Error()) - } - srcAccountOptions := &azure.AccountOptions{ - Name: srcAccountName, - SubscriptionID: srcSubscriptionID, - ResourceGroup: srcResourceGroupName, - GetLatestAccountKey: getLatestAccountKey, - } - srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) - } - srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName) - default: - return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource) - } - } - var accountKey string accountName := account if len(secrets) == 0 && accountName == "" { @@ -455,24 +425,16 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) secrets = createStorageAccountSecret(accountName, accountKey) } - dstAzcopyAuthEnv := srcAzcopyAuthEnv - dstAccountSASToken := srcAccountSASToken - if volContentSource != nil { - if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName { - if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil { - return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) - } - } - } - klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB) if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil { return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err) } - if volContentSource != nil { - dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName) - if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil { + accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) + } + if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil { return nil, err } } @@ -762,11 +724,36 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN } // copyBlobContainer copies source volume content into a destination volume -func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error { +func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error { + var sourceVolumeID string + if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil { + sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId() - if srcPath == "" || dstPath == "" || dstContainerName == "" { - return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName) } + srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled + if err != nil { + return status.Error(codes.NotFound, err.Error()) + } + if dstAccountName == "" { + dstAccountName = srcAccountName + } + if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" { + return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName) + } + srcAccountSasToken := dstAccountSasToken + if srcAccountName != dstAccountName && dstAccountSasToken != "" { + srcAccountOptions := &azure.AccountOptions{ + Name: srcAccountName, + ResourceGroup: srcResourceGroupName, + SubscriptionID: srcSubscriptionID, + GetLatestAccountKey: accountOptions.GetLatestAccountKey, + } + if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil { + return err + } + } + srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken) + dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken) jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) @@ -776,32 +763,51 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc case util.AzcopyJobRunning: return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent) case util.AzcopyJobNotFound: - klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName) + klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName) execFunc := func() error { - cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false") - if len(authAzcopyEnv) > 0 { - cmd.Env = append(os.Environ(), authAzcopyEnv...) - } - if out, err := cmd.CombinedOutput(); err != nil { + if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil { return fmt.Errorf("exec error: %v, output: %v", err, string(out)) } return nil } timeoutFunc := func() error { _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) - return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstContainerName, percent) + return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent) } copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc) if copyErr != nil { - klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr) + klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr) } else { - klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName) + klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) } return copyErr } return err } +// copyVolume copies a volume form volume or snapshot, snapshot is not supported now +func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error { + vs := req.VolumeContentSource + switch vs.Type.(type) { + case *csi.VolumeContentSource_Snapshot: + return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") + case *csi.VolumeContentSource_Volume: + return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix) + default: + return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs) + } +} + +// execAzcopyCopy exec azcopy copy command +func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) { + cmd := exec.Command("azcopy", "copy", srcPath, dstPath) + cmd.Args = append(cmd.Args, azcopyCopyOptions...) + if len(authAzcopyEnv) > 0 { + cmd.Env = append(os.Environ(), authAzcopyEnv...) + } + return cmd.CombinedOutput() +} + // authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) { azureAuthConfig := d.cloud.Config.AzureAuthConfig diff --git a/pkg/blob/controllerserver_test.go b/pkg/blob/controllerserver_test.go index 173d09290..e2d45229d 100644 --- a/pkg/blob/controllerserver_test.go +++ b/pkg/blob/controllerserver_test.go @@ -818,6 +818,9 @@ func TestCreateVolume(t *testing.T) { defer ctrl.Finish() m := util.NewMockEXEC(ctrl) + + listStr := "no error" + m.EXPECT().RunCommand(gomock.Any(), gomock.Any()).Return(listStr, nil) d.azcopy.ExecCmd = m expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #") @@ -1434,38 +1437,167 @@ func TestDeleteBlobContainer(t *testing.T) { } func TestCopyVolume(t *testing.T) { + stdVolumeCapability := &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + } + stdVolumeCapabilities := []*csi.VolumeCapability{ + stdVolumeCapability, + } testCases := []struct { name string testFunc func(t *testing.T) }{ { - name: "src path is empty", + name: "copy volume from volumeSnapshot is not supported", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() - expectedErr := fmt.Errorf("srcPath() or dstPath(dstPath) or dstContainerName(dstContainer) is empty") - err := d.copyBlobContainer([]string{}, "", "", "dstPath", "", "dstContainer") + mp := map[string]string{} + + volumeSnapshotSource := &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: "unit-test", + } + volumeContentSourceSnapshotSource := &csi.VolumeContentSource_Snapshot{ + Snapshot: volumeSnapshotSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceSnapshotSource, + } + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") + err := d.copyVolume(ctx, req, "", "", nil, "", "", nil, "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "copy volume from volume not found", + testFunc: func(t *testing.T) { + ctx := context.Background() + d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "unit-test", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #") + err := d.copyVolume(ctx, req, "", "", nil, "dstContainer", "", nil, "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "src blob container is empty", + testFunc: func(t *testing.T) { + ctx := context.Background() + d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "rg#unit-test##", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := fmt.Errorf("srcAccountName(unit-test) or srcContainerName() or dstContainerName(dstContainer) is empty") + err := d.copyVolume(ctx, req, "", "", nil, "dstContainer", "", nil, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } }, }, { - name: "dst path is empty", + name: "dst blob container is empty", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() - expectedErr := fmt.Errorf("srcPath(srcPath) or dstPath() or dstContainerName(dstContainer) is empty") - err := d.copyBlobContainer([]string{}, "srcPath", "", "", "", "dstContainer") + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := fmt.Errorf("srcAccountName(f5713de20cde511e8ba4900) or srcContainerName(fileshare) or dstContainerName() is empty") + err := d.copyVolume(ctx, req, "", "", nil, "", "", nil, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } }, }, { - name: "dst container is empty", + name: "dstAccount Name is different and storageAccountClient is nil", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() - expectedErr := fmt.Errorf("srcPath(srcPath) or dstPath(dstPath) or dstContainerName() is empty") - err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "") + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := fmt.Errorf("StorageAccountClient is nil") + err := d.copyVolume(ctx, req, "dstAccount", "sastoken", nil, "dstContainer", "", &azure.AccountOptions{Name: "dstAccount", ResourceGroup: "rg", SubscriptionID: "subsID", GetLatestAccountKey: true}, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } @@ -1474,21 +1606,38 @@ func TestCopyVolume(t *testing.T) { { name: "azcopy job is already completed", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + ctrl := gomock.NewController(t) defer ctrl.Finish() m := util.NewMockEXEC(ctrl) listStr := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr, nil) - // if test.enableShow { - // m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstContainer -B 3")).Return(test.showStr, test.showErr) - // } d.azcopy.ExecCmd = m var expectedErr error - err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "dstContainer") + err := d.copyVolume(ctx, req, "", "sastoken", nil, "dstContainer", "", nil, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } @@ -1497,7 +1646,27 @@ func TestCopyVolume(t *testing.T) { { name: "azcopy job is first in progress and then be completed", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1509,7 +1678,7 @@ func TestCopyVolume(t *testing.T) { d.azcopy.ExecCmd = m expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%") - err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "dstContainer") + err := d.copyVolume(ctx, req, "", "sastoken", nil, "dstContainer", "", nil, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) }