Skip to content

Commit c92ca4c

Browse files
authored
Merge pull request kubernetes#3141 from marwanad/avoid-deletion-conflicts
Avoid sending extra deletion calls for in-progress deletions
2 parents 0845415 + fbe928c commit c92ca4c

File tree

4 files changed

+163
-41
lines changed

4 files changed

+163
-41
lines changed

cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute"
2323
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
2424
"github.com/Azure/go-autorest/autorest/azure"
25+
"github.com/Azure/go-autorest/autorest/to"
2526
"github.com/stretchr/testify/assert"
2627

2728
apiv1 "k8s.io/api/core/v1"
@@ -67,7 +68,19 @@ func newTestAzureManager(t *testing.T) *AzureManager {
6768
},
6869
},
6970
},
70-
virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{},
71+
virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{
72+
FakeStore: map[string]map[string]compute.VirtualMachineScaleSetVM{
73+
"test": {
74+
"0": {
75+
ID: to.StringPtr(fakeVirtualMachineScaleSetVMID),
76+
InstanceID: to.StringPtr("0"),
77+
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
78+
VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"),
79+
},
80+
},
81+
},
82+
},
83+
},
7184
},
7285
}
7386

cluster-autoscaler/cloudprovider/azure/azure_fakes.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ func (client *VirtualMachineScaleSetsClientMock) List(ctx context.Context, resou
110110
// VirtualMachineScaleSetVMsClientMock mocks for VirtualMachineScaleSetVMsClient.
111111
type VirtualMachineScaleSetVMsClientMock struct {
112112
mock.Mock
113+
mutex sync.Mutex
114+
FakeStore map[string]map[string]compute.VirtualMachineScaleSetVM
113115
}
114116

115117
// Get gets a VirtualMachineScaleSetVM by VMScaleSetName and instanceID.
@@ -128,18 +130,14 @@ func (m *VirtualMachineScaleSetVMsClientMock) Get(ctx context.Context, resourceG
128130

129131
// List gets a list of VirtualMachineScaleSetVMs.
130132
func (m *VirtualMachineScaleSetVMsClientMock) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error) {
131-
ID := fakeVirtualMachineScaleSetVMID
132-
instanceID := "0"
133-
vmID := "123E4567-E89B-12D3-A456-426655440000"
134-
properties := compute.VirtualMachineScaleSetVMProperties{
135-
VMID: &vmID,
136-
}
137-
result = append(result, compute.VirtualMachineScaleSetVM{
138-
ID: &ID,
139-
InstanceID: &instanceID,
140-
VirtualMachineScaleSetVMProperties: &properties,
141-
})
133+
m.mutex.Lock()
134+
defer m.mutex.Unlock()
142135

136+
if _, ok := m.FakeStore[resourceGroupName]; ok {
137+
for _, v := range m.FakeStore[resourceGroupName] {
138+
result = append(result, v)
139+
}
140+
}
143141
return result, nil
144142
}
145143

cluster-autoscaler/cloudprovider/azure/azure_scale_set.go

Lines changed: 74 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
321321
}
322322

323323
// GetScaleSetVms returns list of nodes for the given scale set.
324-
// Note that the list results is not used directly because their resource ID format
325-
// is not consistent with Get results.
326-
func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, *retry.Error) {
324+
func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, error) {
327325
klog.V(4).Infof("GetScaleSetVms: starts")
328326
ctx, cancel := getContextWithCancel()
329327
defer cancel()
@@ -336,24 +334,7 @@ func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, *retry.Error) {
336334
return nil, rerr
337335
}
338336

339-
allVMs := make([]string, 0)
340-
for _, vm := range vmList {
341-
// The resource ID is empty string, which indicates the instance may be in deleting state.
342-
if len(*vm.ID) == 0 {
343-
continue
344-
}
345-
346-
resourceID, err := convertResourceGroupNameToLower(*vm.ID)
347-
if err != nil {
348-
// This shouldn't happen. Log a warning message for tracking.
349-
klog.Warningf("GetScaleSetVms.convertResourceGroupNameToLower failed with error: %v", err)
350-
continue
351-
}
352-
353-
allVMs = append(allVMs, resourceID)
354-
}
355-
356-
return allVMs, nil
337+
return vmList, nil
357338
}
358339

359340
// DecreaseTargetSize decreases the target size of the node group. This function
@@ -406,6 +387,9 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
406387
return err
407388
}
408389

390+
scaleSet.instanceMutex.Lock()
391+
defer scaleSet.instanceMutex.Unlock()
392+
409393
instanceIDs := []string{}
410394
for _, instance := range instances {
411395
asg, err := scaleSet.manager.GetAsgForInstance(instance)
@@ -417,6 +401,11 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
417401
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg)
418402
}
419403

404+
if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting {
405+
klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name)
406+
continue
407+
}
408+
420409
instanceID, err := getLastSegment(instance.Name)
421410
if err != nil {
422411
klog.Errorf("getLastSegment failed with error: %v", err)
@@ -426,9 +415,16 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
426415
instanceIDs = append(instanceIDs, instanceID)
427416
}
428417

418+
// nothing to delete
419+
if len(instanceIDs) == 0 {
420+
klog.V(3).Infof("No new instances eligible for deletion, skipping")
421+
return nil
422+
}
423+
429424
requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
430425
InstanceIds: &instanceIDs,
431426
}
427+
432428
ctx, cancel := getContextWithCancel()
433429
defer cancel()
434430
resourceGroup := scaleSet.manager.config.ResourceGroup
@@ -682,16 +678,65 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
682678
return nil, rerr.Error()
683679
}
684680

685-
instances := make([]cloudprovider.Instance, len(vms))
686-
for i := range vms {
687-
name := "azure://" + vms[i]
688-
instances[i] = cloudprovider.Instance{Id: name}
689-
}
690-
691-
scaleSet.instanceCache = instances
681+
scaleSet.instanceCache = buildInstanceCache(vms)
692682
scaleSet.lastInstanceRefresh = time.Now()
693683
klog.V(4).Infof("Nodes: returns")
694-
return instances, nil
684+
return scaleSet.instanceCache, nil
685+
}
686+
687+
// Note that the GetScaleSetVms() results are not used directly because, for the List endpoint,
688+
// their resource ID format is not consistent with that of the Get endpoint.
689+
func buildInstanceCache(vms []compute.VirtualMachineScaleSetVM) []cloudprovider.Instance {
690+
instances := []cloudprovider.Instance{}
691+
692+
for _, vm := range vms {
693+
// The resource ID is an empty string, which indicates the instance may be in the deleting state.
694+
if len(*vm.ID) == 0 {
695+
continue
696+
}
697+
698+
resourceID, err := convertResourceGroupNameToLower(*vm.ID)
699+
if err != nil {
700+
// This shouldn't happen. Log a warning message for tracking.
701+
klog.Warningf("buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err)
702+
continue
703+
}
704+
705+
instances = append(instances, cloudprovider.Instance{
706+
Id: "azure://" + resourceID,
707+
Status: instanceStatusFromVM(vm),
708+
})
709+
}
710+
711+
return instances
712+
}
713+
714+
func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) {
715+
for _, instance := range scaleSet.instanceCache {
716+
if instance.Id == providerID {
717+
return instance, true
718+
}
719+
}
720+
return cloudprovider.Instance{}, false
721+
}
722+
723+
// instanceStatusFromVM converts the VM provisioning state to cloudprovider.InstanceStatus
724+
func instanceStatusFromVM(vm compute.VirtualMachineScaleSetVM) *cloudprovider.InstanceStatus {
725+
if vm.ProvisioningState == nil {
726+
return nil
727+
}
728+
729+
status := &cloudprovider.InstanceStatus{}
730+
switch *vm.ProvisioningState {
731+
case string(compute.ProvisioningStateDeleting):
732+
status.State = cloudprovider.InstanceDeleting
733+
case string(compute.ProvisioningStateCreating):
734+
status.State = cloudprovider.InstanceCreating
735+
default:
736+
status.State = cloudprovider.InstanceRunning
737+
}
738+
739+
return status
695740
}
696741

697742
func (scaleSet *ScaleSet) invalidateInstanceCache() {

cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,72 @@ func TestDeleteNodes(t *testing.T) {
226226
scaleSetClient.AssertNumberOfCalls(t, "DeleteInstances", 1)
227227
}
228228

229+
func TestDeleteNoConflictRequest(t *testing.T) {
230+
vmssName := "test-asg"
231+
var vmssCapacity int64 = 3
232+
233+
manager := newTestAzureManager(t)
234+
vmsClient := &VirtualMachineScaleSetVMsClientMock{
235+
FakeStore: map[string]map[string]compute.VirtualMachineScaleSetVM{
236+
"test": {
237+
"0": {
238+
ID: to.StringPtr(fakeVirtualMachineScaleSetVMID),
239+
InstanceID: to.StringPtr("0"),
240+
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
241+
VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"),
242+
ProvisioningState: to.StringPtr("Deleting"),
243+
},
244+
},
245+
},
246+
},
247+
}
248+
249+
scaleSetClient := &VirtualMachineScaleSetsClientMock{
250+
FakeStore: map[string]map[string]compute.VirtualMachineScaleSet{
251+
"test": {
252+
"test-asg": {
253+
Name: &vmssName,
254+
Sku: &compute.Sku{
255+
Capacity: &vmssCapacity,
256+
},
257+
},
258+
},
259+
},
260+
}
261+
262+
response := autorest.Response{
263+
Response: &http.Response{
264+
Status: "OK",
265+
},
266+
}
267+
268+
scaleSetClient.On("DeleteInstances", mock.Anything, "test-asg", mock.Anything, mock.Anything).Return(response, nil)
269+
manager.azClient.virtualMachineScaleSetsClient = scaleSetClient
270+
manager.azClient.virtualMachineScaleSetVMsClient = vmsClient
271+
272+
resourceLimiter := cloudprovider.NewResourceLimiter(
273+
map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
274+
map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000})
275+
provider, err := BuildAzureCloudProvider(manager, resourceLimiter)
276+
assert.NoError(t, err)
277+
278+
registered := manager.RegisterAsg(newTestScaleSet(manager, "test-asg"))
279+
assert.True(t, registered)
280+
281+
node := &apiv1.Node{
282+
Spec: apiv1.NodeSpec{
283+
ProviderID: "azure://" + fakeVirtualMachineScaleSetVMID,
284+
},
285+
}
286+
287+
scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet)
288+
assert.True(t, ok)
289+
290+
err = scaleSet.DeleteNodes([]*apiv1.Node{node})
291+
// ensure that DeleteInstances isn't called
292+
scaleSetClient.AssertNumberOfCalls(t, "DeleteInstances", 0)
293+
}
294+
229295
func TestId(t *testing.T) {
230296
provider := newTestProvider(t)
231297
registered := provider.azureManager.RegisterAsg(

0 commit comments

Comments
 (0)