diff --git a/azure/scope/managedcontrolplane.go b/azure/scope/managedcontrolplane.go
index 564f63b5f1d..2d8d7653b0d 100644
--- a/azure/scope/managedcontrolplane.go
+++ b/azure/scope/managedcontrolplane.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"net"
 	"strings"
+	"time"
 
 	"github.com/Azure/go-autorest/autorest"
 	"github.com/go-logr/logr"
@@ -559,6 +560,37 @@ func (s *ManagedControlPlaneScope) SetAgentPoolReplicas(replicas int32) {
 	s.InfraMachinePool.Status.Replicas = replicas
 }
 
+// GetAgentPoolAnnotations returns the annotations of the infra machine pool.
+func (s *ManagedControlPlaneScope) GetAgentPoolAnnotations() map[string]string {
+	return s.InfraMachinePool.Annotations
+}
+
+// SetAgentPoolAnnotations sets the given annotation on the infra machine pool.
+func (s *ManagedControlPlaneScope) SetAgentPoolAnnotations(k, v string) {
+	if s.InfraMachinePool.Annotations == nil {
+		s.InfraMachinePool.Annotations = make(map[string]string)
+	}
+	s.InfraMachinePool.Annotations[k] = v
+}
+
+// DeleteAgentPoolAnnotation deletes the infra machine pool annotation with the given key; it is a no-op if the key doesn't exist.
+func (s *ManagedControlPlaneScope) DeleteAgentPoolAnnotation(k string) {
+	delete(s.InfraMachinePool.Annotations, k)
+}
+
+// GetNodeDrainTimeout returns the node drain timeout of the machine pool.
+func (s *ManagedControlPlaneScope) GetNodeDrainTimeout() time.Duration {
+	var t time.Duration
+	if s.MachinePool != nil && s.MachinePool.Spec.Template.Spec.NodeDrainTimeout != nil {
+		t = s.MachinePool.Spec.Template.Spec.NodeDrainTimeout.Duration
+	}
+	return t
+}
+
+// GetInfraClient returns the controller-runtime client held by the scope.
+func (s *ManagedControlPlaneScope) GetInfraClient() client.Client {
+	return s.Client
+}
 // SetAgentPoolReady sets the flag that indicates if the agent pool is ready or not.
 func (s *ManagedControlPlaneScope) SetAgentPoolReady(ready bool) {
 	s.InfraMachinePool.Status.Ready = ready
diff --git a/azure/services/agentpools/agentpools.go b/azure/services/agentpools/agentpools.go
index 626eef75f90..e32ebc08e77 100644
--- a/azure/services/agentpools/agentpools.go
+++ b/azure/services/agentpools/agentpools.go
@@ -29,7 +29,9 @@ import (
 
 	infrav1alpha4 "sigs.k8s.io/cluster-api-provider-azure/api/v1alpha4"
 	"sigs.k8s.io/cluster-api-provider-azure/azure"
+	expv1alpha4 "sigs.k8s.io/cluster-api-provider-azure/exp/api/v1alpha4"
 	"sigs.k8s.io/cluster-api-provider-azure/util/tele"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 // ManagedMachinePoolScope defines the scope interface for a managed machine pool.
@@ -42,6 +44,11 @@ type ManagedMachinePoolScope interface {
 	SetAgentPoolProviderIDList([]string)
 	SetAgentPoolReplicas(int32)
 	SetAgentPoolReady(bool)
+	GetAgentPoolAnnotations() map[string]string
+	SetAgentPoolAnnotations(k, v string)
+	DeleteAgentPoolAnnotation(k string)
+	GetNodeDrainTimeout() time.Duration
+	GetInfraClient() client.Client
 }
 
 // Service provides operations on Azure resources.
@@ -118,10 +125,20 @@ func (s *Service) Reconcile(ctx context.Context) error {
 
 	// Diff and check if we require an update
 	diff := cmp.Diff(existingProfile, normalizedProfile)
+	klog.V(2).Info("checking if agent pool needs an update")
 	if diff != "" {
 		klog.V(2).Infof("Update required (+new -old):\n%s", diff)
 		ps := *existingPool.ManagedClusterAgentPoolProfileProperties.ProvisioningState
 		if ps != string(infrav1alpha4.Canceled) && ps != string(infrav1alpha4.Failed) && ps != string(infrav1alpha4.Succeeded) {
+			// Record when the blocked update was first observed so the node drain timeout can be enforced later.
+			ndt := s.scope.GetNodeDrainTimeout()
+			if ndt.Seconds() != 0 {
+				annotations := s.scope.GetAgentPoolAnnotations()
+				_, ok := annotations[expv1alpha4.NodeDrainTimeoutAnnotation]
+				if !ok {
+					s.scope.Info("NodeDrainTimeoutAnnotation missing")
+					s.scope.SetAgentPoolAnnotations(expv1alpha4.NodeDrainTimeoutAnnotation, time.Now().UTC().String())
+				}
+			}
 			msg := fmt.Sprintf("Unable to update existing agent pool in non terminal state. Agent pool must be in one of the following provisioning states: canceled, failed, or succeeded. Actual state: %s", ps)
 			klog.V(2).Infof(msg)
 			return errors.New(msg)
diff --git a/azure/services/scalesets/client.go b/azure/services/scalesets/client.go
index f59a0cc955b..9ccf8abf9ee 100644
--- a/azure/services/scalesets/client.go
+++ b/azure/services/scalesets/client.go
@@ -96,7 +96,7 @@ func newVirtualMachineScaleSetsClient(subscriptionID string, baseURI string, aut
 }
 
 // ListInstances retrieves information about the model views of a virtual machine scale set.
-func (ac *AzureClient) ListInstances(ctx context.Context, resourceGroupName, vmssName string) ([]compute.VirtualMachineScaleSetVM, error) {
+func (ac AzureClient) ListInstances(ctx context.Context, resourceGroupName, vmssName string) ([]compute.VirtualMachineScaleSetVM, error) {
 	ctx, _, done := tele.StartSpanWithLogger(ctx, "scalesets.AzureClient.ListInstances")
 	defer done()
 
@@ -117,7 +117,7 @@ func (ac *AzureClient) ListInstances(ctx context.Context, resourceGroupName, vms
 }
 
 // List returns all scale sets in a resource group.
-func (ac *AzureClient) List(ctx context.Context, resourceGroupName string) ([]compute.VirtualMachineScaleSet, error) {
+func (ac AzureClient) List(ctx context.Context, resourceGroupName string) ([]compute.VirtualMachineScaleSet, error) {
 	ctx, _, done := tele.StartSpanWithLogger(ctx, "scalesets.AzureClient.List")
 	defer done()
 
@@ -326,3 +326,16 @@ func (da *deleteResultAdapter) Result(client compute.VirtualMachineScaleSetsClie
 func (g *genericScaleSetFutureImpl) Result(client compute.VirtualMachineScaleSetsClient) (compute.VirtualMachineScaleSet, error) {
 	return g.result(client)
 }
+
+// DeleteInstance deletes a single VM from a virtual machine scale set and blocks until the operation completes.
+func (ac AzureClient) DeleteInstance(ctx context.Context, nodeResourceGroupName, scalesetName, scalesetVMName, instanceID string) error {
+	ctx, _, done := tele.StartSpanWithLogger(ctx, "scalesets.AzureClient.DeleteInstance")
+	defer done()
+
+	future, err := ac.scalesetvms.Delete(ctx, nodeResourceGroupName, scalesetName, instanceID, nil)
+	if err != nil {
+		return errors.Wrapf(err, "failed deleting vmssvm named %q", scalesetVMName)
+	}
+	// wait for the long-running delete to finish
+	return future.WaitForCompletionRef(ctx, ac.scalesetvms.Client)
+}
diff --git a/exp/api/v1alpha4/azuremanagedmachinepool_types.go b/exp/api/v1alpha4/azuremanagedmachinepool_types.go
index 68f704307f7..0c314b6bb61 100644
--- a/exp/api/v1alpha4/azuremanagedmachinepool_types.go
+++ b/exp/api/v1alpha4/azuremanagedmachinepool_types.go
@@ -25,6 +25,9 @@ const (
 	// LabelAgentPoolMode represents mode of an agent pool. Possible values include: System, User.
 	LabelAgentPoolMode = "azuremanagedmachinepool.infrastructure.cluster.x-k8s.io/agentpoolmode"
 
+	// NodeDrainTimeoutAnnotation records the time at which the node drain timeout window started for an agent pool.
+	NodeDrainTimeoutAnnotation = "azuremanagedmachinepool.infrastructure.cluster.x-k8s.io/nodedrainstart"
+
 	// NodePoolModeSystem represents mode system for azuremachinepool.
 	NodePoolModeSystem NodePoolMode = "System"
diff --git a/exp/controllers/azuremanagedmachinepool_reconciler.go b/exp/controllers/azuremanagedmachinepool_reconciler.go
index 9c721f451e6..3720e800ade 100644
--- a/exp/controllers/azuremanagedmachinepool_reconciler.go
+++ b/exp/controllers/azuremanagedmachinepool_reconciler.go
@@ -25,11 +25,18 @@ import (
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2021-04-01/compute"
 	"github.com/pkg/errors"
+	apicore "k8s.io/api/core/v1"
 	"sigs.k8s.io/cluster-api-provider-azure/azure"
 	"sigs.k8s.io/cluster-api-provider-azure/azure/scope"
 	"sigs.k8s.io/cluster-api-provider-azure/azure/services/agentpools"
 	"sigs.k8s.io/cluster-api-provider-azure/azure/services/scalesets"
+	infraexpv1 "sigs.k8s.io/cluster-api-provider-azure/exp/api/v1alpha4"
 	"sigs.k8s.io/cluster-api-provider-azure/util/tele"
+	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var (
+	// layout matches the format produced by time.Time.String(), which is what gets stored in the node drain annotation.
+	layout = "2006-01-02 15:04:05 -0700 MST"
 )
 
 type (
@@ -50,6 +57,7 @@
 	NodeLister interface {
 		ListInstances(context.Context, string, string) ([]compute.VirtualMachineScaleSetVM, error)
 		List(context.Context, string) ([]compute.VirtualMachineScaleSet, error)
+		DeleteInstance(context.Context, string, string, string, string) error
 	}
 )
@@ -89,36 +97,56 @@ func (s *azureManagedMachinePoolService) Reconcile(ctx context.Context) error {
 	s.scope.Info("reconciling machine pool")
 	agentPoolName := s.scope.AgentPoolSpec().Name
+	nodeResourceGroup := s.scope.NodeResourceGroup() // node resource group, of the form MC_<resource-group>_<cluster>_<location>
 
+	// The agent pool service sets the node drain timeout annotation (with the current time) when an update
+	// is required but the agent pool cannot be updated; this reconciler acts on the annotation below.
 	if err := s.agentPoolsSvc.Reconcile(ctx); err != nil {
-		return errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName)
+		s.scope.Error(err, "error while reconciling agent pool, checking if the node drain timeout is set and has been reached")
+		// On error, check whether the node drain timeout has passed. Once it has, go through each node and
+		// delete the ones whose version no longer matches the scale set, using the vmssvm client.
+		// The annotation is removed after the nodes have been deleted.
+		ndt := s.scope.GetNodeDrainTimeout()
+		// If the node drain timeout is not set, there is nothing more to do; return the error.
+		if ndt.Seconds() == 0 {
+			s.scope.Info("machine pool has no node drain timeout available", "machinepool", agentPoolName)
+			return err
+		}
+		annotations := s.scope.GetAgentPoolAnnotations()
+		ts, ok := annotations[infraexpv1.NodeDrainTimeoutAnnotation]
+		// Add the annotation when the agent pool reconciler returns an error and the annotation is missing.
+		if !ok {
+			s.scope.Info("NodeDrainTimeoutAnnotation missing")
+			s.scope.SetAgentPoolAnnotations(infraexpv1.NodeDrainTimeoutAnnotation, time.Now().UTC().String())
+			// Return the error, since the annotation was only just added.
+			return errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName)
+		}
+		t, terr := time.Parse(layout, ts)
+		if terr != nil {
+			s.scope.Error(terr, "unable to parse time from node drain timeout annotation", "timestring", ts)
+			return errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName)
+		}
+		now := time.Now()
+		diff := now.Sub(t)
+		// Reconcile individual nodes once the timeout has been exceeded.
+		if diff.Seconds() > ndt.Seconds() {
+			s.scope.Info("reconciling agent pool failed, node drain timeout exceeded")
+			return s.reconcileNodes(ctx, nodeResourceGroup, agentPoolName)
+		}
+		return azure.WithTransientError(errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName), 20*time.Second)
 	}
 
-	nodeResourceGroup := s.scope.NodeResourceGroup()
-	vmss, err := s.scaleSetsSvc.List(ctx, nodeResourceGroup)
+	// get the VM scale set backing this agent pool from the node resource group; match holds the matching scale set
+	match, err := s.getVMScaleSet(ctx, nodeResourceGroup, agentPoolName)
 	if err != nil {
-		return errors.Wrapf(err, "failed to list vmss in resource group %s", nodeResourceGroup)
-	}
-
-	var match *compute.VirtualMachineScaleSet
-	for _, ss := range vmss {
-		ss := ss
-		if ss.Tags["poolName"] != nil && *ss.Tags["poolName"] == agentPoolName {
-			match = &ss
-			break
-		}
-
-		if ss.Tags["aks-managed-poolName"] != nil && *ss.Tags["aks-managed-poolName"] == agentPoolName {
-			match = &ss
-			break
-		}
+		return err
 	}
 
 	if match == nil {
 		return azure.WithTransientError(NewAgentPoolVMSSNotFoundError(nodeResourceGroup, agentPoolName), 20*time.Second)
 	}
 
-	instances, err := s.scaleSetsSvc.ListInstances(ctx, nodeResourceGroup, *match.Name)
+	instances, err := s.scaleSetsSvc.ListInstances(ctx, nodeResourceGroup, *match.Name) // get all the VM instances in the VMSS
 	if err != nil {
 		return errors.Wrapf(err, "failed to reconcile machine pool %s", agentPoolName)
 	}
@@ -131,6 +159,7 @@ func (s *azureManagedMachinePoolService) Reconcile(ctx context.Context) error {
 	s.scope.SetAgentPoolProviderIDList(providerIDs)
 	s.scope.SetAgentPoolReplicas(int32(len(providerIDs)))
 	s.scope.SetAgentPoolReady(true)
+	s.scope.DeleteAgentPoolAnnotation(infraexpv1.NodeDrainTimeoutAnnotation)
 
 	s.scope.Info("reconciled machine pool successfully")
 	return nil
@@ -147,3 +176,110 @@ func (s *azureManagedMachinePoolService) Delete(ctx context.Context) error {
 
 	return nil
 }
+
+// getVMScaleSet returns the scale set in the node resource group whose pool name tag matches the agent pool, or nil if none matches.
+func (s *azureManagedMachinePoolService) getVMScaleSet(ctx context.Context, nodeResourceGroup string, agentPoolName string) (*compute.VirtualMachineScaleSet, error) {
+	vmss, err := s.scaleSetsSvc.List(ctx, nodeResourceGroup)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to list vmss in resource group %s", nodeResourceGroup)
+	}
+
+	var match *compute.VirtualMachineScaleSet
+	for _, ss := range vmss {
+		ss := ss
+		if ss.Tags["poolName"] != nil && *ss.Tags["poolName"] == agentPoolName {
+			match = &ss
+			break
+		}
+
+		if ss.Tags["aks-managed-poolName"] != nil && *ss.Tags["aks-managed-poolName"] == agentPoolName {
+			match = &ss
+			break
+		}
+	}
+	return match, nil
+}
+
+// reconcileNodes gets the scale set's Kubernetes version and its instances, lists the Kubernetes nodes of the agent pool,
+// and deletes the nodes whose kubelet version does not match the scale set version.
+func (s *azureManagedMachinePoolService) reconcileNodes(ctx context.Context, nodeResourceGroup, agentPoolName string) error {
+	// get the VM scale set details using the agent pool name
+	scaleset, err := s.getVMScaleSet(ctx, nodeResourceGroup, agentPoolName)
+	if err != nil {
+		return err
+	}
+
+	if scaleset == nil {
+		return azure.WithTransientError(NewAgentPoolVMSSNotFoundError(nodeResourceGroup, agentPoolName), 20*time.Second)
+	}
+
+	// version stores the Kubernetes version of the scale set
+	var version string
+	versionTag := scaleset.Tags["aks-managed-orchestrator"]
+	if versionTag != nil {
+		strs := strings.Split(*versionTag, ":")
+		if len(strs) > 1 {
+			version = strs[1]
+		}
+	}
+
+	if len(version) == 0 {
+		return azure.WithTransientError(errors.New("version tag aks-managed-orchestrator not available on scaleset"), 20*time.Second)
+	}
+	s.scope.Info("version tag of scaleset", "scaleset k8s version", version)
+
+	// get all the VMs in the VM scale set
+	vmssvms, err := s.scaleSetsSvc.ListInstances(ctx, nodeResourceGroup, *scaleset.Name)
+	if err != nil || len(vmssvms) == 0 {
+		s.scope.Error(err, "unable to get instances in scaleset", "scaleset", *scaleset.Name)
+		return azure.WithTransientError(err, 20*time.Second)
+	}
+
+	// scalesetVMMap maps a VM's computer name to its scale set VM
+	scalesetVMMap := make(map[string]compute.VirtualMachineScaleSetVM)
+	for _, vmssvm := range vmssvms {
+		if vmssvm.OsProfile.ComputerName != nil {
+			scalesetVMMap[*vmssvm.OsProfile.ComputerName] = vmssvm
+		}
+	}
+
+	// get the k8s client from the managed machine pool scope
+	k8sClient := s.scope.GetInfraClient()
+	// nodelist holds the Kubernetes nodes of the given agent pool
+	var nodelist apicore.NodeList
+	listOpts := ctrlclient.MatchingLabels(map[string]string{"agentpool": agentPoolName})
+	// fetch all the nodes matching the label
+	if err = k8sClient.List(ctx, &nodelist, listOpts); err != nil {
+		return errors.Wrapf(err, "unable to get agent pool nodes")
+	}
+
+	var nodesToDelete []string
+	for _, node := range nodelist.Items {
+		if node.Status.NodeInfo.KubeletVersion[1:] != version {
+			nodesToDelete = append(nodesToDelete, node.Name)
+		}
+	}
+
+	if len(nodesToDelete) == 0 {
+		// all the nodes have been updated, so there is nothing to clean up in the pool
+		s.scope.Info("no nodes to delete")
+		s.scope.DeleteAgentPoolAnnotation(infraexpv1.NodeDrainTimeoutAnnotation)
+		return nil
+	}
+
+	s.scope.Info("deleting nodes", "nodesToDelete", nodesToDelete)
+
+	var deleteFailed bool
+	for _, name := range nodesToDelete {
+		s.scope.Info("deleting scaleset vm", "nodeName", name)
+		err = s.scaleSetsSvc.DeleteInstance(ctx, nodeResourceGroup, *scaleset.Name, name, *scalesetVMMap[name].InstanceID)
+		if err != nil {
+			deleteFailed = true
+			s.scope.Error(err, fmt.Sprintf("failed to delete the scaleset vm %s", name))
+		}
+	}
+	// if any delete failed, requeue with a transient error so the remaining nodes are retried on the next reconcile
+	if deleteFailed {
+		return azure.WithTransientError(errors.New("unable to delete all the nodes"), 20*time.Second)
+	}
+	s.scope.DeleteAgentPoolAnnotation(infraexpv1.NodeDrainTimeoutAnnotation)
+	return nil
+}
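
A note on the timestamp handling: the annotation value is written with time.Now().UTC().String() and read back with the layout constant added to azuremanagedmachinepool_reconciler.go. The standalone sketch below is illustrative only; the 5-minute drain timeout is a made-up value (in the controller it comes from the MachinePool spec via GetNodeDrainTimeout). It shows the round trip and why the layout works even though String() emits fractional seconds: Go's time.Parse accepts a fractional-seconds field after the seconds element even when the layout omits it.

package main

import (
	"fmt"
	"time"
)

// layout mirrors the constant in azuremanagedmachinepool_reconciler.go; it parses
// strings produced by time.Time.String(), such as "2021-08-05 12:34:56.789 +0000 UTC".
const layout = "2006-01-02 15:04:05 -0700 MST"

func main() {
	// What the agent pool service stores in the annotation when an update is blocked.
	stored := time.Now().UTC().String()

	// What the machine pool reconciler does when it later finds the annotation.
	started, err := time.Parse(layout, stored)
	if err != nil {
		panic(err) // surfaced as a reconcile error in the controller
	}

	// Hypothetical drain timeout; the controller reads it from the MachinePool spec.
	nodeDrainTimeout := 5 * time.Minute

	elapsed := time.Since(started)
	fmt.Printf("drain window started %s ago; timeout exceeded: %t\n", elapsed, elapsed > nodeDrainTimeout)
}

An alternative would be to store the value in a fixed layout such as time.RFC3339, which avoids depending on the output format of String(); the sketch only demonstrates the behaviour of the format used in this change.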
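Similarly, the version check in reconcileNodes strips the leading "v" from the kubelet version with KubeletVersion[1:] before comparing it against the value parsed from the aks-managed-orchestrator tag. The small sketch below restates that comparison; matchesScaleSetVersion is a hypothetical helper and the sample tag value only mirrors the "<prefix>:<version>" shape the reconciler splits on. strings.TrimPrefix is a slightly safer way to drop the prefix because it is a no-op when the prefix is absent.

package main

import (
	"fmt"
	"strings"
)

// matchesScaleSetVersion is a hypothetical helper: it reports whether a node's kubelet
// version (Node.Status.NodeInfo.KubeletVersion, e.g. "v1.21.2") matches the version
// embedded in the scale set's "aks-managed-orchestrator" tag, which the reconciler
// obtains by splitting the tag value on ":".
func matchesScaleSetVersion(kubeletVersion, orchestratorTag string) bool {
	parts := strings.Split(orchestratorTag, ":")
	if len(parts) < 2 {
		return false
	}
	// TrimPrefix is a no-op when the "v" prefix is missing, unlike kubeletVersion[1:].
	return strings.TrimPrefix(kubeletVersion, "v") == parts[1]
}

func main() {
	fmt.Println(matchesScaleSetVersion("v1.21.2", "Kubernetes:1.21.2")) // true
	fmt.Println(matchesScaleSetVersion("v1.20.9", "Kubernetes:1.21.2")) // false
}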