43 changes: 23 additions & 20 deletions pkg/azuredisk/azure_controller_common.go
@@ -232,13 +232,13 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
if int(maxNumDisks) > numDisksAttached {
numDisksAllowed = int(maxNumDisks) - numDisksAttached
} else {
numDisksAllowed = 0
return -1, fmt.Errorf("Maximum number of disks %s %d", util.MaximumDataDiskExceededMsg, maxNumDisks)
}
}
}
}

diskMap, err := c.retrieveAttachBatchedDiskRequests(node, diskuri)
diskMap, err := c.retrieveAttachBatchedDiskRequests(node, diskuri, numDisksAllowed)
if err != nil {
return -1, err
}
@@ -248,22 +248,6 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, diskName, diskURI str
return c.verifyAttach(ctx, diskName, diskURI, nodeName)
}

// Remove some disks from the batch if the number is more than the max number of disks allowed
removeDisks := len(diskMap) - numDisksAllowed
if removeDisks > 0 {
klog.V(2).Infof("too many disks to attach, remove %d disks from the request", removeDisks)
for diskURI, options := range diskMap {
if removeDisks == 0 {
break
}
if options != nil {
klog.V(2).Infof("remove disk(%s) from attach request from node(%s)", diskURI, nodeName)
delete(diskMap, diskURI)
}
removeDisks--
}
}

lun, setLunErr := c.SetDiskLun(ctx, nodeName, diskuri, diskMap, occupiedLuns)
if setLunErr != nil {
return -1, setLunErr
@@ -333,7 +317,7 @@ func (c *controllerCommon) batchAttachDiskRequest(diskURI, nodeName string, opti

// clean up attach disk requests
// return original attach disk requests
func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI string) (map[string]*provider.AttachDiskOptions, error) {
func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI string, numDisksAllowed int) (map[string]*provider.AttachDiskOptions, error) {
var diskMap map[string]*provider.AttachDiskOptions

attachDiskMapKey := nodeName + attachDiskMapKeySuffix
@@ -350,7 +334,26 @@ func (c *controllerCommon) retrieveAttachBatchedDiskRequests(nodeName, diskURI s
klog.V(2).Infof("no attach disk(%s) request on node(%s), diskMap len:%d, %+v", diskURI, nodeName, len(diskMap), diskMap)
return nil, nil
}
c.attachDiskMap.Store(nodeName, make(map[string]*provider.AttachDiskOptions))
Member:
The original logic always clears the queue: if removeDisks > 0, keeping the remaining disks in the queue would not succeed in the end, and the CSI driver has retry logic. Clearing the queue makes the logic more straightforward.

Contributor Author:
I really wanted to avoid dropping the disks that haven't been processed. Even though I agree that k8s will eventually retry, more changes would be needed to do this safely, so I am adding the queue clean-up back.

Contributor:
With this approach, would the next attach request that comes in be added to this batch of already existing disks in the attachDiskMap? So we won't create a fresh batch, but reuse the originally dropped disks? Is that the correct understanding?


// Remove disks from the batch if the number is more than the number of disks node can support
disksToKeepInQueue := make(map[string]*provider.AttachDiskOptions)
removeDisks := len(diskMap) - numDisksAllowed
if removeDisks > 0 {
klog.V(2).Infof("too many disks to attach, remove %d disks from the request", removeDisks)
for currDiskURI, options := range diskMap {
if removeDisks == 0 {
break
}
if options != nil && currDiskURI != diskURI {
klog.V(2).Infof("remove disk(%s) from current batch request from node(%s) but requeue", currDiskURI, nodeName)
disksToKeepInQueue[currDiskURI] = options
delete(diskMap, currDiskURI)
removeDisks--
}
}
}

c.attachDiskMap.Store(nodeName, disksToKeepInQueue)
return diskMap, nil
}
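For readers following the thread above, here is a simplified sketch of the requeue behaviour: excess entries are deferred rather than deleted outright. It uses plain string maps instead of the driver's provider.AttachDiskOptions type, and the helper name splitBatch is invented for illustration.

```go
// Simplified sketch of the behaviour added above: the batch is split into
// "attach now" and "requeue later" instead of dropping the excess entries.
// splitBatch and the plain string values are illustrative, not driver code.
package main

import "fmt"

func splitBatch(batch map[string]string, requestedDisk string, allowed int) (attachNow, requeue map[string]string) {
	attachNow = batch
	requeue = make(map[string]string)
	remove := len(batch) - allowed
	for uri, opts := range batch {
		if remove <= 0 {
			break
		}
		// Never defer the disk the current caller is waiting on.
		if uri != requestedDisk {
			requeue[uri] = opts
			delete(attachNow, uri)
			remove--
		}
	}
	return attachNow, requeue
}

func main() {
	batch := map[string]string{"disk-a": "opts", "disk-b": "opts", "disk-c": "opts"}
	now, later := splitBatch(batch, "disk-a", 2)
	fmt.Printf("attach now: %d, requeued: %d\n", len(now), len(later)) // attach now: 2, requeued: 1
}
```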

64 changes: 41 additions & 23 deletions pkg/azuredisk/azure_controller_common_test.go
@@ -646,32 +646,45 @@ func TestAttachDiskRequest(t *testing.T) {
nodeName string
diskName string
diskNum int
numDisksAllowed int
duplicateDiskRequest bool
expectedErr bool
}{
{
desc: "one disk request in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 1,
expectedErr: false,
desc: "one disk request in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 1,
numDisksAllowed: 8,
expectedErr: false,
},
{
desc: "multiple disk requests in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 10,
expectedErr: false,
desc: "multiple disk requests in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 10,
numDisksAllowed: 16,
expectedErr: false,
},
{
desc: "zero disk request in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 0,
expectedErr: false,
desc: "multiple disk requests in queue but exceeds node limit",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 10,
numDisksAllowed: 8,
expectedErr: false,
},
{
desc: "zero disk request in queue",
diskURI: "diskURI",
nodeName: "nodeName",
diskName: "diskName",
diskNum: 0,
numDisksAllowed: 8,
expectedErr: false,
},
{
desc: "multiple disk requests in queue",
@@ -680,6 +693,7 @@
diskName: "diskName",
duplicateDiskRequest: true,
diskNum: 10,
numDisksAllowed: 16,
expectedErr: false,
},
}
@@ -703,9 +717,13 @@ }
}

diskURI := fmt.Sprintf("%s%d", test.diskURI, test.diskNum)
diskMap, err := common.retrieveAttachBatchedDiskRequests(test.nodeName, diskURI)
diskMap, err := common.retrieveAttachBatchedDiskRequests(test.nodeName, diskURI, test.numDisksAllowed)
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.diskNum, len(diskMap), "TestCase[%d]: %s", i, test.desc)
if test.diskNum > test.numDisksAllowed {
assert.Equal(t, test.numDisksAllowed, len(diskMap), "TestCase[%d]: %s", i, test.desc)
} else {
assert.Equal(t, test.diskNum, len(diskMap), "TestCase[%d]: %s", i, test.desc)
}
for diskURI, opt := range diskMap {
assert.Equal(t, strings.Contains(diskURI, test.diskURI), true, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, strings.Contains(opt.DiskName, test.diskName), true, "TestCase[%d]: %s", i, test.desc)
@@ -1067,8 +1085,8 @@ func TestConcurrentDetachDisk(t *testing.T) {
func(_ context.Context, _ string, name string, params armcompute.VirtualMachine) (*armcompute.VirtualMachine, error) {
if atomic.AddInt32(&callCount, 1) == 1 {
klog.Info("First call to CreateOrUpdate succeeded", "VM Name:", name, "Params:", params)
time.Sleep(100 * time.Millisecond) // Simulate some processing time to hold the node lock while the 3rd detach request is made
return nil, nil // First call succeeds
time.Sleep(1000 * time.Millisecond) // Simulate some processing time to hold the node lock while the 3rd detach request is made
Copilot AI, Aug 9, 2025:
The sleep duration was increased from 100ms to 1000ms (a 10x increase) without explanation. This significantly slows down the test. Consider using a more reasonable duration or explaining why such a long delay is necessary.

Suggested change:
time.Sleep(1000 * time.Millisecond) // Simulate some processing time to hold the node lock while the 3rd detach request is made
time.Sleep(100 * time.Millisecond) // Simulate processing time to hold the node lock while the 3rd detach request is made; 100ms is sufficient for concurrency in tests
return nil, nil // First call succeeds
}
return nil, errors.New("internal error") // Subsequent calls fail
}).
@@ -1090,7 +1108,7 @@ func TestConcurrentDetachDisk(t *testing.T) {
}()
}

time.Sleep(1005 * time.Millisecond) // Wait for the batching timeout
time.Sleep(1100 * time.Millisecond) // Wait for the batching timeout
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/%s",
testCloud.SubscriptionID, testCloud.ResourceGroup, "disk-not-batched")
klog.Info("Calling DetachDisk for non-batched disk detach", expectedVM)
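As an aside on the timing discussion above, fixed sleeps like these are sometimes replaced with explicit signalling in Go tests. The following is a generic sketch, not this driver's test code; the goroutine body is a stand-in for the mocked call.

```go
// Generic sketch (not the driver's test code): waiting on a channel with a
// timeout instead of sleeping for a fixed duration keeps the test fast when
// the operation finishes early and still bounds the worst case.
package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})

	go func() {
		// Stand-in for the mocked CreateOrUpdate call holding the node lock.
		time.Sleep(50 * time.Millisecond)
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("operation finished; continue immediately")
	case <-time.After(2 * time.Second):
		fmt.Println("timed out waiting for the operation")
	}
}
```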
4 changes: 4 additions & 0 deletions pkg/azuredisk/controllerserver.go
@@ -42,6 +42,7 @@ import (
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/azuredisk-csi-driver/pkg/optimization"
"sigs.k8s.io/azuredisk-csi-driver/pkg/util"
volumehelper "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
azureconsts "sigs.k8s.io/cloud-provider-azure/pkg/consts"
"sigs.k8s.io/cloud-provider-azure/pkg/metrics"
Expand Down Expand Up @@ -571,6 +572,9 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
if len(errMsg) > maxErrMsgLength {
errMsg = errMsg[:maxErrMsgLength]
}
if strings.Contains(err.Error(), util.MaximumDataDiskExceededMsg) {
return nil, status.Errorf(codes.ResourceExhausted, "%v", errMsg)
}
return nil, status.Errorf(codes.Internal, "%v", errMsg)
}
}
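For context on the new error path, a caller can distinguish this condition with the standard gRPC status helpers. A minimal sketch follows, assuming the caller only sees the returned gRPC error; the shouldBackOff helper and the sample message are illustrative, not part of this PR.

```go
// Minimal sketch of how a gRPC caller might react to the ResourceExhausted
// code introduced above. shouldBackOff and the sample message are illustrative.
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func shouldBackOff(err error) bool {
	// status.Code returns codes.OK for nil errors and codes.Unknown for
	// errors that do not carry a gRPC status.
	return status.Code(err) == codes.ResourceExhausted
}

func main() {
	err := status.Errorf(codes.ResourceExhausted, "maximum number of data disks exceeded")
	fmt.Println("back off instead of retrying immediately:", shouldBackOff(err)) // true
}
```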