Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 50 additions & 36 deletions pkg/azuredisk/azure_controller_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,25 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
if err != nil {
klog.Errorf("failed to get node info from labels: %v", err)
} else if instanceType != "" {
maxNumDisks, instanceExists := GetMaxDataDiskCount(instanceType)
maxNodeDisks, instanceExists := GetMaxDataDiskCount(instanceType)
if instanceExists {
attachedDisks, _, err := c.GetNodeDataDisks(ctx, nodeName, azcache.CacheReadTypeDefault)
if err != nil {
return -1, err
}
numDisksAttached := len(attachedDisks)
if int(maxNumDisks) > numDisksAttached {
numDisksAllowed = int(maxNumDisks) - numDisksAttached
} else {
numDisksAllowed = 0
currentNodeDisks := len(attachedDisks)
maxNodeDisks := int(maxNodeDisks)
if currentNodeDisks > maxNodeDisks {
Copy link

Copilot AI Aug 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition should use >= instead of > since reaching the maximum number of disks should also trigger the error. If currentNodeDisks equals maxNodeDisks, there's no room for additional disks.

Suggested change
if currentNodeDisks > maxNodeDisks {
if currentNodeDisks >= maxNodeDisks {

Copilot uses AI. Check for mistakes.
// if node already has max number of disks, clear the attach disk requests and return an early error
c.clearAttachDiskRequests(node)
return -1, fmt.Errorf("Maximum number of disks %s %d", util.MaximumDataDiskExceededMsg, maxNodeDisks)
}
numDisksAllowed = maxNodeDisks - currentNodeDisks
}
}
}

diskMap, err := c.retrieveAttachBatchedDiskRequests(node, diskuri)
diskMap, err := c.retrieveBatchedAttachDiskRequests(node, diskuri, numDisksAllowed)
if err != nil {
return -1, err
}
Expand All @@ -248,22 +250,6 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
return c.verifyAttach(ctx, diskName, diskURI, nodeName)
}

// Remove some disks from the batch if the number is more than the max number of disks allowed
removeDisks := len(diskMap) - numDisksAllowed
if removeDisks > 0 {
klog.V(2).Infof("too many disks to attach, remove %d disks from the request", removeDisks)
for diskURI, options := range diskMap {
if removeDisks == 0 {
break
}
if options != nil {
klog.V(2).Infof("remove disk(%s) from attach request from node(%s)", diskURI, nodeName)
delete(diskMap, diskURI)
}
removeDisks--
}
}

lun, setLunErr := c.SetDiskLun(ctx, nodeName, diskuri, diskMap, occupiedLuns)
if setLunErr != nil {
return -1, setLunErr
Expand Down Expand Up @@ -306,7 +292,7 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
return lun, nil
}

// insertAttachDiskRequest return (attachDiskRequestQueueLength, error)
// batchAttachDiskRequest batches the attach disk requests for the node.
func (c *controllerCommon) batchAttachDiskRequest(diskURI, nodeName string, options *provider.AttachDiskOptions) (int, error) {
var diskMap map[string]*provider.AttachDiskOptions
attachDiskMapKey := nodeName + attachDiskMapKeySuffix
Expand All @@ -331,9 +317,12 @@ func (c *controllerCommon) batchAttachDiskRequest(diskURI, nodeName string, opti
return len(diskMap), nil
}

// clean up attach disk requests
// return original attach disk requests
func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI string) (map[string]*provider.AttachDiskOptions, error) {
// retrieveBatchedAttachDiskRequests removes the current attach disk requests for the node
// and returns it for processing. If the number of disks in the batch exceeds
// the number of disks the node can support, it will remove the extra disks from the batch
// and return the remaining disks for processing. The requested diskURI will always be returned in the batch if it exists.
// The batch will be empty after this call.
func (c *controllerCommon) retrieveBatchedAttachDiskRequests(nodeName, diskURI string, numDisksAllowed int) (map[string]*provider.AttachDiskOptions, error) {
var diskMap map[string]*provider.AttachDiskOptions

attachDiskMapKey := nodeName + attachDiskMapKeySuffix
Expand All @@ -350,10 +339,35 @@ func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI s
klog.V(2).Infof("no attach disk(%s) request on node(%s), diskMap len:%d, %+v", diskURI, nodeName, len(diskMap), diskMap)
return nil, nil
}

// Remove disks from the batch if the number is more than the number of disks node can support
removeDisks := len(diskMap) - numDisksAllowed
if removeDisks > 0 {
klog.V(2).Infof("too many disks to attach, remove %d disks from the request", removeDisks)
for currDiskURI, options := range diskMap {
if removeDisks == 0 {
break
}
if options != nil && currDiskURI != diskURI {
klog.V(2).Infof("remove disk(%s) from current batch request from node(%s) but requeue", currDiskURI, nodeName)
delete(diskMap, currDiskURI)
removeDisks--
}
}
}

c.attachDiskMap.Store(nodeName, make(map[string]*provider.AttachDiskOptions))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original logic always clears the queue: if removeDisks > 0, keeping the remaining disks in the queue would not succeed in the end, and the CSI driver has retry logic. Clearing the queue would make the logic more straightforward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really wanted to avoid dropping the disks that haven't been processed. I agree that k8s will eventually retry, but more changes need to be made to do this safely, so I am adding the queue cleanup back.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this approach, would the next attach request that comes in be added to the batch of disks already existing in the attachDiskMap? So we wouldn't create a fresh batch, but would reuse the originally dropped disks?
Is that the correct understanding?

return diskMap, nil
}

// clearAttachDiskRequests discards all batched attach disk requests for
// nodeName by replacing the node's entry in attachDiskMap with a new, empty
// map. The node's attach lock entry is held while the entry is replaced.
func (c *controllerCommon) clearAttachDiskRequests(nodeName string) {
	lockKey := nodeName + attachDiskMapKeySuffix
	c.lockMap.LockEntry(lockKey)
	defer c.lockMap.UnlockEntry(lockKey)

	emptyBatch := map[string]*provider.AttachDiskOptions{}
	c.attachDiskMap.Store(nodeName, emptyBatch)
}

// DetachDisk detaches a disk from VM
func (c *controllerCommon) DetachDisk(ctx context.Context, diskName, diskURI string, nodeName types.NodeName) error {
if _, err := c.cloud.InstanceID(ctx, nodeName); err != nil {
Expand Down Expand Up @@ -387,7 +401,7 @@ func (c *controllerCommon) DetachDisk(ctx context.Context, diskName, diskURI str
time.Sleep(time.Duration(c.AttachDetachInitialDelayInMs) * time.Millisecond)
}

diskMap, err := c.retrieveDetachBatchedDiskRequests(node, formattedDiskURI)
diskMap, err := c.retrieveBatchedDetachDiskRequests(node, formattedDiskURI)
if err != nil {
return err
}
Expand Down Expand Up @@ -452,23 +466,23 @@ func (c *controllerCommon) verifyDetach(ctx context.Context, diskName, diskURI s
lun, vmState, errGetLun := c.GetDiskLun(ctx, diskName, diskURI, nodeName)
if errGetLun != nil {
if strings.Contains(errGetLun.Error(), consts.CannotFindDiskLUN) {
klog.V(2).Infof("azureDisk - detach disk(%s, %s) succeeded", diskName, diskURI)
klog.V(2).Infof("detach disk(%s, %s) succeeded", diskName, diskURI)
return nil
}
klog.Errorf("azureDisk - detach disk(%s, %s) failed, error: %v", diskName, diskURI, errGetLun)
klog.Errorf("detach disk(%s, %s) failed, error: %v", diskName, diskURI, errGetLun)
return errGetLun
}

return fmt.Errorf("disk(%s) is still attached to node(%s) on lun(%d), vmState: %s", diskURI, nodeName, lun, ptr.Deref(vmState, ""))
return fmt.Errorf("detach disk(%s) failed, disk is still attached to node(%s) on lun(%d), vmState: %s", diskURI, nodeName, lun, ptr.Deref(vmState, ""))
}

// verifyAttach verifies if the disk is attached to the node by checking the disk lun.
func (c *controllerCommon) verifyAttach(ctx context.Context, diskName, diskURI string, nodeName types.NodeName) (int32, error) {
lun, vmState, errGetLun := c.GetDiskLun(ctx, diskName, diskURI, nodeName)
if errGetLun != nil {
return -1, fmt.Errorf("disk(%s) could not be found on node(%s), vmState: %s, error: %w", diskURI, nodeName, ptr.Deref(vmState, ""), errGetLun)
return -1, fmt.Errorf("attach disk(%s) failed, disk could not be found on node(%s), vmState: %s, error: %w", diskURI, nodeName, ptr.Deref(vmState, ""), errGetLun)
}
klog.V(2).Infof("azureDisk - verify attach disk(%s, %s) succeeded on lun(%d)", diskName, diskURI, lun)
klog.V(2).Infof("attach disk(%s, %s) succeeded on lun(%d)", diskName, diskURI, lun)
return lun, nil
}

Expand Down Expand Up @@ -500,9 +514,9 @@ func (c *controllerCommon) batchDetachDiskRequest(diskName, diskURI, nodeName st
return len(diskMap), nil
}

// retrieveDetachBatchedDiskRequests removes the current detach disk requests for the node
// and returns it for processing
func (c *controllerCommon) retrieveDetachBatchedDiskRequests(nodeName, diskURI string) (map[string]string, error) {
// retrieveBatchedDetachDiskRequests removes the current detach disk requests for the node
// and returns it for processing.
func (c *controllerCommon) retrieveBatchedDetachDiskRequests(nodeName, diskURI string) (map[string]string, error) {
var diskMap map[string]string

detachDiskMapKey := nodeName + detachDiskMapKeySuffix
Expand Down
100 changes: 75 additions & 25 deletions pkg/azuredisk/azure_controller_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,32 +646,45 @@ func TestAttachDiskRequest(t *testing.T) {
nodeName string
diskName string
diskNum int
numDisksAllowed int
duplicateDiskRequest bool
expectedErr bool
}{
{
desc: "one disk request in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 1,
expectedErr: false,
desc: "one disk request in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 1,
numDisksAllowed: 8,
expectedErr: false,
},
{
desc: "multiple disk requests in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 10,
expectedErr: false,
desc: "multiple disk requests in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 10,
numDisksAllowed: 16,
expectedErr: false,
},
{
desc: "zero disk request in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 0,
expectedErr: false,
desc: "multiple disk requests in queue but exceeds node limit",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 10,
numDisksAllowed: 8,
expectedErr: false,
},
{
desc: "zero disk request in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 0,
numDisksAllowed: 8,
expectedErr: false,
},
{
desc: "multiple disk requests in queue",
Expand All @@ -680,6 +693,7 @@ func TestAttachDiskRequest(t *testing.T) {
diskName: "diskName",
duplicateDiskRequest: true,
diskNum: 10,
numDisksAllowed: 16,
expectedErr: false,
},
}
Expand All @@ -703,9 +717,13 @@ func TestAttachDiskRequest(t *testing.T) {
}

diskURI := fmt.Sprintf("%s%d", test.diskURI, test.diskNum)
diskMap, err := common.retrieveAttachBatchedDiskRequests(test.nodeName, diskURI)
diskMap, err := common.retrieveBatchedAttachDiskRequests(test.nodeName, diskURI, test.numDisksAllowed)
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.diskNum, len(diskMap), "TestCase[%d]: %s", i, test.desc)
if test.diskNum > test.numDisksAllowed {
assert.Equal(t, test.numDisksAllowed, len(diskMap), "TestCase[%d]: %s", i, test.desc)
} else {
assert.Equal(t, test.diskNum, len(diskMap), "TestCase[%d]: %s", i, test.desc)
}
for diskURI, opt := range diskMap {
assert.Equal(t, strings.Contains(diskURI, test.diskURI), true, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, strings.Contains(opt.DiskName, test.diskName), true, "TestCase[%d]: %s", i, test.desc)
Expand Down Expand Up @@ -779,7 +797,7 @@ func TestDetachDiskRequest(t *testing.T) {
}
}

diskMap, err := common.retrieveDetachBatchedDiskRequests(test.nodeName, diskURI)
diskMap, err := common.retrieveBatchedDetachDiskRequests(test.nodeName, diskURI)
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.diskNum, len(diskMap), "TestCase[%d]: %s", i, test.desc)
for diskURI, diskName := range diskMap {
Expand Down Expand Up @@ -988,7 +1006,7 @@ func TestVerifyDetach(t *testing.T) {
nodeName: "node3",
expectedErr: true,
expectedVM: map[string]string{"node3": "PowerState/Running"},
expectedErrMsg: "disk(diskuri) is still attached to node(node3) on lun(2), vmState: Succeeded",
expectedErrMsg: "disk is still attached to node(node3) on lun(2), vmState: Succeeded",
},
}

Expand Down Expand Up @@ -1067,8 +1085,8 @@ func TestConcurrentDetachDisk(t *testing.T) {
func(_ context.Context, _ string, name string, params armcompute.VirtualMachine) (*armcompute.VirtualMachine, error) {
if atomic.AddInt32(&callCount, 1) == 1 {
klog.Info("First call to CreateOrUpdate succeeded", "VM Name:", name, "Params:", params)
time.Sleep(100 * time.Millisecond) // Simulate some processing time to hold the node lock while the 3rd detach request is made
return nil, nil // First call succeeds
time.Sleep(1000 * time.Millisecond) // Simulate some processing time to hold the node lock while the 3rd detach request is made
Copy link

Copilot AI Aug 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sleep duration was increased from 100ms to 1000ms (10x increase) without explanation. This significantly slows down the test. Consider using a more reasonable duration or explaining why such a long delay is necessary.

Suggested change
time.Sleep(1000 * time.Millisecond) // Simulate some processing time to hold the node lock while the 3rd detach request is made
time.Sleep(100 * time.Millisecond) // Simulate processing time to hold the node lock while the 3rd detach request is made; 100ms is sufficient for concurrency in tests

Copilot uses AI. Check for mistakes.
return nil, nil // First call succeeds
}
return nil, errors.New("internal error") // Subsequent calls fail
}).
Expand All @@ -1090,7 +1108,7 @@ func TestConcurrentDetachDisk(t *testing.T) {
}()
}

time.Sleep(1005 * time.Millisecond) // Wait for the batching timeout
time.Sleep(1100 * time.Millisecond) // Wait for the batching timeout
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/%s",
testCloud.SubscriptionID, testCloud.ResourceGroup, "disk-not-batched")
klog.Info("Calling DetachDisk for non-batched disk detach", expectedVM)
Expand All @@ -1110,6 +1128,38 @@ func TestConcurrentDetachDisk(t *testing.T) {
assert.Error(t, err, "DetachDisk should return an error for the non-batched disk detach")
}

// TestClearAttachDiskRequests checks that clearAttachDiskRequests leaves an
// empty map stored under the node name after a non-empty batch was present.
func TestClearAttachDiskRequests(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	common := &controllerCommon{
		cloud:   provider.GetTestCloud(ctrl),
		lockMap: newLockMap(),
	}

	const nodeName = "testnode"

	// Seed the node's batch with two pending attach requests.
	common.attachDiskMap.Store(nodeName, map[string]*provider.AttachDiskOptions{
		"diskURI1": {DiskName: "disk1"},
		"diskURI2": {DiskName: "disk2"},
	})

	// Sanity check: the seeded batch must be visible before clearing.
	seeded, found := common.attachDiskMap.Load(nodeName)
	if !found || seeded == nil {
		t.Fatalf("attachDiskMap should have entries before clear")
	}

	common.clearAttachDiskRequests(nodeName)

	// The key must still be present, but it must now map to an empty batch.
	cleared, found := common.attachDiskMap.Load(nodeName)
	assert.True(t, found, "attachDiskMap should have an empty map for nodeName after clearAttachDiskRequests")
	assert.Empty(t, cleared, "attachDiskMap should be empty for nodeName after clearAttachDiskRequests")
}

// setTestVirtualMachines sets test virtual machine with powerstate.
func setTestVirtualMachines(c *provider.Cloud, vmList map[string]string, isDataDisksFull bool) []armcompute.VirtualMachine {
expectedVMs := make([]armcompute.VirtualMachine, 0)
Expand Down
4 changes: 4 additions & 0 deletions pkg/azuredisk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/azuredisk-csi-driver/pkg/optimization"
"sigs.k8s.io/azuredisk-csi-driver/pkg/util"
volumehelper "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
azureconsts "sigs.k8s.io/cloud-provider-azure/pkg/consts"
"sigs.k8s.io/cloud-provider-azure/pkg/metrics"
Expand Down Expand Up @@ -571,6 +572,9 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
if len(errMsg) > maxErrMsgLength {
errMsg = errMsg[:maxErrMsgLength]
}
if strings.Contains(err.Error(), util.MaximumDataDiskExceededMsg) {
return nil, status.Errorf(codes.ResourceExhausted, "%v", errMsg)
}
return nil, status.Errorf(codes.Internal, "%v", errMsg)
}
}
Expand Down
Loading