Skip to content

Commit 1b6d4eb

Browse files
committed
use azcopy for volume cloning
1 parent d6e08f4 commit 1b6d4eb

23 files changed

+1267
-10
lines changed

deploy/csi-blob-controller.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ spec:
4040
- "--csi-address=$(ADDRESS)"
4141
- "--leader-election"
4242
- "--leader-election-namespace=kube-system"
43-
- "--timeout=1200s"
43+
- "--timeout=600s"
4444
- "--extra-create-metadata=true"
4545
- "--kube-api-qps=50"
4646
- "--kube-api-burst=100"
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
apiVersion: v1
3+
kind: PersistentVolumeClaim
4+
metadata:
5+
name: pvc-blob-clone
6+
namespace: default
7+
spec:
8+
accessModes:
9+
- ReadWriteMany
10+
resources:
11+
requests:
12+
storage: 100Gi
13+
storageClassName: blob-fuse
14+
dataSource:
15+
kind: PersistentVolumeClaim
16+
name: pvc-blob
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
Copyright The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// /*
2+
// Copyright The Kubernetes Authors.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
// */

hack/boilerplate/boilerplate.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ def get_refs():
6565

6666
return refs
6767

68+
def is_generated_file(filename, data, regexs):
69+
for d in skipped_ungenerated_files:
70+
if d in filename:
71+
return False
72+
73+
p = regexs["generated"]
74+
return p.search(data)
75+
6876
def file_passes(filename, refs, regexs):
6977
try:
7078
f = open(filename, 'r')
@@ -75,15 +83,21 @@ def file_passes(filename, refs, regexs):
7583
data = f.read()
7684
f.close()
7785

86+
# determine if the file is automatically generated
87+
generated = is_generated_file(filename, data, regexs)
88+
7889
basename = os.path.basename(filename)
7990
extension = file_extension(filename)
91+
if generated:
92+
if extension == "go":
93+
extension = "gomock"
8094
if extension != "":
8195
ref = refs[extension]
8296
else:
8397
ref = refs[basename]
8498

8599
# remove build tags from the top of Go files
86-
if extension == "go":
100+
if extension == "go" or extension == "gomock":
87101
p = regexs["go_build_constraints"]
88102
(data, found) = p.subn("", data, 1)
89103
if is_autogenerated(data, regexs):
@@ -142,6 +156,10 @@ def file_extension(filename):
142156
'cluster/env.sh', 'vendor', 'test/e2e/generated/bindata.go',
143157
'repo-infra/verify/boilerplate/test', '.glide']
144158

159+
# list all the files contain 'DO NOT EDIT', but are not generated
160+
skipped_ungenerated_files = [
161+
'hack/boilerplate/boilerplate.py']
162+
145163
def normalize_files(files):
146164
newfiles = []
147165
for pathname in files:
@@ -191,6 +209,8 @@ def get_regexs():
191209
regexs["go_build_constraints"] = re.compile(r"^(// \+build.*\n)+\n", re.MULTILINE)
192210
# strip #!.* from shell scripts
193211
regexs["shebang"] = re.compile(r"^(#!.*\n)\n*", re.MULTILINE)
212+
# Search for generated files
213+
regexs["generated"] = re.compile('DO NOT EDIT')
194214
return regexs
195215

196216

hack/update-mock.sh

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/bin/bash
2+
3+
# Copyright 2020 The Kubernetes Authors.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
set -euo pipefail
18+
19+
REPO_ROOT=$(realpath $(dirname ${BASH_SOURCE})/..)
20+
COPYRIGHT_FILE="${REPO_ROOT}/hack/boilerplate/boilerplate.generatego.txt"
21+
22+
if ! type mockgen &> /dev/null; then
23+
echo "mockgen not exist, install it"
24+
go install github.com/golang/mock/[email protected]
25+
fi
26+
27+
echo "Updating mocks for util.go"
28+
mockgen -copyright_file=$COPYRIGHT_FILE -source=pkg/util/util.go -package=util -destination=pkg/util/util_mock.go

pkg/blob/blob.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ type DriverOptions struct {
171171
KubeAPIBurst int
172172
EnableAznfsMount bool
173173
VolStatsCacheExpireInMinutes int
174+
SasTokenExpirationMinutes int
174175
}
175176

176177
// Driver implements all interfaces of CSI drivers
@@ -211,6 +212,10 @@ type Driver struct {
211212
accountSearchCache azcache.Resource
212213
// a timed cache storing volume stats <volumeID, volumeStats>
213214
volStatsCache azcache.Resource
215+
// sas expiry time for azcopy in volume clone
216+
sasTokenExpirationMinutes int
217+
// azcopy for provide exec mock for ut
218+
azcopy *util.Azcopy
214219
}
215220

216221
// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
@@ -236,6 +241,8 @@ func NewDriver(options *DriverOptions) *Driver {
236241
kubeAPIQPS: options.KubeAPIQPS,
237242
kubeAPIBurst: options.KubeAPIBurst,
238243
enableAznfsMount: options.EnableAznfsMount,
244+
sasTokenExpirationMinutes: options.SasTokenExpirationMinutes,
245+
azcopy: &util.Azcopy{},
239246
}
240247
d.Name = options.DriverName
241248
d.Version = driverVersion
@@ -288,6 +295,7 @@ func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
288295
//csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
289296
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
290297
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
298+
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
291299
})
292300
d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
293301
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,

pkg/blob/controllerserver.go

Lines changed: 132 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@ package blob
1919
import (
2020
"context"
2121
"fmt"
22+
"net/url"
23+
"os/exec"
2224
"strconv"
2325
"strings"
26+
"time"
2427

2528
"google.golang.org/grpc/codes"
2629
"google.golang.org/grpc/status"
2730

31+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
32+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
33+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
2834
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
2935
azstorage "github.com/Azure/azure-sdk-for-go/storage"
3036
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -42,6 +48,9 @@ import (
4248

4349
const (
4450
privateEndpoint = "privateendpoint"
51+
52+
waitForCopyInterval = 5 * time.Second
53+
waitForCopyTimeout = 3 * time.Minute
4554
)
4655

4756
// CreateVolume provisions a volume
@@ -61,6 +70,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
6170
}
6271

6372
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
73+
// logging the job status if it's volume cloning
74+
if req.GetVolumeContentSource() != nil {
75+
jobState, percent, err := d.azcopy.GetAzcopyJob(volName)
76+
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
77+
}
6478
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
6579
}
6680
defer d.volumeLocks.Release(volName)
@@ -313,7 +327,16 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
313327
}
314328

315329
var volumeID string
316-
mc := metrics.NewMetricContext(blobCSIDriverName, "controller_create_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
330+
requestName := "controller_create_volume"
331+
if req.GetVolumeContentSource() != nil {
332+
switch req.VolumeContentSource.Type.(type) {
333+
case *csi.VolumeContentSource_Snapshot:
334+
requestName = "controller_create_volume_from_snapshot"
335+
case *csi.VolumeContentSource_Volume:
336+
requestName = "controller_create_volume_from_volume"
337+
}
338+
}
339+
mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
317340
isOperationSucceeded := false
318341
defer func() {
319342
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
@@ -387,9 +410,20 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
387410
setKeyValueInMap(parameters, containerNameField, validContainerName)
388411
}
389412

390-
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
391-
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
392-
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
413+
if req.GetVolumeContentSource() != nil {
414+
if accountKey == "" {
415+
if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
416+
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
417+
}
418+
}
419+
if err := d.copyVolume(ctx, req, accountKey, validContainerName, storageEndpointSuffix); err != nil {
420+
return nil, err
421+
}
422+
} else {
423+
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
424+
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
425+
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
426+
}
393427
}
394428

395429
if storeAccountKey && len(req.GetSecrets()) == 0 {
@@ -430,6 +464,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
430464
VolumeId: volumeID,
431465
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
432466
VolumeContext: parameters,
467+
ContentSource: req.GetVolumeContentSource(),
433468
},
434469
}, nil
435470
}
@@ -675,6 +710,75 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
675710
})
676711
}
677712

713+
// CopyBlobContainer copies a blob container in the same storage account
714+
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
715+
var sourceVolumeID string
716+
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
717+
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
718+
719+
}
720+
resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
721+
if err != nil {
722+
return status.Error(codes.NotFound, err.Error())
723+
}
724+
if srcContainerName == "" || dstContainerName == "" {
725+
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
726+
}
727+
728+
klog.V(2).Infof("generate sas token for account(%s)", accountName)
729+
accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
730+
if genErr != nil {
731+
return genErr
732+
}
733+
734+
timeAfter := time.After(waitForCopyTimeout)
735+
timeTick := time.Tick(waitForCopyInterval)
736+
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
737+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
738+
739+
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
740+
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
741+
if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
742+
return err
743+
}
744+
klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
745+
for {
746+
select {
747+
case <-timeTick:
748+
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
749+
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
750+
switch jobState {
751+
case util.AzcopyJobError, util.AzcopyJobCompleted:
752+
return err
753+
case util.AzcopyJobNotFound:
754+
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
755+
out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
756+
if copyErr != nil {
757+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
758+
} else {
759+
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
760+
}
761+
return copyErr
762+
}
763+
case <-timeAfter:
764+
return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName)
765+
}
766+
}
767+
}
768+
769+
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
770+
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
771+
vs := req.VolumeContentSource
772+
switch vs.Type.(type) {
773+
case *csi.VolumeContentSource_Snapshot:
774+
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
775+
case *csi.VolumeContentSource_Volume:
776+
return d.copyBlobContainer(ctx, req, accountKey, dstContainerName, storageEndpointSuffix)
777+
default:
778+
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
779+
}
780+
}
781+
678782
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
679783
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
680784
if len(volCaps) == 0 {
@@ -699,3 +803,27 @@ func parseDays(dayStr string) (int32, error) {
699803

700804
return int32(days), nil
701805
}
806+
807+
// generateSASToken generate a sas token for storage account
808+
func generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
809+
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
810+
if err != nil {
811+
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
812+
}
813+
serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, nil)
814+
if err != nil {
815+
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
816+
}
817+
sasURL, err := serviceClient.GetSASURL(
818+
sas.AccountResourceTypes{Object: true, Service: false, Container: true},
819+
sas.AccountPermissions{Read: true, List: true, Write: true},
820+
sas.AccountServices{Blob: true}, time.Now(), time.Now().Add(time.Duration(expiryTime)*time.Minute))
821+
if err != nil {
822+
return "", err
823+
}
824+
u, err := url.Parse(sasURL)
825+
if err != nil {
826+
return "", err
827+
}
828+
return "?" + u.RawQuery, nil
829+
}

0 commit comments

Comments
 (0)