Skip to content

Commit 461e952

Browse files
authored
Merge pull request #536 from andyzhangx/account-search-cache
feat: add account search cache to prevent account list throttling
2 parents 3dbe1bb + 8275f94 commit 461e952

File tree

3 files changed

+50
-20
lines changed

3 files changed

+50
-20
lines changed

pkg/blob/blob.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package blob
1919
import (
2020
"fmt"
2121
"strings"
22+
"sync"
23+
"time"
2224

2325
"golang.org/x/net/context"
2426

@@ -36,6 +38,7 @@ import (
3638

3739
csicommon "sigs.k8s.io/blob-csi-driver/pkg/csi-common"
3840
"sigs.k8s.io/blob-csi-driver/pkg/util"
41+
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
3942
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
4043
)
4144

@@ -137,6 +140,10 @@ type Driver struct {
137140
volumeLocks *volumeLocks
138141
// only for nfs feature
139142
subnetLockMap *util.LockMap
143+
// a map storing all volumes created by this driver <volumeName, accountName>
144+
volMap sync.Map
145+
// a timed cache storing acount search history (solve account list throttling issue)
146+
accountSearchCache *azcache.TimedCache
140147
}
141148

142149
// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
@@ -158,6 +165,12 @@ func NewDriver(options *DriverOptions) *Driver {
158165
d.Name = options.DriverName
159166
d.Version = driverVersion
160167
d.NodeID = options.NodeID
168+
169+
var err error
170+
getter := func(key string) (interface{}, error) { return nil, nil }
171+
if d.accountSearchCache, err = azcache.NewTimedcache(time.Minute, getter); err != nil {
172+
klog.Fatalf("%v", err)
173+
}
161174
return &d
162175
}
163176

pkg/blob/blob_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func TestNewDriver(t *testing.T) {
8787
fakedriver := NewFakeDriver()
8888
fakedriver.Name = DefaultDriverName
8989
fakedriver.Version = driverVersion
90+
fakedriver.accountSearchCache = driver.accountSearchCache
9091
assert.Equal(t, driver, fakedriver)
9192
}
9293

pkg/blob/controllerserver.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"k8s.io/klog/v2"
3434

3535
"sigs.k8s.io/blob-csi-driver/pkg/util"
36+
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
3637
"sigs.k8s.io/cloud-provider-azure/pkg/metrics"
3738
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
3839
)
@@ -45,18 +46,18 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
4546
}
4647

4748
volumeCapabilities := req.GetVolumeCapabilities()
48-
name := req.GetName()
49-
if len(name) == 0 {
49+
volName := req.GetName()
50+
if len(volName) == 0 {
5051
return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
5152
}
5253
if len(volumeCapabilities) == 0 {
5354
return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided")
5455
}
5556

56-
if acquired := d.volumeLocks.TryAcquire(name); !acquired {
57-
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, name)
57+
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
58+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
5859
}
59-
defer d.volumeLocks.Release(name)
60+
defer d.volumeLocks.Release(volName)
6061

6162
volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
6263
requestGiB := int(util.RoundUpGiB(volSizeBytes))
@@ -189,20 +190,35 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
189190
var accountKey string
190191
accountName := account
191192
if len(req.GetSecrets()) == 0 && accountName == "" {
192-
lockKey := storageAccountType + accountKind + resourceGroup + location
193-
d.volLockMap.LockEntry(lockKey)
194-
err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
195-
var retErr error
196-
accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
197-
if isRetriableError(retErr) {
198-
klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
199-
return false, nil
193+
if v, ok := d.volMap.Load(volName); ok {
194+
accountName = v.(string)
195+
} else {
196+
lockKey := fmt.Sprintf("%s%s%s%s%s", storageAccountType, accountKind, resourceGroup, location, protocol)
197+
// search in cache first
198+
cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
199+
if err != nil {
200+
return nil, err
201+
}
202+
if cache != nil {
203+
accountName = cache.(string)
204+
} else {
205+
d.volLockMap.LockEntry(lockKey)
206+
err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
207+
var retErr error
208+
accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
209+
if isRetriableError(retErr) {
210+
klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
211+
return false, nil
212+
}
213+
return true, retErr
214+
})
215+
d.volLockMap.UnlockEntry(lockKey)
216+
if err != nil {
217+
return nil, status.Errorf(codes.Internal, "failed to ensure storage account: %v", err)
218+
}
219+
d.accountSearchCache.Set(lockKey, accountName)
220+
d.volMap.Store(volName, accountName)
200221
}
201-
return true, retErr
202-
})
203-
d.volLockMap.UnlockEntry(lockKey)
204-
if err != nil {
205-
return nil, status.Errorf(codes.Internal, "failed to ensure storage account: %v", err)
206222
}
207223
}
208224
accountOptions.Name = accountName
@@ -215,7 +231,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
215231

216232
validContainerName := containerName
217233
if validContainerName == "" {
218-
validContainerName = getValidContainerName(name, protocol)
234+
validContainerName = getValidContainerName(volName, protocol)
219235
parameters[containerNameField] = validContainerName
220236
}
221237

@@ -251,7 +267,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
251267
if containerName != "" {
252268
// add volume name as suffix to differentiate volumeID since "containerName" is specified
253269
// not necessary for dynamic container name creation since volumeID already contains volume name
254-
volumeID = volumeID + "#" + name
270+
volumeID = volumeID + "#" + volName
255271
}
256272
klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
257273

0 commit comments

Comments
 (0)