Skip to content

Commit d69464a

Browse files
committed
chore: only remove disks that can be processed from the batch
1 parent 11fbd9e commit d69464a

File tree

3 files changed: 68 additions (+68) and 43 deletions (-43).

pkg/azuredisk/azure_controller_common.go

Lines changed: 23 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -232,13 +232,13 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
232232
if int(maxNumDisks) > numDisksAttached {
233233
numDisksAllowed = int(maxNumDisks) - numDisksAttached
234234
} else {
235-
numDisksAllowed = 0
235+
return -1, fmt.Errorf("Maximum number of disks %s %d", util.MaximumDataDiskExceededMsg, maxNumDisks)
236236
}
237237
}
238238
}
239239
}
240240

241-
diskMap, err := c.retrieveAttachBatchedDiskRequests(node, diskuri)
241+
diskMap, err := c.retrieveAttachBatchedDiskRequests(node, diskuri, numDisksAllowed)
242242
if err != nil {
243243
return -1, err
244244
}
@@ -248,22 +248,6 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
248248
return c.verifyAttach(ctx, diskName, diskURI, nodeName)
249249
}
250250

251-
// Remove some disks from the batch if the number is more than the max number of disks allowed
252-
removeDisks := len(diskMap) - numDisksAllowed
253-
if removeDisks > 0 {
254-
klog.V(2).Infof("too many disks to attach, remove %d disks from the request", removeDisks)
255-
for diskURI, options := range diskMap {
256-
if removeDisks == 0 {
257-
break
258-
}
259-
if options != nil {
260-
klog.V(2).Infof("remove disk(%s) from attach request from node(%s)", diskURI, nodeName)
261-
delete(diskMap, diskURI)
262-
}
263-
removeDisks--
264-
}
265-
}
266-
267251
lun, setLunErr := c.SetDiskLun(ctx, nodeName, diskuri, diskMap, occupiedLuns)
268252
if setLunErr != nil {
269253
return -1, setLunErr
@@ -333,7 +317,7 @@ func (c *controllerCommon) batchAttachDiskRequest(diskURI, nodeName string, opti
333317

334318
// clean up attach disk requests
335319
// return original attach disk requests
336-
func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI string) (map[string]*provider.AttachDiskOptions, error) {
320+
func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI string, numDisksAllowed int) (map[string]*provider.AttachDiskOptions, error) {
337321
var diskMap map[string]*provider.AttachDiskOptions
338322

339323
attachDiskMapKey := nodeName + attachDiskMapKeySuffix
@@ -350,7 +334,26 @@ func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI s
350334
klog.V(2).Infof("no attach disk(%s) request on node(%s), diskMap len:%d, %+v", diskURI, nodeName, len(diskMap), diskMap)
351335
return nil, nil
352336
}
353-
c.attachDiskMap.Store(nodeName, make(map[string]*provider.AttachDiskOptions))
337+
338+
// Remove disks from the batch if the number is more than the number of disks node can support
339+
disksToKeepInQueue := make(map[string]*provider.AttachDiskOptions)
340+
removeDisks := len(diskMap) - numDisksAllowed
341+
if removeDisks > 0 {
342+
klog.V(2).Infof("too many disks to attach, remove %d disks from the request", removeDisks)
343+
for currDiskURI, options := range diskMap {
344+
if removeDisks == 0 {
345+
break
346+
}
347+
if options != nil && currDiskURI != diskURI {
348+
klog.V(2).Infof("remove disk(%s) from current batch request from node(%s) but requeue", currDiskURI, nodeName)
349+
disksToKeepInQueue[currDiskURI] = options
350+
delete(diskMap, currDiskURI)
351+
}
352+
removeDisks--
353+
}
354+
}
355+
356+
c.attachDiskMap.Store(nodeName, disksToKeepInQueue)
354357
return diskMap, nil
355358
}
356359

pkg/azuredisk/azure_controller_common_test.go

Lines changed: 41 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -646,32 +646,45 @@ func TestAttachDiskRequest(t *testing.T) {
646646
nodeName string
647647
diskName string
648648
diskNum int
649+
numDisksAllowed int
649650
duplicateDiskRequest bool
650651
expectedErr bool
651652
}{
652653
{
653-
desc: "one disk request in queue",
654-
diskURI: "diskURI",
655-
nodeName: "nodeName",
656-
diskName: "diskName",
657-
diskNum: 1,
658-
expectedErr: false,
654+
desc: "one disk request in queue",
655+
diskURI: "diskURI",
656+
nodeName: "nodeName",
657+
diskName: "diskName",
658+
diskNum: 1,
659+
numDisksAllowed: 8,
660+
expectedErr: false,
659661
},
660662
{
661-
desc: "multiple disk requests in queue",
662-
diskURI: "diskURI",
663-
nodeName: "nodeName",
664-
diskName: "diskName",
665-
diskNum: 10,
666-
expectedErr: false,
663+
desc: "multiple disk requests in queue",
664+
diskURI: "diskURI",
665+
nodeName: "nodeName",
666+
diskName: "diskName",
667+
diskNum: 10,
668+
numDisksAllowed: 16,
669+
expectedErr: false,
667670
},
668671
{
669-
desc: "zero disk request in queue",
670-
diskURI: "diskURI",
671-
nodeName: "nodeName",
672-
diskName: "diskName",
673-
diskNum: 0,
674-
expectedErr: false,
672+
desc: "multiple disk requests in queue but exceeds node limit",
673+
diskURI: "diskURI",
674+
nodeName: "nodeName",
675+
diskName: "diskName",
676+
diskNum: 10,
677+
numDisksAllowed: 8,
678+
expectedErr: false,
679+
},
680+
{
681+
desc: "zero disk request in queue",
682+
diskURI: "diskURI",
683+
nodeName: "nodeName",
684+
diskName: "diskName",
685+
diskNum: 0,
686+
numDisksAllowed: 8,
687+
expectedErr: false,
675688
},
676689
{
677690
desc: "multiple disk requests in queue",
@@ -680,6 +693,7 @@ func TestAttachDiskRequest(t *testing.T) {
680693
diskName: "diskName",
681694
duplicateDiskRequest: true,
682695
diskNum: 10,
696+
numDisksAllowed: 16,
683697
expectedErr: false,
684698
},
685699
}
@@ -703,9 +717,13 @@ func TestAttachDiskRequest(t *testing.T) {
703717
}
704718

705719
diskURI := fmt.Sprintf("%s%d", test.diskURI, test.diskNum)
706-
diskMap, err := common.retrieveAttachBatchedDiskRequests(test.nodeName, diskURI)
720+
diskMap, err := common.retrieveAttachBatchedDiskRequests(test.nodeName, diskURI, test.numDisksAllowed)
707721
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s", i, test.desc)
708-
assert.Equal(t, test.diskNum, len(diskMap), "TestCase[%d]: %s", i, test.desc)
722+
if test.diskNum > test.numDisksAllowed {
723+
assert.Equal(t, test.numDisksAllowed, len(diskMap), "TestCase[%d]: %s", i, test.desc)
724+
} else {
725+
assert.Equal(t, test.diskNum, len(diskMap), "TestCase[%d]: %s", i, test.desc)
726+
}
709727
for diskURI, opt := range diskMap {
710728
assert.Equal(t, strings.Contains(diskURI, test.diskURI), true, "TestCase[%d]: %s", i, test.desc)
711729
assert.Equal(t, strings.Contains(opt.DiskName, test.diskName), true, "TestCase[%d]: %s", i, test.desc)
@@ -1067,8 +1085,8 @@ func TestConcurrentDetachDisk(t *testing.T) {
10671085
func(_ context.Context, _ string, name string, params armcompute.VirtualMachine) (*armcompute.VirtualMachine, error) {
10681086
if atomic.AddInt32(&callCount, 1) == 1 {
10691087
klog.Info("First call to CreateOrUpdate succeeded", "VM Name:", name, "Params:", params)
1070-
time.Sleep(100 * time.Millisecond) // Simulate some processing time to hold the node lock while the 3rd detach request is made
1071-
return nil, nil // First call succeeds
1088+
time.Sleep(1000 * time.Millisecond) // Simulate some processing time to hold the node lock while the 3rd detach request is made
1089+
return nil, nil // First call succeeds
10721090
}
10731091
return nil, errors.New("internal error") // Subsequent calls fail
10741092
}).
@@ -1090,7 +1108,7 @@ func TestConcurrentDetachDisk(t *testing.T) {
10901108
}()
10911109
}
10921110

1093-
time.Sleep(1005 * time.Millisecond) // Wait for the batching timeout
1111+
time.Sleep(1100 * time.Millisecond) // Wait for the batching timeout
10941112
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/%s",
10951113
testCloud.SubscriptionID, testCloud.ResourceGroup, "disk-not-batched")
10961114
klog.Info("Calling DetachDisk for non-batched disk detach", expectedVM)

pkg/azuredisk/controllerserver.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@ import (
4242
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
4343
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
4444
"sigs.k8s.io/azuredisk-csi-driver/pkg/optimization"
45+
"sigs.k8s.io/azuredisk-csi-driver/pkg/util"
4546
volumehelper "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
4647
azureconsts "sigs.k8s.io/cloud-provider-azure/pkg/consts"
4748
"sigs.k8s.io/cloud-provider-azure/pkg/metrics"
@@ -571,6 +572,9 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
571572
if len(errMsg) > maxErrMsgLength {
572573
errMsg = errMsg[:maxErrMsgLength]
573574
}
575+
if strings.Contains(err.Error(), util.MaximumDataDiskExceededMsg) {
576+
return nil, status.Errorf(codes.ResourceExhausted, "%v", errMsg)
577+
}
574578
return nil, status.Errorf(codes.Internal, "%v", errMsg)
575579
}
576580
}

0 commit comments

Comments
 (0)