Skip to content

Commit ece81de

Browse files
authored
Merge pull request #242 from andyzhangx/lock
fix: add lock for prevent account list throttling
2 parents 0c819cc + 89b0fdc commit ece81de

File tree

7 files changed

+138
-16
lines changed

7 files changed

+138
-16
lines changed

charts/latest/blob-csi-driver/templates/csi-blob-controller.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ spec:
3333
- "--csi-address=$(ADDRESS)"
3434
- "--enable-leader-election"
3535
- "--leader-election-type=leases"
36-
- "--timeout=30s"
36+
- "--timeout=60s"
3737
env:
3838
- name: ADDRESS
3939
value: /csi/csi.sock

deploy/csi-blob-controller.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ spec:
3232
- "--csi-address=$(ADDRESS)"
3333
- "--enable-leader-election"
3434
- "--leader-election-type=leases"
35-
- "--timeout=30s"
35+
- "--timeout=60s"
3636
env:
3737
- name: ADDRESS
3838
value: /csi/csi.sock

pkg/blob/blob.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"fmt"
2121
"strings"
2222

23+
"sigs.k8s.io/blob-csi-driver/pkg/util"
24+
2325
csicommon "sigs.k8s.io/blob-csi-driver/pkg/csi-common"
2426

2527
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -68,8 +70,9 @@ var (
6870
// Driver implements all interfaces of CSI drivers
6971
type Driver struct {
7072
csicommon.CSIDriver
71-
cloud *azure.Cloud
72-
mounter *mount.SafeFormatAndMount
73+
cloud *azure.Cloud
74+
mounter *mount.SafeFormatAndMount
75+
volLockMap *util.LockMap
7376
}
7477

7578
// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
@@ -79,6 +82,7 @@ func NewDriver(nodeID string) *Driver {
7982
driver.Name = DriverName
8083
driver.Version = driverVersion
8184
driver.NodeID = nodeID
85+
driver.volLockMap = util.NewLockMap()
8286
return &driver
8387
}
8488

pkg/blob/controllerserver.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,19 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
8989
return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
9090
}
9191

92+
enableHTTPSTrafficOnly := true
9293
if protocol == nfs {
9394
if account == "" {
9495
return nil, status.Errorf(codes.InvalidArgument, "storage account must be specified when provisioning nfs file share")
9596
}
97+
enableHTTPSTrafficOnly = false
9698
}
9799

98100
accountKind := string(storage.StorageV2)
99101
if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
100102
accountKind = string(storage.BlockBlobStorage)
101103
}
102104

103-
enableHTTPSTrafficOnly := true
104-
105105
tags, err := azure.ConvertTagsToMap(customTags)
106106
if err != nil {
107107
return nil, err
@@ -119,6 +119,10 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
119119

120120
var accountName, accountKey string
121121
if len(req.GetSecrets()) == 0 { // check whether account is provided by secret
122+
lockKey := account + storageAccountType + accountKind + resourceGroup + location
123+
d.volLockMap.LockEntry(lockKey)
124+
defer d.volLockMap.UnlockEntry(lockKey)
125+
122126
err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
123127
var retErr error
124128
accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(accountOptions, protocol)

pkg/blob/nodeserver.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,9 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
148148

149149
source := fmt.Sprintf("%s:/%s/%s", blobStorageEndPoint, accountName, containerName)
150150
mountOptions := util.JoinMountOptions(mountFlags, []string{"sec=sys,vers=3,nolock"})
151-
mountComplete := false
152-
err := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
153-
err := d.mounter.MountSensitive(source, targetPath, nfs, mountOptions, []string{})
154-
mountComplete = true
155-
return true, err
156-
})
157-
if !mountComplete {
158-
return nil, status.Error(codes.Internal, fmt.Sprintf("volume(%s) mount %q on %q failed with timeout(2m)", volumeID, source, targetPath))
159-
}
160-
if err != nil {
151+
if err := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
152+
return true, d.mounter.MountSensitive(source, targetPath, nfs, mountOptions, []string{})
153+
}); err != nil {
161154
return nil, status.Error(codes.Internal, fmt.Sprintf("volume(%s) mount %q on %q failed with %v", volumeID, source, targetPath, err))
162155
}
163156
klog.V(2).Infof("volume(%s) mount %q on %q succeeded", volumeID, source, targetPath)

pkg/util/util.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package util
1818

1919
import (
2020
"os"
21+
"sync"
2122
)
2223

2324
const (
@@ -80,3 +81,51 @@ func MakeDir(pathname string) error {
8081
}
8182
return nil
8283
}
84+
85+
// LockMap used to lock on entries
86+
type LockMap struct {
87+
sync.Mutex
88+
mutexMap map[string]*sync.Mutex
89+
}
90+
91+
// NewLockMap returns a new lock map
92+
func NewLockMap() *LockMap {
93+
return &LockMap{
94+
mutexMap: make(map[string]*sync.Mutex),
95+
}
96+
}
97+
98+
// LockEntry acquires a lock associated with the specific entry
99+
func (lm *LockMap) LockEntry(entry string) {
100+
lm.Lock()
101+
// check if entry does not exists, then add entry
102+
if _, exists := lm.mutexMap[entry]; !exists {
103+
lm.addEntry(entry)
104+
}
105+
106+
lm.Unlock()
107+
lm.lockEntry(entry)
108+
}
109+
110+
// UnlockEntry release the lock associated with the specific entry
111+
func (lm *LockMap) UnlockEntry(entry string) {
112+
lm.Lock()
113+
defer lm.Unlock()
114+
115+
if _, exists := lm.mutexMap[entry]; !exists {
116+
return
117+
}
118+
lm.unlockEntry(entry)
119+
}
120+
121+
func (lm *LockMap) addEntry(entry string) {
122+
lm.mutexMap[entry] = &sync.Mutex{}
123+
}
124+
125+
func (lm *LockMap) lockEntry(entry string) {
126+
lm.mutexMap[entry].Lock()
127+
}
128+
129+
func (lm *LockMap) unlockEntry(entry string) {
130+
lm.mutexMap[entry].Unlock()
131+
}

pkg/util/util_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package util
1919
import (
2020
"os"
2121
"testing"
22+
"time"
2223

2324
"github.com/stretchr/testify/assert"
2425
)
@@ -39,6 +40,77 @@ func TestRoundUpGiB(t *testing.T) {
3940
}
4041
}
4142

43+
func TestSimpleLockEntry(t *testing.T) {
44+
testLockMap := NewLockMap()
45+
46+
callbackChan1 := make(chan interface{})
47+
go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
48+
ensureCallbackHappens(t, callbackChan1)
49+
}
50+
51+
func TestSimpleLockUnlockEntry(t *testing.T) {
52+
testLockMap := NewLockMap()
53+
54+
callbackChan1 := make(chan interface{})
55+
go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
56+
ensureCallbackHappens(t, callbackChan1)
57+
testLockMap.UnlockEntry("entry1")
58+
}
59+
60+
func TestConcurrentLockEntry(t *testing.T) {
61+
testLockMap := NewLockMap()
62+
63+
callbackChan1 := make(chan interface{})
64+
callbackChan2 := make(chan interface{})
65+
66+
go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
67+
ensureCallbackHappens(t, callbackChan1)
68+
69+
go testLockMap.lockAndCallback(t, "entry1", callbackChan2)
70+
ensureNoCallback(t, callbackChan2)
71+
72+
testLockMap.UnlockEntry("entry1")
73+
ensureCallbackHappens(t, callbackChan2)
74+
testLockMap.UnlockEntry("entry1")
75+
}
76+
77+
func (lm *LockMap) lockAndCallback(t *testing.T, entry string, callbackChan chan<- interface{}) {
78+
lm.LockEntry(entry)
79+
callbackChan <- true
80+
}
81+
82+
var callbackTimeout = 2 * time.Second
83+
84+
func ensureCallbackHappens(t *testing.T, callbackChan <-chan interface{}) bool {
85+
select {
86+
case <-callbackChan:
87+
return true
88+
case <-time.After(callbackTimeout):
89+
t.Fatalf("timed out waiting for callback")
90+
return false
91+
}
92+
}
93+
94+
func ensureNoCallback(t *testing.T, callbackChan <-chan interface{}) bool {
95+
select {
96+
case <-callbackChan:
97+
t.Fatalf("unexpected callback")
98+
return false
99+
case <-time.After(callbackTimeout):
100+
return true
101+
}
102+
}
103+
104+
func TestUnlockEntryNotExists(t *testing.T) {
105+
testLockMap := NewLockMap()
106+
107+
callbackChan1 := make(chan interface{})
108+
go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
109+
ensureCallbackHappens(t, callbackChan1)
110+
// entry2 does not exist
111+
testLockMap.UnlockEntry("entry2")
112+
testLockMap.UnlockEntry("entry1")
113+
}
42114
func TestBytesToGiB(t *testing.T) {
43115
var sizeInBytes int64 = 5 * GiB
44116

0 commit comments

Comments
 (0)