@@ -20,6 +20,7 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"net/url"
23
+ "os"
23
24
"os/exec"
24
25
"strconv"
25
26
"strings"
@@ -49,6 +50,15 @@ import (
49
50
const (
50
51
privateEndpoint = "privateendpoint"
51
52
53
+ azcopyAutoLoginType = "AZCOPY_AUTO_LOGIN_TYPE"
54
+ azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
55
+ azcopySPAClientSecret = "AZCOPY_SPA_CLIENT_SECRET"
56
+ azcopyTenantID = "AZCOPY_TENANT_ID"
57
+ azcopyMSIClientID = "AZCOPY_MSI_CLIENT_ID"
58
+ MSI = "MSI"
59
+ SPN = "SPN"
60
+ authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
61
+
52
62
waitForCopyInterval = 5 * time.Second
53
63
waitForCopyTimeout = 3 * time.Minute
54
64
)
@@ -72,7 +82,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
72
82
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
73
83
// logging the job status if it's volume cloning
74
84
if req.GetVolumeContentSource() != nil {
75
- jobState, percent, err := d.azcopy.GetAzcopyJob(volName)
85
+ jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{} )
76
86
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
77
87
}
78
88
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
@@ -411,12 +421,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
411
421
}
412
422
413
423
if req.GetVolumeContentSource() != nil {
414
- if accountKey == "" {
415
- if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
416
- return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
417
- }
424
+ var accountSASToken string
425
+ if accountSASToken, err = d.getSASToken(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
426
+ return nil, status.Errorf(codes.Internal, "failed to getSASToken on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
418
427
}
419
- if err := d.copyVolume(ctx, req, accountKey , validContainerName, storageEndpointSuffix); err != nil {
428
+ if err := d.copyVolume(ctx, req, accountSASToken , validContainerName, storageEndpointSuffix); err != nil {
420
429
return nil, err
421
430
}
422
431
} else {
@@ -711,7 +720,7 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
711
720
}
712
721
713
722
// CopyBlobContainer copies a blob container in the same storage account
714
- func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeRequest, accountKey , dstContainerName, storageEndpointSuffix string) error {
723
+ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeRequest, accountSasToken , dstContainerName, storageEndpointSuffix string) error {
715
724
var sourceVolumeID string
716
725
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
717
726
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
@@ -725,18 +734,19 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
725
734
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
726
735
}
727
736
728
- klog.V(2).Infof("generate sas token for account(%s)", accountName)
729
- accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
730
- if genErr != nil {
731
- return genErr
737
+ var authAzcopyEnv []string
738
+ if accountSasToken == "" {
739
+ if authAzcopyEnv, err = d.authorizeAzcopyWithIdentity(); err != nil {
740
+ return err
741
+ }
732
742
}
733
743
734
744
timeAfter := time.After(waitForCopyTimeout)
735
745
timeTick := time.Tick(waitForCopyInterval)
736
746
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
737
747
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
738
748
739
- jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
749
+ jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv )
740
750
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
741
751
if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
742
752
return err
@@ -745,14 +755,18 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
745
755
for {
746
756
select {
747
757
case <-timeTick:
748
- jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
758
+ jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv )
749
759
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
750
760
switch jobState {
751
761
case util.AzcopyJobError, util.AzcopyJobCompleted:
752
762
return err
753
763
case util.AzcopyJobNotFound:
754
764
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
755
- out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
765
+ cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
766
+ if len(authAzcopyEnv) > 0 {
767
+ cmd.Env = append(os.Environ(), authAzcopyEnv...)
768
+ }
769
+ out, copyErr := cmd.CombinedOutput()
756
770
if copyErr != nil {
757
771
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
758
772
} else {
@@ -767,18 +781,77 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
767
781
}
768
782
769
783
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
770
- func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey , dstContainerName, storageEndpointSuffix string) error {
784
+ func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountSASToken , dstContainerName, storageEndpointSuffix string) error {
771
785
vs := req.VolumeContentSource
772
786
switch vs.Type.(type) {
773
787
case *csi.VolumeContentSource_Snapshot:
774
788
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
775
789
case *csi.VolumeContentSource_Volume:
776
- return d.copyBlobContainer(ctx, req, accountKey , dstContainerName, storageEndpointSuffix)
790
+ return d.copyBlobContainer(ctx, req, accountSASToken , dstContainerName, storageEndpointSuffix)
777
791
default:
778
792
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
779
793
}
780
794
}
781
795
796
+ func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
797
+ azureAuthConfig := d.cloud.Config.AzureAuthConfig
798
+ var authAzcopyEnv []string
799
+ if azureAuthConfig.UseManagedIdentityExtension {
800
+ authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
801
+ if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
802
+ klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
803
+ authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
804
+ } else {
805
+ klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
806
+ }
807
+ return authAzcopyEnv, nil
808
+ }
809
+ if len(azureAuthConfig.AADClientSecret) > 0 {
810
+ klog.V(2).Infof("use service principal to authorize azcopy")
811
+ authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
812
+ if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
813
+ return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
814
+ }
815
+ authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
816
+ authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
817
+ authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
818
+ klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
819
+
820
+ return authAzcopyEnv, nil
821
+ }
822
+ return []string{}, fmt.Errorf("service principle or managed identity are both not set")
823
+ }
824
+
825
+ // getSASToken will only generate sas token for azcopy in following conditions:
826
+ // 1. secrets is not empty
827
+ // 2. driver is not using managed identity and service principal
828
+ // 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
829
+ func (d *Driver) getSASToken(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, error) {
830
+ authAzcopyEnv, _ := d.authorizeAzcopyWithIdentity()
831
+ useSasTokenFallBack := false
832
+ if len(authAzcopyEnv) > 0 {
833
+ out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
834
+ if testErr != nil {
835
+ return "", fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
836
+ }
837
+ if strings.Contains(out, authorizationPermissionMismatch) {
838
+ klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
839
+ useSasTokenFallBack = true
840
+ }
841
+ }
842
+ if len(secrets) > 0 || len(authAzcopyEnv) == 0 || useSasTokenFallBack {
843
+ var err error
844
+ if accountKey == "" {
845
+ if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
846
+ return "", err
847
+ }
848
+ }
849
+ klog.V(2).Infof("generate sas token for account(%s)", accountName)
850
+ return generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
851
+ }
852
+ return "", nil
853
+ }
854
+
782
855
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
783
856
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
784
857
if len(volCaps) == 0 {
0 commit comments