Skip to content

Commit abf84ac

Browse files
committed
chore: cleanup node batching queue when node max disk limit is hit
1 parent 81f5d4e commit abf84ac

File tree

2 files changed

+68
-25
lines changed

2 files changed

+68
-25
lines changed

pkg/azuredisk/azure_controller_common.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -222,23 +222,25 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
222222
if err != nil {
223223
klog.Errorf("failed to get node info from labels: %v", err)
224224
} else if instanceType != "" {
225-
maxNumDisks, instanceExists := GetMaxDataDiskCount(instanceType)
225+
maxNodeDisks, instanceExists := GetMaxDataDiskCount(instanceType)
226226
if instanceExists {
227227
attachedDisks, _, err := c.GetNodeDataDisks(ctx, nodeName, azcache.CacheReadTypeDefault)
228228
if err != nil {
229229
return -1, err
230230
}
231-
numDisksAttached := len(attachedDisks)
232-
if int(maxNumDisks) > numDisksAttached {
233-
numDisksAllowed = int(maxNumDisks) - numDisksAttached
234-
} else {
235-
return -1, fmt.Errorf("Maximum number of disks %s %d", util.MaximumDataDiskExceededMsg, maxNumDisks)
231+
currentNodeDisks := len(attachedDisks)
232+
maxNodeDisks := int(maxNodeDisks)
233+
if currentNodeDisks > maxNodeDisks {
234+
// if node already has max number of disks, clear the attach disk requests and return an early error
235+
c.clearAttachDiskRequests(node)
236+
return -1, fmt.Errorf("Maximum number of disks %s %d", util.MaximumDataDiskExceededMsg, maxNodeDisks)
236237
}
238+
numDisksAllowed = maxNodeDisks - currentNodeDisks
237239
}
238240
}
239241
}
240242

241-
diskMap, err := c.retrieveAttachBatchedDiskRequests(node, diskuri, numDisksAllowed)
243+
diskMap, err := c.retrieveBatchedAttachDiskRequests(node, diskuri, numDisksAllowed)
242244
if err != nil {
243245
return -1, err
244246
}
@@ -290,7 +292,7 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
290292
return lun, nil
291293
}
292294

293-
// insertAttachDiskRequest return (attachDiskRequestQueueLength, error)
295+
// batchAttachDiskRequest batches the attach disk requests for the node.
294296
func (c *controllerCommon) batchAttachDiskRequest(diskURI, nodeName string, options *provider.AttachDiskOptions) (int, error) {
295297
var diskMap map[string]*provider.AttachDiskOptions
296298
attachDiskMapKey := nodeName + attachDiskMapKeySuffix
@@ -315,9 +317,12 @@ func (c *controllerCommon) batchAttachDiskRequest(diskURI, nodeName string, opti
315317
return len(diskMap), nil
316318
}
317319

318-
// clean up attach disk requests
319-
// return original attach disk requests
320-
func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI string, numDisksAllowed int) (map[string]*provider.AttachDiskOptions, error) {
320+
// retrieveBatchedAttachDiskRequests removes the current attach disk requests for the node
321+
// and returns it for processing. If the number of disks in the batch exceeds
322+
// the number of disks the node can support, it will remove the extra disks from the batch
323+
// and return the remaining disks for processing. The requested diskURI will always be returned in the batch if it exists.
324+
// The batch will be empty after this call.
325+
func (c *controllerCommon) retrieveBatchedAttachDiskRequests(nodeName, diskURI string, numDisksAllowed int) (map[string]*provider.AttachDiskOptions, error) {
321326
var diskMap map[string]*provider.AttachDiskOptions
322327

323328
attachDiskMapKey := nodeName + attachDiskMapKeySuffix
@@ -336,7 +341,6 @@ func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI s
336341
}
337342

338343
// Remove disks from the batch if the number is more than the number of disks node can support
339-
disksToKeepInQueue := make(map[string]*provider.AttachDiskOptions)
340344
removeDisks := len(diskMap) - numDisksAllowed
341345
if removeDisks > 0 {
342346
klog.V(2).Infof("too many disks to attach, remove %d disks from the request", removeDisks)
@@ -346,17 +350,24 @@ func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI s
346350
}
347351
if options != nil && currDiskURI != diskURI {
348352
klog.V(2).Infof("remove disk(%s) from current batch request from node(%s) but requeue", currDiskURI, nodeName)
349-
disksToKeepInQueue[currDiskURI] = options
350353
delete(diskMap, currDiskURI)
351354
removeDisks--
352355
}
353356
}
354357
}
355358

356-
c.attachDiskMap.Store(nodeName, disksToKeepInQueue)
359+
c.attachDiskMap.Store(nodeName, make(map[string]*provider.AttachDiskOptions))
357360
return diskMap, nil
358361
}
359362

363+
// clearAttachDiskRequests clears the attach disk requests for the node.
364+
func (c *controllerCommon) clearAttachDiskRequests(nodeName string) {
365+
attachDiskMapKey := nodeName + attachDiskMapKeySuffix
366+
c.lockMap.LockEntry(attachDiskMapKey)
367+
defer c.lockMap.UnlockEntry(attachDiskMapKey)
368+
c.attachDiskMap.Store(nodeName, make(map[string]*provider.AttachDiskOptions))
369+
}
370+
360371
// DetachDisk detaches a disk from VM
361372
func (c *controllerCommon) DetachDisk(ctx context.Context, diskName, diskURI string, nodeName types.NodeName) error {
362373
if _, err := c.cloud.InstanceID(ctx, nodeName); err != nil {
@@ -390,7 +401,7 @@ func (c *controllerCommon) DetachDisk(ctx context.Context, diskName, diskURI str
390401
time.Sleep(time.Duration(c.AttachDetachInitialDelayInMs) * time.Millisecond)
391402
}
392403

393-
diskMap, err := c.retrieveDetachBatchedDiskRequests(node, formattedDiskURI)
404+
diskMap, err := c.retrieveBatchedDetachDiskRequests(node, formattedDiskURI)
394405
if err != nil {
395406
return err
396407
}
@@ -455,23 +466,23 @@ func (c *controllerCommon) verifyDetach(ctx context.Context, diskName, diskURI s
455466
lun, vmState, errGetLun := c.GetDiskLun(ctx, diskName, diskURI, nodeName)
456467
if errGetLun != nil {
457468
if strings.Contains(errGetLun.Error(), consts.CannotFindDiskLUN) {
458-
klog.V(2).Infof("azureDisk - detach disk(%s, %s) succeeded", diskName, diskURI)
469+
klog.V(2).Infof("detach disk(%s, %s) succeeded", diskName, diskURI)
459470
return nil
460471
}
461-
klog.Errorf("azureDisk - detach disk(%s, %s) failed, error: %v", diskName, diskURI, errGetLun)
472+
klog.Errorf("detach disk(%s, %s) failed, error: %v", diskName, diskURI, errGetLun)
462473
return errGetLun
463474
}
464475

465-
return fmt.Errorf("disk(%s) is still attached to node(%s) on lun(%d), vmState: %s", diskURI, nodeName, lun, ptr.Deref(vmState, ""))
476+
return fmt.Errorf("detach disk(%s) failed, disk is still attached to node(%s) on lun(%d), vmState: %s", diskURI, nodeName, lun, ptr.Deref(vmState, ""))
466477
}
467478

468479
// verifyAttach verifies if the disk is attached to the node by checking the disk lun.
469480
func (c *controllerCommon) verifyAttach(ctx context.Context, diskName, diskURI string, nodeName types.NodeName) (int32, error) {
470481
lun, vmState, errGetLun := c.GetDiskLun(ctx, diskName, diskURI, nodeName)
471482
if errGetLun != nil {
472-
return -1, fmt.Errorf("disk(%s) could not be found on node(%s), vmState: %s, error: %w", diskURI, nodeName, ptr.Deref(vmState, ""), errGetLun)
483+
return -1, fmt.Errorf("attach disk(%s) failed, disk could not be found on node(%s), vmState: %s, error: %w", diskURI, nodeName, ptr.Deref(vmState, ""), errGetLun)
473484
}
474-
klog.V(2).Infof("azureDisk - verify attach disk(%s, %s) succeeded on lun(%d)", diskName, diskURI, lun)
485+
klog.V(2).Infof("attach disk(%s, %s) succeeded on lun(%d)", diskName, diskURI, lun)
475486
return lun, nil
476487
}
477488

@@ -503,9 +514,9 @@ func (c *controllerCommon) batchDetachDiskRequest(diskName, diskURI, nodeName st
503514
return len(diskMap), nil
504515
}
505516

506-
// retrieveDetachBatchedDiskRequests removes the current detach disk requests for the node
507-
// and returns it for processing
508-
func (c *controllerCommon) retrieveDetachBatchedDiskRequests(nodeName, diskURI string) (map[string]string, error) {
517+
// retrieveBatchedDetachDiskRequests removes the current detach disk requests for the node
518+
// and returns it for processing.
519+
func (c *controllerCommon) retrieveBatchedDetachDiskRequests(nodeName, diskURI string) (map[string]string, error) {
509520
var diskMap map[string]string
510521

511522
detachDiskMapKey := nodeName + detachDiskMapKeySuffix

pkg/azuredisk/azure_controller_common_test.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ func TestAttachDiskRequest(t *testing.T) {
717717
}
718718

719719
diskURI := fmt.Sprintf("%s%d", test.diskURI, test.diskNum)
720-
diskMap, err := common.retrieveAttachBatchedDiskRequests(test.nodeName, diskURI, test.numDisksAllowed)
720+
diskMap, err := common.retrieveBatchedAttachDiskRequests(test.nodeName, diskURI, test.numDisksAllowed)
721721
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s", i, test.desc)
722722
if test.diskNum > test.numDisksAllowed {
723723
assert.Equal(t, test.numDisksAllowed, len(diskMap), "TestCase[%d]: %s", i, test.desc)
@@ -797,7 +797,7 @@ func TestDetachDiskRequest(t *testing.T) {
797797
}
798798
}
799799

800-
diskMap, err := common.retrieveDetachBatchedDiskRequests(test.nodeName, diskURI)
800+
diskMap, err := common.retrieveBatchedDetachDiskRequests(test.nodeName, diskURI)
801801
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s", i, test.desc)
802802
assert.Equal(t, test.diskNum, len(diskMap), "TestCase[%d]: %s", i, test.desc)
803803
for diskURI, diskName := range diskMap {
@@ -1128,6 +1128,38 @@ func TestConcurrentDetachDisk(t *testing.T) {
11281128
assert.Error(t, err, "DetachDisk should return an error for the non-batched disk detach")
11291129
}
11301130

1131+
func TestClearAttachDiskRequests(t *testing.T) {
1132+
ctrl := gomock.NewController(t)
1133+
defer ctrl.Finish()
1134+
1135+
testCloud := provider.GetTestCloud(ctrl)
1136+
common := &controllerCommon{
1137+
cloud: testCloud,
1138+
lockMap: newLockMap(),
1139+
}
1140+
1141+
nodeName := "testnode"
1142+
// Prepare attachDiskMap with some entries
1143+
diskMap := map[string]*provider.AttachDiskOptions{
1144+
"diskURI1": {DiskName: "disk1"},
1145+
"diskURI2": {DiskName: "disk2"},
1146+
}
1147+
common.attachDiskMap.Store(nodeName, diskMap)
1148+
1149+
// Ensure attachDiskMap has entries before clearing
1150+
if v, ok := common.attachDiskMap.Load(nodeName); !ok || v == nil {
1151+
t.Fatalf("attachDiskMap should have entries before clear")
1152+
}
1153+
1154+
// Call clearAttachDiskRequests
1155+
common.clearAttachDiskRequests(nodeName)
1156+
1157+
// After clearing, attachDiskMap should have an empty map for the nodeName
1158+
v, ok := common.attachDiskMap.Load(nodeName)
1159+
assert.True(t, ok, "attachDiskMap should have an empty map for nodeName after clearAttachDiskRequests")
1160+
assert.Empty(t, v, "attachDiskMap should be empty for nodeName after clearAttachDiskRequests")
1161+
}
1162+
11311163
// setTestVirtualMachines sets test virtual machine with powerstate.
11321164
func setTestVirtualMachines(c *provider.Cloud, vmList map[string]string, isDataDisksFull bool) []armcompute.VirtualMachine {
11331165
expectedVMs := make([]armcompute.VirtualMachine, 0)

0 commit comments

Comments
 (0)