Skip to content

Commit 3916c4a

Browse files
committed
remove disk locks per vm
maintain map with nodename and lock move lock map to utils
1 parent 8af6906 commit 3916c4a

File tree

5 files changed

+88
-13
lines changed

5 files changed

+88
-13
lines changed

staging/src/k8s.io/legacy-cloud-providers/azure/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_library(
2929
"azure_standard.go",
3030
"azure_storage.go",
3131
"azure_storageaccount.go",
32+
"azure_utils.go",
3233
"azure_vmsets.go",
3334
"azure_vmss.go",
3435
"azure_vmss_cache.go",
@@ -76,7 +77,6 @@ go_library(
7677
"//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library",
7778
"//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library",
7879
"//vendor/k8s.io/klog:go_default_library",
79-
"//vendor/k8s.io/utils/keymutex:go_default_library",
8080
"//vendor/k8s.io/utils/net:go_default_library",
8181
"//vendor/sigs.k8s.io/yaml:go_default_library",
8282
],

staging/src/k8s.io/legacy-cloud-providers/azure/azure.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ func initDiskControllers(az *Cloud) error {
614614
resourceGroup: az.ResourceGroup,
615615
subscriptionID: az.SubscriptionID,
616616
cloud: az,
617+
vmLockMap: newLockMap(),
617618
}
618619

619620
az.BlobDiskController = &BlobDiskController{common: common}

staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
cloudprovider "k8s.io/cloud-provider"
3434
volerr "k8s.io/cloud-provider/volume/errors"
3535
"k8s.io/klog"
36-
"k8s.io/utils/keymutex"
3736
)
3837

3938
const (
@@ -58,17 +57,16 @@ var defaultBackOff = kwait.Backoff{
5857
Jitter: 0.0,
5958
}
6059

61-
// acquire lock to attach/detach disk in one node
62-
var diskOpMutex = keymutex.NewHashed(0)
63-
6460
type controllerCommon struct {
6561
subscriptionID string
6662
location string
6763
storageEndpointSuffix string
6864
resourceGroup string
6965
// store disk URI when disk is in attaching or detaching process
7066
diskAttachDetachMap sync.Map
71-
cloud *Cloud
67+
// vm disk map used to lock per vm update calls
68+
vmLockMap *lockMap
69+
cloud *Cloud
7270
}
7371

7472
// getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type.
@@ -144,8 +142,8 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
144142
return -1, fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
145143
}
146144

147-
diskOpMutex.LockKey(instanceid)
148-
defer diskOpMutex.UnlockKey(instanceid)
145+
c.vmLockMap.LockEntry(string(nodeName))
146+
defer c.vmLockMap.UnlockEntry(string(nodeName))
149147

150148
lun, err := c.GetNextDiskLun(nodeName)
151149
if err != nil {
@@ -161,7 +159,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
161159

162160
// DetachDisk detaches a disk from host. The vhd can be identified by diskName or diskURI.
163161
func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
164-
instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
162+
_, err := c.cloud.InstanceID(context.TODO(), nodeName)
165163
if err != nil {
166164
if err == cloudprovider.InstanceNotFound {
167165
// if host doesn't exist, no need to detach
@@ -181,20 +179,20 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
181179
klog.V(2).Infof("detach %v from node %q", diskURI, nodeName)
182180

183181
// make the lock here as small as possible
184-
diskOpMutex.LockKey(instanceid)
182+
c.vmLockMap.LockEntry(string(nodeName))
185183
c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
186184
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
187185
c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
188-
diskOpMutex.UnlockKey(instanceid)
186+
c.vmLockMap.UnlockEntry(string(nodeName))
189187

190188
if c.cloud.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
191189
klog.V(2).Infof("azureDisk - update backing off: detach disk(%s, %s), err: %v", diskName, diskURI, err)
192190
retryErr := kwait.ExponentialBackoff(c.cloud.RequestBackoff(), func() (bool, error) {
193-
diskOpMutex.LockKey(instanceid)
191+
c.vmLockMap.LockEntry(string(nodeName))
194192
c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
195193
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
196194
c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
197-
diskOpMutex.UnlockKey(instanceid)
195+
c.vmLockMap.UnlockEntry(string(nodeName))
198196
return c.cloud.processHTTPRetryResponse(nil, "", resp, err)
199197
})
200198
if retryErr != nil {

staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func TestCommonAttachDisk(t *testing.T) {
6868
resourceGroup: testCloud.ResourceGroup,
6969
subscriptionID: testCloud.SubscriptionID,
7070
cloud: testCloud,
71+
vmLockMap: newLockMap(),
7172
}
7273
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name",
7374
testCloud.SubscriptionID, testCloud.ResourceGroup)
@@ -116,6 +117,7 @@ func TestCommonDetachDisk(t *testing.T) {
116117
resourceGroup: testCloud.ResourceGroup,
117118
subscriptionID: testCloud.SubscriptionID,
118119
cloud: testCloud,
120+
vmLockMap: newLockMap(),
119121
}
120122
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name",
121123
testCloud.SubscriptionID, testCloud.ResourceGroup)
@@ -156,6 +158,7 @@ func TestGetDiskLun(t *testing.T) {
156158
resourceGroup: testCloud.ResourceGroup,
157159
subscriptionID: testCloud.SubscriptionID,
158160
cloud: testCloud,
161+
vmLockMap: newLockMap(),
159162
}
160163
setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
161164

@@ -194,6 +197,7 @@ func TestGetNextDiskLun(t *testing.T) {
194197
resourceGroup: testCloud.ResourceGroup,
195198
subscriptionID: testCloud.SubscriptionID,
196199
cloud: testCloud,
200+
vmLockMap: newLockMap(),
197201
}
198202
setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, test.isDataDisksFull)
199203

@@ -235,6 +239,7 @@ func TestDisksAreAttached(t *testing.T) {
235239
resourceGroup: testCloud.ResourceGroup,
236240
subscriptionID: testCloud.SubscriptionID,
237241
cloud: testCloud,
242+
vmLockMap: newLockMap(),
238243
}
239244
setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
240245

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// +build !providerless
2+
3+
/*
4+
Copyright 2018 The Kubernetes Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package azure
20+
21+
import (
22+
"sync"
23+
)
24+
25+
// lockMap used to lock on entries
26+
type lockMap struct {
27+
sync.Mutex
28+
mutexMap map[string]*sync.Mutex
29+
}
30+
31+
// NewLockMap returns a new lock map
32+
func newLockMap() *lockMap {
33+
return &lockMap{
34+
mutexMap: make(map[string]*sync.Mutex),
35+
}
36+
}
37+
38+
// LockEntry acquires a lock associated with the specific entry
39+
func (lm *lockMap) LockEntry(entry string) {
40+
lm.Lock()
41+
// check if entry does not exists, then add entry
42+
if _, exists := lm.mutexMap[entry]; !exists {
43+
lm.addEntry(entry)
44+
}
45+
46+
lm.Unlock()
47+
lm.lockEntry(entry)
48+
}
49+
50+
// UnlockEntry release the lock associated with the specific entry
51+
func (lm *lockMap) UnlockEntry(entry string) {
52+
lm.Lock()
53+
defer lm.Unlock()
54+
55+
if _, exists := lm.mutexMap[entry]; !exists {
56+
return
57+
}
58+
lm.unlockEntry(entry)
59+
}
60+
61+
func (lm *lockMap) addEntry(entry string) {
62+
lm.mutexMap[entry] = &sync.Mutex{}
63+
}
64+
65+
func (lm *lockMap) lockEntry(entry string) {
66+
lm.mutexMap[entry].Lock()
67+
}
68+
69+
func (lm *lockMap) unlockEntry(entry string) {
70+
lm.mutexMap[entry].Unlock()
71+
}

0 commit comments

Comments
 (0)