6 changes: 6 additions & 0 deletions cluster-autoscaler/core/autoscaler.go
@@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@@ -76,6 +77,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers
opts.DeleteOptions,
opts.DrainabilityRules,
opts.DraProvider,
opts.QuotasProvider,
), nil
}

@@ -142,6 +144,10 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
}
opts.ExpanderStrategy = expanderStrategy
}
if opts.QuotasProvider == nil {
cloudQuotasProvider := resourcequotas.NewCloudQuotasProvider(opts.CloudProvider)
opts.QuotasProvider = resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{cloudQuotasProvider})
}

return nil
}
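The new default wiring above only takes effect when opts.QuotasProvider is nil, so an embedder can inject its own quota source before NewAutoscaler runs. Below is a minimal sketch of such an override; it assumes only the two constructors that appear in this diff (NewCloudQuotasProvider, NewCombinedQuotasProvider), while wireCustomQuotas and customProvider are hypothetical names for an embedder-supplied hook and Provider implementation.

import (
	coreoptions "k8s.io/autoscaler/cluster-autoscaler/core/options"
	"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
)

// wireCustomQuotas pre-populates QuotasProvider so that initializeDefaultOptions
// leaves it untouched. Sketch only: customProvider is a hypothetical
// resourcequotas.Provider implementation supplied by the embedder.
func wireCustomQuotas(opts *coreoptions.AutoscalerOptions, customProvider resourcequotas.Provider) {
	cloudQuotas := resourcequotas.NewCloudQuotasProvider(opts.CloudProvider)
	opts.QuotasProvider = resourcequotas.NewCombinedQuotasProvider(
		[]resourcequotas.Provider{cloudQuotas, customProvider},
	)
}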
2 changes: 2 additions & 0 deletions cluster-autoscaler/core/options/autoscaler.go
@@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
@@ -57,4 +58,5 @@ type AutoscalerOptions struct {
DeleteOptions options.NodeDeleteOptions
DrainabilityRules rules.Rules
DraProvider *draprovider.Provider
QuotasProvider resourcequotas.Provider
}
90 changes: 57 additions & 33 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -27,13 +27,15 @@ import (
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/equivalence"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -47,6 +49,7 @@ type ScaleUpOrchestrator struct {
autoscalingCtx *ca_context.AutoscalingContext
processors *ca_processors.AutoscalingProcessors
resourceManager *resource.Manager
quotasTrackerFactory *resourcequotas.TrackerFactory
clusterStateRegistry *clusterstate.ClusterStateRegistry
scaleUpExecutor *scaleUpExecutor
estimatorBuilder estimator.EstimatorBuilder
@@ -68,6 +71,7 @@ func (o *ScaleUpOrchestrator) Initialize(
clusterStateRegistry *clusterstate.ClusterStateRegistry,
estimatorBuilder estimator.EstimatorBuilder,
taintConfig taints.TaintConfig,
quotasTrackerFactory *resourcequotas.TrackerFactory,
) {
o.autoscalingCtx = autoscalingCtx
o.processors = processors
@@ -76,6 +80,7 @@ func (o *ScaleUpOrchestrator) Initialize(
o.taintConfig = taintConfig
o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor)
o.scaleUpExecutor = newScaleUpExecutor(autoscalingCtx, processors.ScaleStateNotifier, o.processors.AsyncNodeGroupStateChecker)
o.quotasTrackerFactory = quotasTrackerFactory
o.initialized = true
}

@@ -121,15 +126,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(
// Initialise binpacking limiter.
o.processors.BinpackingLimiter.InitBinpacking(o.autoscalingCtx, nodeGroups)

resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
if aErr != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
tracker, err := o.newQuotasTracker()
if err != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
}

now := time.Now()

// Filter out invalid node groups
validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, len(nodes)+len(upcomingNodes), now)
validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, tracker, len(nodes)+len(upcomingNodes), now)

// Mark skipped node groups as processed.
for nodegroupID := range skippedNodeGroups {
@@ -194,7 +199,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
}

newNodes, aErr = o.applyLimits(newNodes, resourcesLeft, bestOption.NodeGroup, nodeInfos)
newNodes, aErr = o.applyLimits(newNodes, tracker, bestOption.NodeGroup, nodeInfos)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods},
@@ -283,14 +288,18 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}, nil
}

func (o *ScaleUpOrchestrator) applyLimits(newNodes int, resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
func (o *ScaleUpOrchestrator) applyLimits(newNodes int, tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
// This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
klog.Errorf("No node info for: %s", nodeGroup.Id())
return 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for best expansion option!")
}
return o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodes, resourcesLeft, nodeInfo, nodeGroup)
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), newNodes)
if err != nil {
return 0, errors.ToAutoscalerError(errors.InternalError, err)
}
return checkResult.AllowedDelta, nil
}
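applyLimits now delegates capping to the quota tracker: rather than multiplying a per-node resource delta and checking it against precomputed resource.Limits, it asks the tracker how many of the requested nodes still fit and uses the returned AllowedDelta. A toy illustration of the intended semantics with made-up numbers; only CheckDelta and AllowedDelta come from this diff, everything else is assumed context.

// Hypothetical scenario: 5 nodes of an 8-core shape are requested, but the
// remaining CPU quota only covers 24 more cores.
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), 5)
if err == nil {
	newNodes := checkResult.AllowedDelta // expected to be 3: the request is capped, not rejected outright
	_ = newNodes
}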

// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
Expand All @@ -309,9 +318,9 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
nodeGroups := o.autoscalingCtx.CloudProvider.NodeGroups()
scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)

resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
if aErr != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
tracker, err := o.newQuotasTracker()
if err != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
}

for _, ng := range nodeGroups {
Expand Down Expand Up @@ -342,17 +351,18 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
continue
}

if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, ng, nodeInfo, 1); skipReason != nil {
if skipReason := o.IsNodeGroupResourceExceeded(tracker, ng, nodeInfo, 1); skipReason != nil {
klog.Warningf("ScaleUpToNodeGroupMinSize: node group resource excceded: %v", skipReason)
continue
}

newNodeCount := ng.MinSize() - targetSize
newNodeCount, err = o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodeCount, resourcesLeft, nodeInfo, ng)
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, ng, nodeInfo.Node(), newNodeCount)
if err != nil {
klog.Warningf("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err)
continue
}
newNodeCount = checkResult.AllowedDelta

newNodeCount, err = o.GetCappedNewNodeCount(newNodeCount, targetSize)
if err != nil {
@@ -397,7 +407,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*framework.NodeInfo,
resourcesLeft resource.Limits,
tracker *resourcequotas.Tracker,
currentNodeCount int,
now time.Time,
) ([]cloudprovider.NodeGroup, map[string]status.Reasons) {
@@ -441,7 +451,7 @@ func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
continue
}
if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo, numNodes); skipReason != nil {
if skipReason := o.IsNodeGroupResourceExceeded(tracker, nodeGroup, nodeInfo, numNodes); skipReason != nil {
skippedNodeGroups[nodeGroup.Id()] = skipReason
continue
}
@@ -664,37 +674,35 @@ func (o *ScaleUpOrchestrator) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.
}

// IsNodeGroupResourceExceeded returns nil if node group resource limits are not exceeded, otherwise a reason is provided.
func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
resourcesDelta, err := o.resourceManager.DeltaForNode(o.autoscalingCtx, nodeInfo, nodeGroup)
func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), numNodes)
if err != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
klog.Errorf("Skipping node group %s; error checking resource quotas: %v", nodeGroup.Id(), err)
return NotReadyReason
}

for resource, delta := range resourcesDelta {
resourcesDelta[resource] = delta * int64(numNodes)
}

checkResult := resource.CheckDeltaWithinLimits(resourcesLeft, resourcesDelta)
if checkResult.Exceeded {
klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.ExceededResources)
for _, resource := range checkResult.ExceededResources {
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleUpCPU()
case cloudprovider.ResourceNameMemory:
metrics.RegisterSkippedScaleUpMemory()
default:
continue
if checkResult.Exceeded() {
for _, quota := range checkResult.ExceededQuotas {
klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), quota.ExceededResources)
for _, resource := range quota.ExceededResources {
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleUpCPU()
case cloudprovider.ResourceNameMemory:
metrics.RegisterSkippedScaleUpMemory()
default:
continue
}
}
}
return NewMaxResourceLimitReached(checkResult.ExceededResources)
return NewMaxResourceLimitReached(checkResult.ExceededQuotas)
}
return nil
}
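The resourcequotas.Tracker result type is not part of this diff; pieced together from the call sites above (AllowedDelta in applyLimits and ScaleUpToNodeGroupMinSize, Exceeded, ExceededQuotas and ExceededResources here), it presumably looks roughly like the sketch below. Field names beyond those call sites, and the exact types, are guesses rather than the package's actual definitions.

// Inferred shape only; the real definitions live in
// k8s.io/autoscaler/cluster-autoscaler/resourcequotas and are not shown in this diff.

// ExceededQuota describes one violated quota and the resources that pushed it
// over (e.g. cloudprovider.ResourceNameCores, cloudprovider.ResourceNameMemory).
type ExceededQuota struct {
	ExceededResources []string
	// The real type likely also identifies which quota was hit; omitted here
	// because the call sites above do not show it.
}

// CheckResult is what Tracker.CheckDelta returns for a proposed delta of N
// nodes with a given node shape.
type CheckResult struct {
	// AllowedDelta is how many of the requested nodes still fit within quota.
	AllowedDelta int
	// ExceededQuotas is non-empty when the full request does not fit.
	ExceededQuotas []ExceededQuota
}

// Exceeded reports whether any quota would be violated by the full request.
func (r CheckResult) Exceeded() bool { return len(r.ExceededQuotas) > 0 }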

// GetCappedNewNodeCount caps resize according to cluster wide node count limit.
func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) {
// TODO: port MaxNodesTotal to a resource quota
if o.autoscalingCtx.MaxNodesTotal > 0 && newNodeCount+currentNodeCount > o.autoscalingCtx.MaxNodesTotal {
klog.V(1).Infof("Capping size to max cluster total size (%d)", o.autoscalingCtx.MaxNodesTotal)
newNodeCount = o.autoscalingCtx.MaxNodesTotal - currentNodeCount
@@ -787,6 +795,22 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
return validSimilarNodeGroups
}

func (o *ScaleUpOrchestrator) newQuotasTracker() (*resourcequotas.Tracker, error) {
var nodes []*apiv1.Node
nodeInfos, err := o.autoscalingCtx.ClusterSnapshot.ListNodeInfos()
if err != nil {
return nil, err
}
for _, nodeInfo := range nodeInfos {
node := nodeInfo.Node()
if utils.IsVirtualNode(node) {
continue
}
nodes = append(nodes, nodeInfo.Node())
}
return o.quotasTrackerFactory.NewQuotasTracker(o.autoscalingCtx, nodes)
}
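newQuotasTracker rebuilds the tracker from the current cluster snapshot on each scale-up attempt, skipping virtual nodes so they do not count against quotas. How the TrackerFactory handed to Initialize is constructed is not shown in this diff; presumably the static autoscaler derives it from opts.QuotasProvider, roughly as in the sketch below, where NewTrackerFactory is an assumed constructor name.

// Assumed wiring, not part of this diff: turn the configured Provider into a
// TrackerFactory and hand it to the scale-up orchestrator. NewTrackerFactory
// is a hypothetical name; the remaining Initialize arguments are unchanged.
quotasTrackerFactory := resourcequotas.NewTrackerFactory(opts.QuotasProvider)
scaleUpOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig, quotasTrackerFactory)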

func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool {
schedulableSamplePods := make(map[*apiv1.Pod]bool)
for _, podGroup := range similarPodGroups {