32 changes: 32 additions & 0 deletions azure/scope/managedcontrolplane.go
@@ -22,6 +22,7 @@ import (
"fmt"
"net"
"strings"
"time"

"github.com/Azure/go-autorest/autorest"
"github.com/go-logr/logr"
@@ -559,6 +560,37 @@ func (s *ManagedControlPlaneScope) SetAgentPoolReplicas(replicas int32) {
s.InfraMachinePool.Status.Replicas = replicas
}

// GetAgentPoolAnnotations returns the annotations of the infra machine pool.
func (s *ManagedControlPlaneScope) GetAgentPoolAnnotations() map[string]string {
return s.InfraMachinePool.Annotations
}

// SetAgentPoolAnnotations sets the given annotation key/value on the infra machine pool.
func (s *ManagedControlPlaneScope) SetAgentPoolAnnotations(k, v string) {
if s.InfraMachinePool.Annotations == nil {
s.InfraMachinePool.Annotations = make(map[string]string)
}
s.InfraMachinePool.Annotations[k] = v
}

// DeleteAgentPoolAnnotation deletes the infra machine pool annotation with the given key; it is a no-op if the key does not exist.
func (s *ManagedControlPlaneScope) DeleteAgentPoolAnnotation(k string) {
delete(s.InfraMachinePool.Annotations, k)
}

// GetNodeDrainTimeout returns the node drain timeout of the machine pool.
func (s *ManagedControlPlaneScope) GetNodeDrainTimeout() time.Duration {
var t time.Duration
if s.MachinePool != nil && s.MachinePool.Spec.Template.Spec.NodeDrainTimeout != nil {
t = s.MachinePool.Spec.Template.Spec.NodeDrainTimeout.Duration
}
return t
}

// GetInfraClient returns the controller client in the scope.
func (s *ManagedControlPlaneScope) GetInfraClient() client.Client {
return s.Client
}

// SetAgentPoolReady sets the flag that indicates if the agent pool is ready or not.
func (s *ManagedControlPlaneScope) SetAgentPoolReady(ready bool) {
s.InfraMachinePool.Status.Ready = ready
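For reference, GetNodeDrainTimeout only returns a non-zero value when the owning MachinePool sets nodeDrainTimeout in its template. A minimal sketch of that field, assuming Cluster API's v1alpha4 types; the helper name and the 30-minute value are illustrative and not part of this change:

package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
	expclusterv1 "sigs.k8s.io/cluster-api/exp/api/v1alpha4"
)

// newMachinePoolWithDrainTimeout builds a MachinePool whose template requests a
// 30-minute node drain timeout; GetNodeDrainTimeout above reads exactly this field
// (MachinePool.Spec.Template.Spec.NodeDrainTimeout).
func newMachinePoolWithDrainTimeout() expclusterv1.MachinePool {
	return expclusterv1.MachinePool{
		Spec: expclusterv1.MachinePoolSpec{
			Template: clusterv1.MachineTemplateSpec{
				Spec: clusterv1.MachineSpec{
					NodeDrainTimeout: &metav1.Duration{Duration: 30 * time.Minute},
				},
			},
		},
	}
}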
17 changes: 17 additions & 0 deletions azure/services/agentpools/agentpools.go
@@ -29,7 +29,9 @@ import (

infrav1alpha4 "sigs.k8s.io/cluster-api-provider-azure/api/v1alpha4"
"sigs.k8s.io/cluster-api-provider-azure/azure"
expv1alpha4 "sigs.k8s.io/cluster-api-provider-azure/exp/api/v1alpha4"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ManagedMachinePoolScope defines the scope interface for a managed machine pool.
@@ -42,6 +44,11 @@ type ManagedMachinePoolScope interface {
SetAgentPoolProviderIDList([]string)
SetAgentPoolReplicas(int32)
SetAgentPoolReady(bool)
GetAgentPoolAnnotations() map[string]string
SetAgentPoolAnnotations(k, v string)
DeleteAgentPoolAnnotation(k string)
GetNodeDrainTimeout() time.Duration
GetInfraClient() client.Client
}

// Service provides operations on Azure resources.
@@ -118,10 +125,20 @@ func (s *Service) Reconcile(ctx context.Context) error {

// Diff and check if we require an update
diff := cmp.Diff(existingProfile, normalizedProfile)
klog.V(2).Info("updating agentpool annotations for nodepool")
if diff != "" {
klog.V(2).Infof("Update required (+new -old):\n%s", diff)
ps := *existingPool.ManagedClusterAgentPoolProfileProperties.ProvisioningState
if ps != string(infrav1alpha4.Canceled) && ps != string(infrav1alpha4.Failed) && ps != string(infrav1alpha4.Succeeded) {
ndt := s.scope.GetNodeDrainTimeout()
if ndt.Seconds() != 0 {
annotations := s.scope.GetAgentPoolAnnotations()
_, ok := annotations[expv1alpha4.NodeDrainTimeoutAnnotation]
if !ok {
s.scope.Info("NodeDrainTimeoutAnnotation missing")
s.scope.SetAgentPoolAnnotations(expv1alpha4.NodeDrainTimeoutAnnotation, time.Now().UTC().String())
}
}
msg := fmt.Sprintf("Unable to update existing agent pool in non terminal state. Agent pool must be in one of the following provisioning states: canceled, failed, or succeeded. Actual state: %s", ps)
klog.V(2).Infof(msg)
return errors.New(msg)
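For context, the timestamp written above is parsed back later by the machine pool reconciler using the layout "2006-01-02 15:04:05 -0700 MST"; Go's time.Parse accepts the fractional seconds that time.Now().UTC().String() emits even though the layout does not declare them. A minimal sketch of that round trip, assuming a value s satisfying ManagedMachinePoolScope (the helper name is illustrative):

// drainWindowExceeded stamps the drain-start annotation if it is absent and reports
// whether the configured node drain timeout has elapsed since the recorded start.
func drainWindowExceeded(s ManagedMachinePoolScope) bool {
	const layout = "2006-01-02 15:04:05 -0700 MST" // format produced by time.Time.String()

	ts, ok := s.GetAgentPoolAnnotations()[expv1alpha4.NodeDrainTimeoutAnnotation]
	if !ok {
		s.SetAgentPoolAnnotations(expv1alpha4.NodeDrainTimeoutAnnotation, time.Now().UTC().String())
		return false
	}
	started, err := time.Parse(layout, ts)
	if err != nil {
		return false
	}
	return time.Since(started) > s.GetNodeDrainTimeout()
}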
17 changes: 15 additions & 2 deletions azure/services/scalesets/client.go
@@ -96,7 +96,7 @@ func newVirtualMachineScaleSetsClient(subscriptionID string, baseURI string, aut
}

// ListInstances retrieves information about the model views of a virtual machine scale set.
func (ac *AzureClient) ListInstances(ctx context.Context, resourceGroupName, vmssName string) ([]compute.VirtualMachineScaleSetVM, error) {
func (ac AzureClient) ListInstances(ctx context.Context, resourceGroupName, vmssName string) ([]compute.VirtualMachineScaleSetVM, error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "scalesets.AzureClient.ListInstances")
defer done()

@@ -117,7 +117,7 @@ func (ac *AzureClient) ListInstances(ctx context.Context, resourceGroupName, vms
}

// List returns all scale sets in a resource group.
func (ac *AzureClient) List(ctx context.Context, resourceGroupName string) ([]compute.VirtualMachineScaleSet, error) {
func (ac AzureClient) List(ctx context.Context, resourceGroupName string) ([]compute.VirtualMachineScaleSet, error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "scalesets.AzureClient.List")
defer done()

@@ -326,3 +326,16 @@ func (da *deleteResultAdapter) Result(client compute.VirtualMachineScaleSetsClie
func (g *genericScaleSetFutureImpl) Result(client compute.VirtualMachineScaleSetsClient) (compute.VirtualMachineScaleSet, error) {
return g.result(client)
}

// DeleteInstance deletes a single VM from a virtual machine scale set and waits for the
// operation to complete.
func (ac AzureClient) DeleteInstance(ctx context.Context, nodeResourceGroupName, scalesetName, scalesetVMName, instanceID string) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "scalesets.AzureClient.DeleteInstance")
defer done()

future, err := ac.scalesetvms.Delete(ctx, nodeResourceGroupName, scalesetName, instanceID, nil)
if err != nil {
return errors.Wrapf(err, "failed deleting vmssvm named %q", scalesetVMName)
}
// wait for the delete operation to finish before returning
err = future.WaitForCompletionRef(ctx, ac.scalesetvms.Client)
return err
}
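A hypothetical call site for the new method; the resource names below are placeholders and would normally come from the scope and the matched scale set:

// deleteOneInstance deletes one VM from the pool's scale set and blocks until Azure
// reports the operation as complete (DeleteInstance waits on the returned future).
func deleteOneInstance(ctx context.Context, ac AzureClient) error {
	return ac.DeleteInstance(ctx,
		"MC_my-rg_my-cluster_eastus",    // node resource group
		"aks-pool1-12345678-vmss",       // scale set name
		"aks-pool1-12345678-vmss000002", // VM computer name, used only in the error message
		"2",                             // instance ID within the scale set
	)
}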
3 changes: 3 additions & 0 deletions exp/api/v1alpha4/azuremanagedmachinepool_types.go
@@ -25,6 +25,9 @@ const (
// LabelAgentPoolMode represents mode of an agent pool. Possible values include: System, User.
LabelAgentPoolMode = "azuremanagedmachinepool.infrastructure.cluster.x-k8s.io/agentpoolmode"

// NodeDrainTimeoutAnnotation records the time at which node drain started for the agent pool.
NodeDrainTimeoutAnnotation = "azuremanagedmachinepool.infrastructure.cluster.x-k8s.io/nodedrainstart"

// NodePoolModeSystem represents mode system for azuremachinepool.
NodePoolModeSystem NodePoolMode = "System"

174 changes: 155 additions & 19 deletions exp/controllers/azuremanagedmachinepool_reconciler.go
@@ -25,11 +25,18 @@ import (
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2021-04-01/compute"
"github.com/pkg/errors"

apicore "k8s.io/api/core/v1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/azure/scope"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/agentpools"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/scalesets"
infraexpv1 "sigs.k8s.io/cluster-api-provider-azure/exp/api/v1alpha4"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var (
// layout matches the timestamp format produced by time.Time.String(), which is what
// the NodeDrainTimeoutAnnotation is stamped with.
layout = "2006-01-02 15:04:05 -0700 MST"
)

type (
@@ -50,6 +57,7 @@
NodeLister interface {
ListInstances(context.Context, string, string) ([]compute.VirtualMachineScaleSetVM, error)
List(context.Context, string) ([]compute.VirtualMachineScaleSet, error)
DeleteInstance(context.Context, string, string, string, string) error
}
)

@@ -89,36 +97,56 @@ func (s *azureManagedMachinePoolService) Reconcile(ctx context.Context) error {

s.scope.Info("reconciling machine pool")
agentPoolName := s.scope.AgentPoolSpec().Name
nodeResourceGroup := s.scope.NodeResourceGroup() // node resource group, of the form SPC_MC_<resource group name>_<cluster name>_<region>

// the agent pool service sets the NodeDrainTimeoutAnnotation with the current time when the
// agent pool needs an update but cannot be updated yet; that annotation is picked up below
if err := s.agentPoolsSvc.Reconcile(ctx); err != nil {
return errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName)
s.scope.Error(err, "error while reconciling agentpoool, checking if node drain timeout set and reached")
// if error check if node timeout has passed
// on timeout go through each node and if the config does not match delete that node using vmssvm
// on delete vm remove the annotation
ndt := s.scope.GetNodeDrainTimeout()
//if node drain timeout is not set then we do not have to do anything and return the err
if ndt.Seconds() == 0 {
s.scope.Info("machine pool has no node drain timeout available", "machinepool", agentPoolName)
return err
}
annotations := s.scope.GetAgentPoolAnnotations()
ts, ok := annotations[infraexpv1.NodeDrainTimeoutAnnotation]
// add the annotation when the agent pool reconciler returns an error and the annotation is missing
if !ok {
s.scope.Info("NodeDrainTimeoutAnnotation missing")
s.scope.SetAgentPoolAnnotations(infraexpv1.NodeDrainTimeoutAnnotation, time.Now().UTC().String())
// return the error since the annotation was only just added
return errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName)
}
t, terr := time.Parse(layout, ts)
if terr != nil {
s.scope.Error(terr, "unable to parse time from nodedraintimeout annotation", "timestring", ts)
return errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName)
}
now := time.Now()
diff := now.Sub(t)
// reconcile individual nodes once the timeout has been exceeded
if diff.Seconds() > ndt.Seconds() {
s.scope.Info("reconciling agentpool failed, node timeout exceeded")
return s.reconcileNodes(ctx, nodeResourceGroup, agentPoolName)
}
return azure.WithTransientError(errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName), 20*time.Second)
}

nodeResourceGroup := s.scope.NodeResourceGroup()
vmss, err := s.scaleSetsSvc.List(ctx, nodeResourceGroup)
// get the VM scale set in the node resource group that backs this agent pool
match, err := s.getVMScaleSet(ctx, nodeResourceGroup, agentPoolName)
if err != nil {
return errors.Wrapf(err, "failed to list vmss in resource group %s", nodeResourceGroup)
}

var match *compute.VirtualMachineScaleSet
for _, ss := range vmss {
ss := ss
if ss.Tags["poolName"] != nil && *ss.Tags["poolName"] == agentPoolName {
match = &ss
break
}

if ss.Tags["aks-managed-poolName"] != nil && *ss.Tags["aks-managed-poolName"] == agentPoolName {
match = &ss
break
}
return err
}

if match == nil {
return azure.WithTransientError(NewAgentPoolVMSSNotFoundError(nodeResourceGroup, agentPoolName), 20*time.Second)
}

instances, err := s.scaleSetsSvc.ListInstances(ctx, nodeResourceGroup, *match.Name)
instances, err := s.scaleSetsSvc.ListInstances(ctx, nodeResourceGroup, *match.Name) // get all the VM instances in the VMSS
if err != nil {
return errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName)
}
@@ -131,6 +159,7 @@ func (s *azureManagedMachinePoolService) Reconcile(ctx context.Context) error {
s.scope.SetAgentPoolProviderIDList(providerIDs)
s.scope.SetAgentPoolReplicas(int32(len(providerIDs)))
s.scope.SetAgentPoolReady(true)
s.scope.DeleteAgentPoolAnnotation(infraexpv1.NodeDrainTimeoutAnnotation)

s.scope.Info("reconciled machine pool successfully")
return nil
@@ -147,3 +176,110 @@ func (s *azureManagedMachinePoolService) Delete(ctx context.Context) error {

return nil
}

// getVMScaleSet lists the scale sets in the node resource group and returns the one whose
// pool-name tag matches the given agent pool name; it returns nil if there is no match.
func (s *azureManagedMachinePoolService) getVMScaleSet(ctx context.Context, nodeResourceGroup string, agentPoolName string) (*compute.VirtualMachineScaleSet, error) {
vmss, err := s.scaleSetsSvc.List(ctx, nodeResourceGroup)
if err != nil {
return nil, errors.Wrapf(err, "failed to list vmss in resource group %s", nodeResourceGroup)
}

var match *compute.VirtualMachineScaleSet
for _, ss := range vmss {
ss := ss
if ss.Tags["poolName"] != nil && *ss.Tags["poolName"] == agentPoolName {
match = &ss
break
}

if ss.Tags["aks-managed-poolName"] != nil && *ss.Tags["aks-managed-poolName"] == agentPoolName {
match = &ss
break
}
}
return match, nil
}

// reconcileNodes gets the scale set's Kubernetes version and its instances, compares that version
// with the kubelet version of each node in the agent pool, and deletes the nodes that do not match.
func (s *azureManagedMachinePoolService) reconcileNodes(ctx context.Context, nodeResourceGroup, agentPoolName string) error {
// get the VM scale set details using the agent pool name
scaleset, err := s.getVMScaleSet(ctx, nodeResourceGroup, agentPoolName)
if err != nil {
return err
}

if scaleset == nil {
return azure.WithTransientError(NewAgentPoolVMSSNotFoundError(nodeResourceGroup, agentPoolName), 20*time.Second)
}

// version holds the Kubernetes version of the scale set
var version string
versionTag := scaleset.Tags["aks-managed-orchestrator"]
if versionTag != nil {
strs := strings.Split(*versionTag, ":")
if len(strs) > 1 {
version = strs[1]
}
}

if len(version) == 0 {
return azure.WithTransientError(errors.New("version tag aks-managed-orchestrator not available on scaleset"), 20 * time.Second)
}
s.scope.Info("version tag of scaleset", "scaleset k8s version", version)

// get all the VMs in the VM scale set
vmssvms, err := s.scaleSetsSvc.ListInstances(ctx, nodeResourceGroup, *scaleset.Name)
if err != nil {
s.scope.Error(err, "unable to get instances in scaleset", "scaleset", *scaleset.Name)
return azure.WithTransientError(err, 20*time.Second)
}
if len(vmssvms) == 0 {
return azure.WithTransientError(errors.New("no instances found in scaleset"), 20*time.Second)
}

// scalesetVMMap maps each VM's computer name to its scale set VM
scalesetVMMap := make(map[string]compute.VirtualMachineScaleSetVM)
for _, vmssvm := range vmssvms {
if vmssvm.OsProfile != nil && vmssvm.OsProfile.ComputerName != nil {
scalesetVMMap[*vmssvm.OsProfile.ComputerName] = vmssvm
}
}

// get the k8s client from the managed machine pool scope
k8sClient := s.scope.GetInfraClient()
// nodelist will hold the Kubernetes nodes belonging to the given agent pool
var nodelist apicore.NodeList
listOpts := ctrlclient.MatchingLabels(map[string]string{"agentpool": agentPoolName})
// fetch all the nodes matching the agent pool label
if err = k8sClient.List(ctx, &nodelist, listOpts); err != nil {
return errors.Wrap(err, "unable to list agent pool nodes")
}

var nodesToDelete []string
for _, node := range nodelist.Items {
// kubelet versions are reported with a leading "v", e.g. "v1.21.2"
if strings.TrimPrefix(node.Status.NodeInfo.KubeletVersion, "v") != version {
nodesToDelete = append(nodesToDelete, node.Name)
}
}

if len(nodesToDelete) == 0 {
// all the nodes are already on the expected version; nothing to clean up in the pool
s.scope.Info("no nodes to delete")
s.scope.DeleteAgentPoolAnnotation(infraexpv1.NodeDrainTimeoutAnnotation)
return nil
}

s.scope.Info("deleting nodes", "nodesToDelete", nodesToDelete)

var deleteFailed bool
for _, name := range nodesToDelete {
s.scope.Info("deleting scaleset vm", "nodeName", name)
vm, ok := scalesetVMMap[name]
if !ok || vm.InstanceID == nil {
s.scope.Error(errors.New("no matching scaleset vm"), fmt.Sprintf("unable to delete node %s", name))
deleteFailed = true
continue
}
if err := s.scaleSetsSvc.DeleteInstance(ctx, nodeResourceGroup, *scaleset.Name, name, *vm.InstanceID); err != nil {
s.scope.Error(err, fmt.Sprintf("failed to delete the scaleset vm %s", name))
deleteFailed = true
}
}
// if any delete failed, return a transient error so that we reconcile again
if deleteFailed {
return azure.WithTransientError(errors.New("unable to delete all the nodes"), 20*time.Second)
}
s.scope.DeleteAgentPoolAnnotation(infraexpv1.NodeDrainTimeoutAnnotation)
return nil
}
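To exercise the new delete path without calling Azure, a hand-rolled double for the NodeLister interface could look like the sketch below; the type name is illustrative and the project's tests may prefer generated mocks instead:

// fakeNodeLister satisfies NodeLister and records which scale set VMs were asked to be deleted.
type fakeNodeLister struct {
	scaleSets []compute.VirtualMachineScaleSet
	instances []compute.VirtualMachineScaleSetVM
	deleted   []string
}

func (f *fakeNodeLister) List(_ context.Context, _ string) ([]compute.VirtualMachineScaleSet, error) {
	return f.scaleSets, nil
}

func (f *fakeNodeLister) ListInstances(_ context.Context, _, _ string) ([]compute.VirtualMachineScaleSetVM, error) {
	return f.instances, nil
}

func (f *fakeNodeLister) DeleteInstance(_ context.Context, _, _, vmName, _ string) error {
	f.deleted = append(f.deleted, vmName)
	return nil
}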