Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 121 additions & 66 deletions cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ limitations under the License.
package azure

import (
"errors"
"fmt"
"math/rand"
"net/http"
"sort"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute"
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
azStorage "github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v7"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources/v2"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"k8s.io/utils/ptr"

apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -82,10 +86,10 @@ func (as *AgentPool) initialize() error {
ctx, cancel := getContextWithCancel()
defer cancel()

template, err := as.manager.azClient.deploymentClient.ExportTemplate(ctx, as.manager.config.ResourceGroup, as.manager.config.Deployment)
template, err := as.manager.azClient.deploymentClient.ExportTemplate(ctx, as.manager.config.ResourceGroup, as.manager.config.Deployment, nil)
if err != nil {
klog.Errorf("deploymentClient.ExportTemplate(%s, %s) failed: %v", as.manager.config.ResourceGroup, as.manager.config.Deployment, err)
return err.Error()
return err
}

as.template = template.Template.(map[string]interface{})
Expand Down Expand Up @@ -136,10 +140,10 @@ func (as *AgentPool) Id() string {
return as.Name
}

func (as *AgentPool) getVMsFromCache() ([]compute.VirtualMachine, error) {
func (as *AgentPool) getVMsFromCache() ([]armcompute.VirtualMachine, error) {
allVMs := as.manager.azureCache.getVirtualMachines()
if _, exists := allVMs[as.Name]; !exists {
return []compute.VirtualMachine{}, fmt.Errorf("could not find VMs with poolName: %s", as.Name)
return []armcompute.VirtualMachine{}, fmt.Errorf("could not find VMs with poolName: %s", as.Name)
}
return allVMs[as.Name], nil
}
Expand All @@ -157,7 +161,7 @@ func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) {
indexes := make([]int, 0)
indexToVM := make(map[int]string)
for _, instance := range instances {
index, err := GetVMNameIndex(instance.StorageProfile.OsDisk.OsType, *instance.Name)
index, err := GetVMNameIndex(ptr.Deref(instance.Properties.StorageProfile.OSDisk.OSType, armcompute.OperatingSystemTypesLinux), *instance.Name)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -211,27 +215,28 @@ func (as *AgentPool) TargetSize() (int, error) {
return int(size), nil
}

func (as *AgentPool) getAllSucceededAndFailedDeployments() ([]resources.DeploymentExtended, error) {
func (as *AgentPool) getAllSucceededAndFailedDeployments() ([]armresources.DeploymentExtended, error) {
ctx, cancel := getContextWithCancel()
defer cancel()

allDeployments, rerr := as.manager.azClient.deploymentClient.List(ctx, as.manager.config.ResourceGroup)
if rerr != nil {
klog.Errorf("getAllSucceededAndFailedDeployments: failed to list deployments with error: %v", rerr.Error())
return nil, rerr.Error()
}

result := make([]resources.DeploymentExtended, 0)
for _, deployment := range allDeployments {
if deployment.Properties == nil || deployment.Properties.ProvisioningState == nil {
continue
var deployments []armresources.DeploymentExtended
pager := as.manager.azClient.deploymentClient.NewListByResourceGroupPager(as.manager.config.ResourceGroup, nil)
for pager.More() {
page, rerr := pager.NextPage(ctx)
if rerr != nil {
klog.Errorf("getAllSucceededAndFailedDeployments: failed to list deployments with error: %v", rerr.Error())
return nil, rerr
}
if *deployment.Properties.ProvisioningState == "Succeeded" || *deployment.Properties.ProvisioningState == "Failed" {
result = append(result, deployment)
for _, deployment := range page.Value {
if deployment.Properties == nil || deployment.Properties.ProvisioningState == nil {
continue
}
if *deployment.Properties.ProvisioningState == "Succeeded" || *deployment.Properties.ProvisioningState == "Failed" {
deployments = append(deployments, *deployment)
}
}
}

return result, rerr.Error()
return deployments, nil
}

// deleteOutdatedDeployments keeps the newest deployments in the resource group and delete others,
Expand All @@ -256,7 +261,7 @@ func (as *AgentPool) deleteOutdatedDeployments() (err error) {
}

sort.Slice(deployments, func(i, j int) bool {
return deployments[i].Properties.Timestamp.Time.After(deployments[j].Properties.Timestamp.Time)
return deployments[i].Properties.Timestamp.After(*deployments[j].Properties.Timestamp)
})

toBeDeleted := deployments[as.manager.config.MaxDeploymentsCount:]
Expand All @@ -266,10 +271,18 @@ func (as *AgentPool) deleteOutdatedDeployments() (err error) {

errList := make([]error, 0)
for _, deployment := range toBeDeleted {
klog.V(4).Infof("deleteOutdatedDeployments: starts deleting outdated deployment (%s)", *deployment.Name)
rerr := as.manager.azClient.deploymentClient.Delete(ctx, as.manager.config.ResourceGroup, *deployment.Name)
klog.V(4).Infof("deleteOutdatedDeployments: start deleting outdated deployment (%s)", *deployment.Name)
poller, rerr := as.manager.azClient.deploymentClient.BeginDelete(ctx, as.manager.config.ResourceGroup, *deployment.Name, nil)
if rerr != nil {
errList = append(errList, rerr.Error())
klog.Errorf("deleteOutdatedDeployments: failed to begin deleting deployment (%s) with error: %v", *deployment.Name, rerr.Error())
errList = append(errList, rerr)
continue
}
_, rerr = poller.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{Frequency: 30 * time.Second})
if rerr != nil {
klog.Errorf("deleteOutdatedDeployments: failed to delete deployment (%s) with error: %v", *deployment.Name, rerr.Error())
errList = append(errList, rerr)
continue
}
}

Expand Down Expand Up @@ -313,26 +326,39 @@ func (as *AgentPool) IncreaseSize(delta int) error {
if highestUsedIndex != 0 {
countForTemplate += highestUsedIndex + 1 - curSize
}
as.parameters[as.Name+"Count"] = map[string]int{"value": countForTemplate}
as.parameters[as.Name+"Offset"] = map[string]int{"value": highestUsedIndex + 1}
as.parameters[as.Name+"Count"] = map[string]interface{}{"value": countForTemplate}
as.parameters[as.Name+"Offset"] = map[string]interface{}{"value": highestUsedIndex + 1}

// Convert parameters to DeploymentParameter format
deploymentParams := make(map[string]*armresources.DeploymentParameter, len(as.parameters))
for key, value := range as.parameters {
deploymentParams[key] = &armresources.DeploymentParameter{
Value: value,
}
}

newDeploymentName := fmt.Sprintf("cluster-autoscaler-%d", rand.New(rand.NewSource(time.Now().UnixNano())).Int31())
newDeployment := resources.Deployment{
Properties: &resources.DeploymentProperties{
Template: &as.template,
Parameters: &as.parameters,
Mode: resources.Incremental,
newDeployment := armresources.Deployment{
Properties: &armresources.DeploymentProperties{
Template: as.template,
Parameters: deploymentParams,
Mode: ptr.To(armresources.DeploymentModeIncremental),
},
}
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(3).Infof("Waiting for deploymentClient.CreateOrUpdate(%s, %s, %v)", as.manager.config.ResourceGroup, newDeploymentName, newDeployment)
rerr := as.manager.azClient.deploymentClient.CreateOrUpdate(ctx, as.manager.config.ResourceGroup, newDeploymentName, newDeployment, "")
poller, rerr := as.manager.azClient.deploymentClient.BeginCreateOrUpdate(ctx, as.manager.config.ResourceGroup, newDeploymentName, newDeployment, nil)
if rerr != nil {
klog.Errorf("deploymentClient.BeginCreateOrUpdate for deployment %q failed: %v", newDeploymentName, rerr.Error())
return rerr
}
resp, rerr := poller.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{Frequency: 30 * time.Second})
if rerr != nil {
klog.Errorf("deploymentClient.CreateOrUpdate for deployment %q failed: %v", newDeploymentName, rerr.Error())
return rerr.Error()
return rerr
}
klog.V(3).Infof("deploymentClient.CreateOrUpdate(%s, %s, %v) success", as.manager.config.ResourceGroup, newDeploymentName, newDeployment)
klog.V(3).Infof("deploymentClient.CreateOrUpdate(%s, %s, %v) success", as.manager.config.ResourceGroup, *resp.DeploymentExtended.Name, resp.DeploymentExtended)

// Update cache after scale success.
as.curSize = int64(expectedSize)
Expand Down Expand Up @@ -515,50 +541,64 @@ func (as *AgentPool) deleteBlob(accountName, vhdContainer, vhdBlob string) error
ctx, cancel := getContextWithCancel()
defer cancel()

storageKeysResult, rerr := as.manager.azClient.storageAccountsClient.ListKeys(ctx, as.manager.config.SubscriptionID, as.manager.config.ResourceGroup, accountName)
storageKeysResult, rerr := as.manager.azClient.storageAccountsClient.ListKeys(ctx, as.manager.config.ResourceGroup, accountName, nil)
if rerr != nil {
return rerr.Error()
return rerr
}

keys := storageKeysResult.Keys
if len(keys) == 0 {
return fmt.Errorf("no storage keys found for account %s", accountName)
}

keys := *storageKeysResult.Keys
client, err := azStorage.NewBasicClientOnSovereignCloud(accountName, ptr.Deref(keys[0].Value, ""), as.manager.env)
// Construct service URL using the storage account endpoint
serviceURL := fmt.Sprintf("https://%s.blob.%s", accountName, as.manager.env.StorageEndpointSuffix)

// Create a SharedKeyCredential
credential, err := azblob.NewSharedKeyCredential(accountName, ptr.Deref(keys[0].Value, ""))
if err != nil {
return err
return fmt.Errorf("failed to create shared key credential: %w", err)
}

bs := client.GetBlobService()
containerRef := bs.GetContainerReference(vhdContainer)
blobRef := containerRef.GetBlobReference(vhdBlob)
// Create a service client
serviceClient, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, nil)
if err != nil {
return fmt.Errorf("failed to create service client: %w", err)
}

return blobRef.Delete(&azStorage.DeleteBlobOptions{})
// Delete the blob
_, err = serviceClient.DeleteBlob(ctx, vhdContainer, vhdBlob, nil)
return err
}

// deleteVirtualMachine deletes a VM and any associated OS disk
func (as *AgentPool) deleteVirtualMachine(name string) error {
ctx, cancel := getContextWithCancel()
defer cancel()

vm, rerr := as.manager.azClient.virtualMachinesClient.Get(ctx, as.manager.config.ResourceGroup, name, "")
vm, rerr := as.manager.azClient.virtualMachinesClient.Get(ctx, as.manager.config.ResourceGroup, name, nil)
if rerr != nil {
if exists, _ := checkResourceExistsFromRetryError(rerr); !exists {
// Check if it's a 404 error indicating resource doesn't exist
var respErr *azcore.ResponseError
if errors.As(rerr, &respErr) && respErr.StatusCode == http.StatusNotFound {
klog.V(2).Infof("VirtualMachine %s/%s has already been removed", as.manager.config.ResourceGroup, name)
return nil
}

klog.Errorf("failed to get VM: %s/%s: %s", as.manager.config.ResourceGroup, name, rerr.Error())
return rerr.Error()
return rerr
}

vhd := vm.VirtualMachineProperties.StorageProfile.OsDisk.Vhd
managedDisk := vm.VirtualMachineProperties.StorageProfile.OsDisk.ManagedDisk
vhd := vm.Properties.StorageProfile.OSDisk.Vhd
managedDisk := vm.Properties.StorageProfile.OSDisk.ManagedDisk
if vhd == nil && managedDisk == nil {
klog.Errorf("failed to get a valid os disk URI for VM: %s/%s", as.manager.config.ResourceGroup, name)
return fmt.Errorf("os disk does not have a VHD URI")
}

osDiskName := vm.VirtualMachineProperties.StorageProfile.OsDisk.Name
osDiskName := vm.Properties.StorageProfile.OSDisk.Name
var nicName string
nicID := (*vm.VirtualMachineProperties.NetworkProfile.NetworkInterfaces)[0].ID
nicID := (vm.Properties.NetworkProfile.NetworkInterfaces)[0].ID
if nicID == nil {
klog.Warningf("NIC ID is not set for VM (%s/%s)", as.manager.config.ResourceGroup, name)
} else {
Expand All @@ -574,22 +614,32 @@ func (as *AgentPool) deleteVirtualMachine(name string) error {
defer deleteCancel()

klog.Infof("waiting for VirtualMachine deletion: %s/%s", as.manager.config.ResourceGroup, name)
rerr = as.manager.azClient.virtualMachinesClient.Delete(deleteCtx, as.manager.config.ResourceGroup, name)
_, realErr := checkResourceExistsFromRetryError(rerr)
if realErr != nil {
return realErr
poller, rerr := as.manager.azClient.virtualMachinesClient.BeginDelete(deleteCtx, as.manager.config.ResourceGroup, name, nil)
if rerr != nil {
klog.Errorf("failed to begin deleting VM: %s/%s: %s", as.manager.config.ResourceGroup, name, rerr.Error())
return rerr
}
_, rerr = poller.PollUntilDone(deleteCtx, &runtime.PollUntilDoneOptions{Frequency: 30 * time.Second})
if rerr != nil {
klog.Errorf("failed to delete VM: %s/%s: %s", as.manager.config.ResourceGroup, name, rerr.Error())
return rerr
}
klog.V(2).Infof("VirtualMachine %s/%s removed", as.manager.config.ResourceGroup, name)

if len(nicName) > 0 {
klog.Infof("deleting nic: %s/%s", as.manager.config.ResourceGroup, nicName)
interfaceCtx, interfaceCancel := getContextWithCancel()
defer interfaceCancel()
rerr := as.manager.azClient.interfacesClient.Delete(interfaceCtx, as.manager.config.ResourceGroup, nicName)
klog.Infof("waiting for nic deletion: %s/%s", as.manager.config.ResourceGroup, nicName)
_, realErr := checkResourceExistsFromRetryError(rerr)
if realErr != nil {
return realErr

poller, rerr := as.manager.azClient.interfacesClient.BeginDelete(interfaceCtx, as.manager.config.ResourceGroup, nicName, nil)
if rerr != nil {
klog.Errorf("failed to begin deleting nic: %s/%s: %s", as.manager.config.ResourceGroup, nicName, rerr.Error())
return rerr
}
_, rerr = poller.PollUntilDone(interfaceCtx, &runtime.PollUntilDoneOptions{Frequency: 30 * time.Second})
if rerr != nil {
klog.Errorf("failed to delete nic: %s/%s: %s", as.manager.config.ResourceGroup, nicName, rerr.Error())
return rerr
}
klog.V(2).Infof("interface %s/%s removed", as.manager.config.ResourceGroup, nicName)
}
Expand Down Expand Up @@ -617,10 +667,15 @@ func (as *AgentPool) deleteVirtualMachine(name string) error {
klog.Infof("deleting managed disk: %s/%s", as.manager.config.ResourceGroup, *osDiskName)
disksCtx, disksCancel := getContextWithCancel()
defer disksCancel()
rerr := as.manager.azClient.disksClient.Delete(disksCtx, as.manager.config.SubscriptionID, as.manager.config.ResourceGroup, *osDiskName)
_, realErr := checkResourceExistsFromRetryError(rerr)
if realErr != nil {
return realErr
poller, rerr := as.manager.azClient.disksClient.BeginDelete(disksCtx, as.manager.config.ResourceGroup, *osDiskName, nil)
if rerr != nil {
klog.Errorf("failed to begin deleting managed disk: %s/%s: %s", as.manager.config.ResourceGroup, *osDiskName, rerr.Error())
return rerr
}
_, rerr = poller.PollUntilDone(disksCtx, &runtime.PollUntilDoneOptions{Frequency: 30 * time.Second})
if rerr != nil {
klog.Errorf("failed to delete managed disk: %s/%s: %s", as.manager.config.ResourceGroup, *osDiskName, rerr.Error())
return rerr
}
klog.V(2).Infof("disk %s/%s removed", as.manager.config.ResourceGroup, *osDiskName)
}
Expand Down
Loading