From 59e55705fb0e0076026ed28c8c22bd64b5c82288 Mon Sep 17 00:00:00 2001 From: Norbert Cyran Date: Wed, 12 Nov 2025 17:43:47 +0100 Subject: [PATCH] Integrate resource quotas with scale up # Conflicts: # cluster-autoscaler/core/utils/utils.go # cluster-autoscaler/resourcequotas/tracker.go --- cluster-autoscaler/core/autoscaler.go | 6 + cluster-autoscaler/core/options/autoscaler.go | 2 + .../core/scaleup/orchestrator/orchestrator.go | 90 ++++-- .../scaleup/orchestrator/orchestrator_test.go | 281 ++++++++++++++++-- .../scaleup/orchestrator/skippedreasons.go | 18 +- .../orchestrator/skippedreasons_test.go | 24 +- cluster-autoscaler/core/scaleup/scaleup.go | 5 +- cluster-autoscaler/core/static_autoscaler.go | 10 +- .../core/static_autoscaler_test.go | 31 +- cluster-autoscaler/core/test/common.go | 2 + cluster-autoscaler/core/utils/utils.go | 6 +- .../besteffortatomic/provisioning_class.go | 4 +- .../checkcapacity/provisioningclass.go | 2 + .../orchestrator/orchestrator.go | 6 +- .../orchestrator/orchestrator_test.go | 8 +- .../orchestrator/wrapper_orchestrator.go | 13 +- .../orchestrator/wrapper_orchestrator_test.go | 9 +- cluster-autoscaler/resourcequotas/provider.go | 27 +- .../resourcequotas/provider_test.go | 57 ++++ .../resourcequotas/testutils.go | 53 +++- cluster-autoscaler/resourcequotas/tracker.go | 2 +- .../resourcequotas/tracker_test.go | 30 +- .../resourcequotas/usage_test.go | 48 +-- 23 files changed, 573 insertions(+), 161 deletions(-) diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 7c0d8f223102..f93aeeb6ef4b 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -28,6 +28,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/expander/factory" "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" @@ -76,6 +77,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers opts.DeleteOptions, opts.DrainabilityRules, opts.DraProvider, + opts.QuotasProvider, ), nil } @@ -142,6 +144,10 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto } opts.ExpanderStrategy = expanderStrategy } + if opts.QuotasProvider == nil { + cloudQuotasProvider := resourcequotas.NewCloudQuotasProvider(opts.CloudProvider) + opts.QuotasProvider = resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{cloudQuotasProvider}) + } return nil } diff --git a/cluster-autoscaler/core/options/autoscaler.go b/cluster-autoscaler/core/options/autoscaler.go index 0ee0dbb12ed8..3c76b5abd6c4 100644 --- a/cluster-autoscaler/core/options/autoscaler.go +++ b/cluster-autoscaler/core/options/autoscaler.go @@ -27,6 +27,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider" @@ -57,4 +58,5 @@ type AutoscalerOptions struct { DeleteOptions 
options.NodeDeleteOptions DrainabilityRules rules.Rules DraProvider *draprovider.Provider + QuotasProvider resourcequotas.Provider } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index 9826d704c8f9..4e104a5c2f31 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -27,6 +27,7 @@ import ( ca_context "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/equivalence" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource" + "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" @@ -34,6 +35,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -47,6 +49,7 @@ type ScaleUpOrchestrator struct { autoscalingCtx *ca_context.AutoscalingContext processors *ca_processors.AutoscalingProcessors resourceManager *resource.Manager + quotasTrackerFactory *resourcequotas.TrackerFactory clusterStateRegistry *clusterstate.ClusterStateRegistry scaleUpExecutor *scaleUpExecutor estimatorBuilder estimator.EstimatorBuilder @@ -68,6 +71,7 @@ func (o *ScaleUpOrchestrator) Initialize( clusterStateRegistry *clusterstate.ClusterStateRegistry, estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, + quotasTrackerFactory *resourcequotas.TrackerFactory, ) { o.autoscalingCtx = autoscalingCtx o.processors = processors @@ -76,6 +80,7 @@ func (o *ScaleUpOrchestrator) Initialize( o.taintConfig = taintConfig o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor) o.scaleUpExecutor = newScaleUpExecutor(autoscalingCtx, processors.ScaleStateNotifier, o.processors.AsyncNodeGroupStateChecker) + o.quotasTrackerFactory = quotasTrackerFactory o.initialized = true } @@ -121,15 +126,15 @@ func (o *ScaleUpOrchestrator) ScaleUp( // Initialise binpacking limiter. o.processors.BinpackingLimiter.InitBinpacking(o.autoscalingCtx, nodeGroups) - resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes) - if aErr != nil { - return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: ")) + tracker, err := o.newQuotasTracker() + if err != nil { + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err)) } now := time.Now() // Filter out invalid node groups - validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, len(nodes)+len(upcomingNodes), now) + validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, tracker, len(nodes)+len(upcomingNodes), now) // Mark skipped node groups as processed. 
for nodegroupID := range skippedNodeGroups { @@ -194,7 +199,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr) } - newNodes, aErr = o.applyLimits(newNodes, resourcesLeft, bestOption.NodeGroup, nodeInfos) + newNodes, aErr = o.applyLimits(newNodes, tracker, bestOption.NodeGroup, nodeInfos) if aErr != nil { return status.UpdateScaleUpError( &status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, @@ -283,14 +288,18 @@ func (o *ScaleUpOrchestrator) ScaleUp( }, nil } -func (o *ScaleUpOrchestrator) applyLimits(newNodes int, resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) { +func (o *ScaleUpOrchestrator) applyLimits(newNodes int, tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) { nodeInfo, found := nodeInfos[nodeGroup.Id()] if !found { // This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup. klog.Errorf("No node info for: %s", nodeGroup.Id()) return 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for best expansion option!") } - return o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodes, resourcesLeft, nodeInfo, nodeGroup) + checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), newNodes) + if err != nil { + return 0, errors.ToAutoscalerError(errors.InternalError, err) + } + return checkResult.AllowedDelta, nil } // ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes @@ -309,9 +318,9 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( nodeGroups := o.autoscalingCtx.CloudProvider.NodeGroups() scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0) - resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes) - if aErr != nil { - return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: ")) + tracker, err := o.newQuotasTracker() + if err != nil { + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err)) } for _, ng := range nodeGroups { @@ -342,17 +351,18 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( continue } - if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, ng, nodeInfo, 1); skipReason != nil { + if skipReason := o.IsNodeGroupResourceExceeded(tracker, ng, nodeInfo, 1); skipReason != nil { klog.Warningf("ScaleUpToNodeGroupMinSize: node group resource excceded: %v", skipReason) continue } newNodeCount := ng.MinSize() - targetSize - newNodeCount, err = o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodeCount, resourcesLeft, nodeInfo, ng) + checkResult, err := tracker.CheckDelta(o.autoscalingCtx, ng, nodeInfo.Node(), newNodeCount) if err != nil { klog.Warningf("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err) continue } + newNodeCount = checkResult.AllowedDelta newNodeCount, err = o.GetCappedNewNodeCount(newNodeCount, targetSize) if err != nil { @@ -397,7 +407,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups( nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo, - resourcesLeft resource.Limits, + tracker *resourcequotas.Tracker, currentNodeCount int, now time.Time, ) ([]cloudprovider.NodeGroup, 
map[string]status.Reasons) { @@ -441,7 +451,7 @@ func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups( skippedNodeGroups[nodeGroup.Id()] = NotReadyReason continue } - if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo, numNodes); skipReason != nil { + if skipReason := o.IsNodeGroupResourceExceeded(tracker, nodeGroup, nodeInfo, numNodes); skipReason != nil { skippedNodeGroups[nodeGroup.Id()] = skipReason continue } @@ -664,37 +674,35 @@ func (o *ScaleUpOrchestrator) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider. } // IsNodeGroupResourceExceeded returns nil if node group resource limits are not exceeded, otherwise a reason is provided. -func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons { - resourcesDelta, err := o.resourceManager.DeltaForNode(o.autoscalingCtx, nodeInfo, nodeGroup) +func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons { + checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), numNodes) if err != nil { - klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err) + klog.Errorf("Skipping node group %s; error checking resource quotas: %v", nodeGroup.Id(), err) return NotReadyReason } - for resource, delta := range resourcesDelta { - resourcesDelta[resource] = delta * int64(numNodes) - } - - checkResult := resource.CheckDeltaWithinLimits(resourcesLeft, resourcesDelta) - if checkResult.Exceeded { - klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.ExceededResources) - for _, resource := range checkResult.ExceededResources { - switch resource { - case cloudprovider.ResourceNameCores: - metrics.RegisterSkippedScaleUpCPU() - case cloudprovider.ResourceNameMemory: - metrics.RegisterSkippedScaleUpMemory() - default: - continue + if checkResult.Exceeded() { + for _, quota := range checkResult.ExceededQuotas { + klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), quota.ExceededResources) + for _, resource := range quota.ExceededResources { + switch resource { + case cloudprovider.ResourceNameCores: + metrics.RegisterSkippedScaleUpCPU() + case cloudprovider.ResourceNameMemory: + metrics.RegisterSkippedScaleUpMemory() + default: + continue + } } } - return NewMaxResourceLimitReached(checkResult.ExceededResources) + return NewMaxResourceLimitReached(checkResult.ExceededQuotas) } return nil } // GetCappedNewNodeCount caps resize according to cluster wide node count limit. 
func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) { + // TODO: port MaxNodesTotal to a resource quota if o.autoscalingCtx.MaxNodesTotal > 0 && newNodeCount+currentNodeCount > o.autoscalingCtx.MaxNodesTotal { klog.V(1).Infof("Capping size to max cluster total size (%d)", o.autoscalingCtx.MaxNodesTotal) newNodeCount = o.autoscalingCtx.MaxNodesTotal - currentNodeCount @@ -787,6 +795,22 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups( return validSimilarNodeGroups } +func (o *ScaleUpOrchestrator) newQuotasTracker() (*resourcequotas.Tracker, error) { + var nodes []*apiv1.Node + nodeInfos, err := o.autoscalingCtx.ClusterSnapshot.ListNodeInfos() + if err != nil { + return nil, err + } + for _, nodeInfo := range nodeInfos { + node := nodeInfo.Node() + if utils.IsVirtualNode(node) { + continue + } + nodes = append(nodes, nodeInfo.Node()) + } + return o.quotasTrackerFactory.NewQuotasTracker(o.autoscalingCtx, nodes) +} + func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool { schedulableSamplePods := make(map[*apiv1.Pod]bool) for _, podGroup := range similarPodGroups { diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index 82fe5b75def1..d633a624a117 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -21,6 +21,7 @@ import ( "net/http" "net/http/httptest" "regexp" + "slices" "strings" "sync/atomic" "testing" @@ -46,6 +47,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/autoscaler/cluster-autoscaler/processors/status" processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -59,6 +61,8 @@ import ( "github.com/stretchr/testify/assert" ) +const nodeGroupLabel = "ng" + var defaultOptions = config.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, MaxCoresTotal: config.DefaultMaxClusterCores, @@ -293,7 +297,7 @@ func TestZeroOrMaxNodeScaling(t *testing.T) { }, }, expectedResults: &ScaleTestResults{ - NoScaleUpReason: "max cluster cpu limit reached", + NoScaleUpReason: "exceeded quota: \"cluster-wide\", resources: cpu", ScaleUpStatus: ScaleUpStatusInfo{ PodsRemainUnschedulable: []string{"p-new-1", "p-new-2"}, }, @@ -327,7 +331,7 @@ func TestZeroOrMaxNodeScaling(t *testing.T) { }, }, expectedResults: &ScaleTestResults{ - NoScaleUpReason: "max cluster memory limit reached", + NoScaleUpReason: "exceeded quota: \"cluster-wide\", resources: memory", ScaleUpStatus: ScaleUpStatusInfo{ PodsRemainUnschedulable: []string{"p-new-1", "p-new-2"}, }, @@ -500,6 +504,163 @@ func TestScaleUpMaxMemoryLimitHitWithNotAutoscaledGroup(t *testing.T) { simpleScaleUpTest(t, config, results) } +func TestScaleUpWithMultipleQuotas(t *testing.T) { + testCases := []struct { + name string + testConfig *ScaleUpTestConfig + expectedResults *ScaleTestResults + isScaleUpOk bool + }{ + { + name: "all quotas passed", + testConfig: &ScaleUpTestConfig{ + Nodes: []NodeConfig{ + {Name: "n1", Cpu: 2000, Ready: true, Group: "ng1"}, + {Name: "n2", Cpu: 2000, Ready: true, Group: "ng2"}, + }, + 
ExtraPods: []PodConfig{ + {Name: "p1", Cpu: 1000}, + }, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 1}, + ResourceQuotas: []resourcequotas.Quota{ + &resourcequotas.FakeQuota{ + Name: "global", + AppliesToFn: resourcequotas.MatchEveryNode, + LimitsVal: map[string]int64{"cpu": 8}, + }, + &resourcequotas.FakeQuota{ + Name: "quota-ng2", + AppliesToFn: matchNodeGroups([]string{"ng2"}), + LimitsVal: map[string]int64{"cpu": 4}, + }, + }, + }, + expectedResults: &ScaleTestResults{ + ExpansionOptions: []GroupSizeChange{ + {GroupName: "ng1", SizeChange: 1}, + {GroupName: "ng2", SizeChange: 1}, + }, + FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 1}, + ScaleUpStatus: ScaleUpStatusInfo{ + Result: status.ScaleUpSuccessful, + PodsTriggeredScaleUp: []string{"p1"}, + }, + }, + isScaleUpOk: true, + }, + { + name: "ng filtered out by quota", + testConfig: &ScaleUpTestConfig{ + Nodes: []NodeConfig{ + {Name: "n1", Cpu: 2000, Ready: true, Group: "ng1"}, + {Name: "n2", Cpu: 2000, Ready: true, Group: "ng2"}, + }, + ExtraPods: []PodConfig{ + {Name: "p1", Cpu: 1000}, + }, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 1}, + ResourceQuotas: []resourcequotas.Quota{ + &resourcequotas.FakeQuota{ + Name: "global", + AppliesToFn: resourcequotas.MatchEveryNode, + LimitsVal: map[string]int64{"cpu": 8}, + }, + &resourcequotas.FakeQuota{ + Name: "quota-ng2", + AppliesToFn: matchNodeGroups([]string{"ng2"}), + LimitsVal: map[string]int64{"cpu": 2}, + }, + }, + }, + expectedResults: &ScaleTestResults{ + ExpansionOptions: []GroupSizeChange{ + {GroupName: "ng1", SizeChange: 1}, + // ng2 filtered out due to quota + }, + FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 1}, + ScaleUpStatus: ScaleUpStatusInfo{ + Result: status.ScaleUpSuccessful, + PodsTriggeredScaleUp: []string{"p1"}, + }, + }, + isScaleUpOk: true, + }, + { + name: "both node groups exceed quota", + testConfig: &ScaleUpTestConfig{ + Nodes: []NodeConfig{ + {Name: "n1", Cpu: 2000, Ready: true, Group: "ng1"}, + {Name: "n2", Cpu: 2000, Ready: true, Group: "ng2"}, + }, + ExtraPods: []PodConfig{ + {Name: "p1", Cpu: 1000}, + }, + ResourceQuotas: []resourcequotas.Quota{ + &resourcequotas.FakeQuota{ + Name: "quota-ng1", + AppliesToFn: matchNodeGroups([]string{"ng1"}), + LimitsVal: map[string]int64{"cpu": 2}, + }, + &resourcequotas.FakeQuota{ + Name: "quota-ng2", + AppliesToFn: matchNodeGroups([]string{"ng2"}), + LimitsVal: map[string]int64{"cpu": 2}, + }, + }, + }, + expectedResults: &ScaleTestResults{ + ScaleUpStatus: ScaleUpStatusInfo{ + Result: status.ScaleUpNoOptionsAvailable, + PodsRemainUnschedulable: []string{"p1"}, + }, + }, + isScaleUpOk: false, + }, + { + name: "capped by quota", + testConfig: &ScaleUpTestConfig{ + Nodes: []NodeConfig{ + {Name: "n1", Cpu: 2000, Ready: true, Group: "ng1"}, + }, + ExtraPods: []PodConfig{ + {Name: "p1", Cpu: 2000}, + {Name: "p2", Cpu: 2000}, + {Name: "p3", Cpu: 2000}, + }, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 3}, + ResourceQuotas: []resourcequotas.Quota{ + &resourcequotas.FakeQuota{ + Name: "quota-ng1", + AppliesToFn: matchNodeGroups([]string{"ng1"}), + LimitsVal: map[string]int64{"cpu": 6}, + }, + }, + }, + expectedResults: &ScaleTestResults{ + ExpansionOptions: []GroupSizeChange{ + {GroupName: "ng1", SizeChange: 3}, + }, + FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 2}, + ScaleUpStatus: ScaleUpStatusInfo{ + Result: status.ScaleUpSuccessful, + PodsTriggeredScaleUp: []string{"p1", "p2", "p3"}, + }, + }, + 
isScaleUpOk: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.isScaleUpOk { + simpleScaleUpTest(t, tc.testConfig, tc.expectedResults) + } else { + simpleNoScaleUpTest(t, tc.testConfig, tc.expectedResults) + } + }) + } +} + func TestScaleUpTwoGroups(t *testing.T) { options := defaultOptions options.BalanceSimilarNodeGroups = true @@ -813,7 +974,7 @@ func TestNoScaleUpMaxCoresLimitHit(t *testing.T) { Options: &options, } results := &ScaleTestResults{ - NoScaleUpReason: "max cluster cpu, memory limit reached", + NoScaleUpReason: "exceeded quota: \"cluster-wide\"", ScaleUpStatus: ScaleUpStatusInfo{ PodsRemainUnschedulable: []string{"p-new-1", "p-new-2"}, }, @@ -953,28 +1114,29 @@ func simpleNoScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResult "actual and expected awaiting evaluation pods should be the same") } -func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestResult { +func runSimpleScaleUpTest(t *testing.T, testConfig *ScaleUpTestConfig) *ScaleUpTestResult { now := time.Now() groupSizeChangesChannel := make(chan GroupSizeChange, 20) groupNodes := make(map[string][]*apiv1.Node) // build nodes - nodes := make([]*apiv1.Node, 0, len(config.Nodes)) - for _, n := range config.Nodes { + nodes := make([]*apiv1.Node, 0, len(testConfig.Nodes)) + for _, n := range testConfig.Nodes { node := buildTestNode(n, now) nodes = append(nodes, node) if n.Group != "" { groupNodes[n.Group] = append(groupNodes[n.Group], node) + node.Labels[nodeGroupLabel] = n.Group } } // build and setup pods - pods := make([]*apiv1.Pod, len(config.Pods)) - for i, p := range config.Pods { + pods := make([]*apiv1.Pod, len(testConfig.Pods)) + for i, p := range testConfig.Pods { pods[i] = buildTestPod(p) } - extraPods := make([]*apiv1.Pod, len(config.ExtraPods)) - for i, p := range config.ExtraPods { + extraPods := make([]*apiv1.Pod, len(testConfig.ExtraPods)) + for i, p := range testConfig.ExtraPods { extraPods[i] = buildTestPod(p) } podLister := kube_util.NewTestPodLister(pods) @@ -984,21 +1146,21 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR var provider *testprovider.TestCloudProvider onScaleUpFunc := func(nodeGroup string, increase int) error { groupSizeChangesChannel <- GroupSizeChange{GroupName: nodeGroup, SizeChange: increase} - if config.OnScaleUp != nil { - return config.OnScaleUp(nodeGroup, increase) + if testConfig.OnScaleUp != nil { + return testConfig.OnScaleUp(nodeGroup, increase) } return nil } onCreateGroupFunc := func(nodeGroup string) error { - if config.OnCreateGroup != nil { - return config.OnCreateGroup(nodeGroup) + if testConfig.OnCreateGroup != nil { + return testConfig.OnCreateGroup(nodeGroup) } return fmt.Errorf("unexpected node group create: OnCreateGroup not defined") } - if len(config.NodeTemplateConfigs) > 0 { + if len(testConfig.NodeTemplateConfigs) > 0 { machineTypes := []string{} machineTemplates := map[string]*framework.NodeInfo{} - for _, ntc := range config.NodeTemplateConfigs { + for _, ntc := range testConfig.NodeTemplateConfigs { machineTypes = append(machineTypes, ntc.MachineType) machineTemplates[ntc.NodeGroupName] = ntc.NodeInfo machineTemplates[ntc.MachineType] = ntc.NodeInfo @@ -1008,15 +1170,15 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR provider = testprovider.NewTestCloudProviderBuilder().WithOnScaleUp(onScaleUpFunc).Build() } options := defaultOptions - if config.Options != nil { - options = *config.Options + if 
testConfig.Options != nil { + options = *testConfig.Options } resourceLimiter := cloudprovider.NewResourceLimiter( map[string]int64{cloudprovider.ResourceNameCores: options.MinCoresTotal, cloudprovider.ResourceNameMemory: options.MinMemoryTotal}, map[string]int64{cloudprovider.ResourceNameCores: options.MaxCoresTotal, cloudprovider.ResourceNameMemory: options.MaxMemoryTotal}) provider.SetResourceLimiter(resourceLimiter) groupConfigs := make(map[string]*NodeGroupConfig) - for _, group := range config.Groups { + for _, group := range testConfig.Groups { groupConfigs[group.Name] = &group } for name, nodesInGroup := range groupNodes { @@ -1037,7 +1199,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR // Build node groups without any nodes for name, ng := range groupConfigs { if provider.GetNodeGroup(name) == nil { - tng := provider.BuildNodeGroup(name, ng.MinSize, ng.MaxSize, 0, true, false, config.NodeTemplateConfigs[name].MachineType, &options.NodeGroupDefaults) + tng := provider.BuildNodeGroup(name, ng.MinSize, ng.MaxSize, 0, true, false, testConfig.NodeTemplateConfigs[name].MachineType, &options.NodeGroupDefaults) provider.InsertNodeGroup(tng) } } @@ -1053,17 +1215,24 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) processors := processorstest.NewTestProcessors(&autoscalingCtx) processors.ScaleStateNotifier.Register(clusterState) - if config.EnableAutoprovisioning { + processors.NodeGroupSetProcessor = nodegroupset.NewDefaultNodeGroupSetProcessor([]string{nodeGroupLabel}, config.NodeGroupDifferenceRatios{}) + if testConfig.EnableAutoprovisioning { processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t} processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0} } + cloudQuotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + fakeQuotasProvider := resourcequotas.NewFakeProvider(testConfig.ResourceQuotas) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{cloudQuotasProvider, fakeQuotasProvider}), + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) orchestrator := New() - orchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) - expander := NewMockReportingStrategy(t, config.ExpansionOptionToChoose, nil) + orchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) + expander := NewMockReportingStrategy(t, testConfig.ExpansionOptionToChoose, nil) autoscalingCtx.ExpanderStrategy = expander // scale up - scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, config.AllOrNothing) + scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, testConfig.AllOrNothing) processors.ScaleUpStatusProcessor.Process(&autoscalingCtx, scaleUpStatus) // aggregate group size changes @@ -1163,8 +1332,13 @@ func TestScaleUpUnhealthy(t *testing.T) { p3 := BuildTestPod("p-new", 550, 0) processors := processorstest.NewTestProcessors(&autoscalingCtx) + quotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: quotasProvider, + CustomResourcesProcessor: 
processors.CustomResourcesProcessor, + }) suOrchestrator := New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, err) @@ -1214,8 +1388,13 @@ func TestBinpackingLimiter(t *testing.T) { // We should stop binpacking after finding expansion option from first node group. processors.BinpackingLimiter = &MockBinpackingLimiter{} + quotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: quotasProvider, + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) suOrchestrator := New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) expander := NewMockReportingStrategy(t, nil, nil) autoscalingCtx.ExpanderStrategy = expander @@ -1267,8 +1446,13 @@ func TestScaleUpNoHelp(t *testing.T) { p3 := BuildTestPod("p-new", 500, 0) processors := processorstest.NewTestProcessors(&autoscalingCtx) + quotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: quotasProvider, + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) suOrchestrator := New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) processors.ScaleUpStatusProcessor.Process(&autoscalingCtx, scaleUpStatus) @@ -1421,7 +1605,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) { assert.NoError(t, clusterState.UpdateNodes(nodes, nodeInfos, time.Now())) suOrchestrator := &ScaleUpOrchestrator{} - suOrchestrator.Initialize(&autoscalingCtx, &processors.AutoscalingProcessors{NodeGroupSetProcessor: nodeGroupSetProcessor}, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, &processors.AutoscalingProcessors{NodeGroupSetProcessor: nodeGroupSetProcessor}, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, nil) similarNodeGroups := suOrchestrator.ComputeSimilarNodeGroups(provider.GetNodeGroup(tc.nodeGroup), nodeInfos, tc.schedulablePodGroups, now) var gotSimilarNodeGroups []string @@ -1514,8 +1698,13 @@ func TestScaleUpBalanceGroups(t *testing.T) { autoscalingCtx.ExpanderStrategy = tc.expanderStrategy } processors := processorstest.NewTestProcessors(&autoscalingCtx) + quotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: quotasProvider, + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) suOrchestrator := New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, 
newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, typedErr) @@ -1575,8 +1764,13 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { nodes := []*apiv1.Node{} nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now()) + quotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: quotasProvider, + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) suOrchestrator := New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) @@ -1627,8 +1821,13 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { nodes := []*apiv1.Node{} nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now()) + quotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: quotasProvider, + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) suOrchestrator := New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) @@ -1685,8 +1884,13 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) + quotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: quotasProvider, + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) suOrchestrator := New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) scaleUpStatus, err := suOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) @@ -1778,8 +1982,13 @@ func TestScaleupAsyncNodeGroupsEnabled(t *testing.T) { nodes := []*apiv1.Node{} nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, 
taints.TaintConfig{}, time.Now()) + quotasProvider := resourcequotas.NewCloudQuotasProvider(provider) + trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: quotasProvider, + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) suOrchestrator := New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory) scaleUpStatus, err := suOrchestrator.ScaleUp(tc.podsToAdd, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) @@ -1920,3 +2129,9 @@ func newEstimatorBuilder() estimator.EstimatorBuilder { return estimatorBuilder } + +func matchNodeGroups(nodeGroups []string) func(*apiv1.Node) bool { + return func(n *apiv1.Node) bool { + return slices.Contains(nodeGroups, n.Labels[nodeGroupLabel]) + } +} diff --git a/cluster-autoscaler/core/scaleup/orchestrator/skippedreasons.go b/cluster-autoscaler/core/scaleup/orchestrator/skippedreasons.go index 85537cedc68e..0af50320997d 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/skippedreasons.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/skippedreasons.go @@ -19,6 +19,9 @@ package orchestrator import ( "fmt" "strings" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" ) // SkippedReasons contains information why given node group was skipped. @@ -61,10 +64,17 @@ func (sr *MaxResourceLimitReached) Resources() []string { return sr.resources } -// NewMaxResourceLimitReached returns a reason describing which cluster wide resource limits were reached. -func NewMaxResourceLimitReached(resources []string) *MaxResourceLimitReached { +// NewMaxResourceLimitReached returns a reason describing which resource limits were reached. +func NewMaxResourceLimitReached(exceededQuotas []resourcequotas.ExceededQuota) *MaxResourceLimitReached { + var messages []string + resources := make(sets.Set[string]) + for _, quota := range exceededQuotas { + msg := fmt.Sprintf("exceeded quota: %q, resources: %s", quota.ID, strings.Join(quota.ExceededResources, ", ")) + messages = append(messages, msg) + resources.Insert(quota.ExceededResources...) 
+ } return &MaxResourceLimitReached{ - messages: []string{fmt.Sprintf("max cluster %s limit reached", strings.Join(resources, ", "))}, - resources: resources, + messages: messages, + resources: resources.UnsortedList(), } } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/skippedreasons_test.go b/cluster-autoscaler/core/scaleup/orchestrator/skippedreasons_test.go index 0c768e523b3c..bf453b9d8b92 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/skippedreasons_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/skippedreasons_test.go @@ -19,33 +19,47 @@ package orchestrator import ( "reflect" "testing" + + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" ) func TestMaxResourceLimitReached(t *testing.T) { tests := []struct { name string + quotaID string resources []string wantReasons []string }{ { name: "simple test", + quotaID: "test", resources: []string{"gpu"}, - wantReasons: []string{"max cluster gpu limit reached"}, + wantReasons: []string{`exceeded quota: "test", resources: gpu`}, }, { name: "multiple resources", + quotaID: "test", resources: []string{"gpu1", "gpu3", "tpu", "ram"}, - wantReasons: []string{"max cluster gpu1, gpu3, tpu, ram limit reached"}, + wantReasons: []string{`exceeded quota: "test", resources: gpu1, gpu3, tpu, ram`}, }, { name: "no resources", - wantReasons: []string{"max cluster limit reached"}, + quotaID: "test", + resources: []string{}, + wantReasons: []string{`exceeded quota: "test", resources: `}, + }, + { + name: "different quota ID", + quotaID: "project-quota", + resources: []string{"cpu"}, + wantReasons: []string{`exceeded quota: "project-quota", resources: cpu`}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewMaxResourceLimitReached(tt.resources); !reflect.DeepEqual(got.Reasons(), tt.wantReasons) { - t.Errorf("MaxResourceLimitReached(%v) = %v, want %v", tt.resources, got.Reasons(), tt.wantReasons) + exceededQuotas := []resourcequotas.ExceededQuota{{ID: tt.quotaID, ExceededResources: tt.resources}} + if got := NewMaxResourceLimitReached(exceededQuotas); !reflect.DeepEqual(got.Reasons(), tt.wantReasons) { + t.Errorf("MaxResourceLimitReached(quotaID=%v, resources=%v) = %v, want %v", tt.quotaID, tt.resources, got.Reasons(), tt.wantReasons) } }) } diff --git a/cluster-autoscaler/core/scaleup/scaleup.go b/cluster-autoscaler/core/scaleup/scaleup.go index 4edc1cdd9135..1caa8bef0756 100644 --- a/cluster-autoscaler/core/scaleup/scaleup.go +++ b/cluster-autoscaler/core/scaleup/scaleup.go @@ -24,6 +24,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -35,10 +36,10 @@ type Orchestrator interface { // Initialize initializes the orchestrator object with required fields. Initialize( autoscalingCtx *ca_context.AutoscalingContext, - processors *ca_processors.AutoscalingProcessors, - clusterStateRegistry *clusterstate.ClusterStateRegistry, + processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, + quotasTrackerFactory *resourcequotas.TrackerFactory, ) // ScaleUp tries to scale the cluster up. 
Returns appropriate status or error if // an unexpected error occurred. Assumes that all nodes in the cluster are ready diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 4512aa5beedf..5c3efaf53291 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -43,6 +43,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" @@ -141,7 +142,8 @@ func NewStaticAutoscaler( scaleUpOrchestrator scaleup.Orchestrator, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, - draProvider *draprovider.Provider) *StaticAutoscaler { + draProvider *draprovider.Provider, + quotasProvider resourcequotas.Provider) *StaticAutoscaler { klog.V(4).Infof("Creating new static autoscaler with opts: %v", opts) @@ -177,10 +179,14 @@ func NewStaticAutoscaler( scaleDownActuator := actuation.NewActuator(autoscalingCtx, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor) autoscalingCtx.ScaleDownActuator = scaleDownActuator + quotasTrackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + CustomResourcesProcessor: processors.CustomResourcesProcessor, + QuotaProvider: quotasProvider, + }) if scaleUpOrchestrator == nil { scaleUpOrchestrator = orchestrator.New() } - scaleUpOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig) + scaleUpOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig, quotasTrackerFactory) // Set the initial scale times to be less than the start time so as to // not start in cooldown mode. 
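For integrators who want quota sources beyond the cloud provider's ResourceLimiter, the sketch below shows how the new AutoscalerOptions.QuotasProvider field could be populated before the autoscaler is built; when the field is left nil, initializeDefaultOptions above falls back to the cloud provider's limits via NewCloudQuotasProvider. This is an illustrative sketch, not part of the patch: myQuotaProvider is hypothetical and is only assumed to satisfy resourcequotas.Provider, i.e. Quotas() ([]Quota, error).

// Illustrative sketch, not part of this patch: combine the legacy cloud-provider
// limits with a hypothetical custom quota source, then hand the result to the
// scale-up orchestrator the same way NewStaticAutoscaler does above.
cloudQuotas := resourcequotas.NewCloudQuotasProvider(opts.CloudProvider)
opts.QuotasProvider = resourcequotas.NewCombinedQuotasProvider(
	[]resourcequotas.Provider{cloudQuotas, myQuotaProvider}, // myQuotaProvider is assumed, not defined in this patch
)
quotasTrackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{
	QuotaProvider:            opts.QuotasProvider,
	CustomResourcesProcessor: processors.CustomResourcesProcessor,
})
scaleUpOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig, quotasTrackerFactory)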
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 5425875cbe6a..1148d5d10318 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -31,6 +31,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" @@ -310,9 +311,10 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) { clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker) clusterState.UpdateNodes(allNodes, nil, config.nodeStateUpdateTime) processors.ScaleStateNotifier.Register(clusterState) + quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors) suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, quotasTrackerFactory) deleteOptions := options.NewNodeDeleteOptions(autoscalingCtx.AutoscalingOptions) drainabilityRules := rules.Default(deleteOptions) @@ -412,8 +414,9 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { processors := processorstest.NewTestProcessors(&autoscalingCtx) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil) + quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors) suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, quotasTrackerFactory) autoscaler := &StaticAutoscaler{ AutoscalingContext: &autoscalingCtx, @@ -681,8 +684,9 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { processors.ScaleStateNotifier.Register(clusterState) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil) + quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors) suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, quotasTrackerFactory) autoscaler := &StaticAutoscaler{ AutoscalingContext: &autoscalingCtx, @@ -826,8 +830,9 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil) + quotasTrackerFactory := 
newQuotasTrackerFactory(&autoscalingCtx, processors) suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, quotasTrackerFactory) autoscaler := &StaticAutoscaler{ AutoscalingContext: &autoscalingCtx, @@ -980,8 +985,9 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { clusterState.UpdateNodes(nodes, nil, later) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil) + quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors) suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, quotasTrackerFactory) autoscaler := &StaticAutoscaler{ AutoscalingContext: &autoscalingCtx, @@ -1136,8 +1142,9 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { processors := processorstest.NewTestProcessors(&autoscalingCtx) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil) + quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors) suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, quotasTrackerFactory) autoscaler := &StaticAutoscaler{ AutoscalingContext: &autoscalingCtx, @@ -1717,8 +1724,9 @@ func TestStaticAutoscalerRunOnceWithExistingDeletionCandidateNodes(t *testing.T) processors := processorstest.NewTestProcessors(&autoscalingCtx) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil) + quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors) suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, quotasTrackerFactory) autoscaler := &StaticAutoscaler{ AutoscalingContext: &autoscalingCtx, @@ -3169,6 +3177,15 @@ func newEstimatorBuilder() estimator.EstimatorBuilder { return estimatorBuilder } +func newQuotasTrackerFactory(autoscalingCtx *ca_context.AutoscalingContext, p *ca_processors.AutoscalingProcessors) *resourcequotas.TrackerFactory { + cloudQuotasProvider := resourcequotas.NewCloudQuotasProvider(autoscalingCtx.CloudProvider) + quotasProvider := resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{cloudQuotasProvider}) + return resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + CustomResourcesProcessor: 
p.CustomResourcesProcessor, + QuotaProvider: quotasProvider, + }) +} + func TestCleaningSoftTaintsInScaleDown(t *testing.T) { provider := testprovider.NewTestCloudProviderBuilder().Build() diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index 9e705419faaf..210ad6b86538 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -37,6 +37,7 @@ import ( processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" @@ -118,6 +119,7 @@ type ScaleUpTestConfig struct { NodeTemplateConfigs map[string]*NodeTemplateConfig EnableAutoprovisioning bool AllOrNothing bool + ResourceQuotas []resourcequotas.Quota } // ScaleUpTestResult represents a node groups scale up result diff --git a/cluster-autoscaler/core/utils/utils.go b/cluster-autoscaler/core/utils/utils.go index 2b0ac3254979..5200cf5a373e 100644 --- a/cluster-autoscaler/core/utils/utils.go +++ b/cluster-autoscaler/core/utils/utils.go @@ -33,8 +33,8 @@ const ( VirtualKubeletNodeLabelValue = "virtual-kubelet" ) -// isVirtualKubeletNode determines if the node is created by virtual kubelet -func isVirtualKubeletNode(node *apiv1.Node) bool { +// IsVirtualKubeletNode determines if the node is created by virtual kubelet +func IsVirtualKubeletNode(node *apiv1.Node) bool { if node == nil { return false } @@ -48,7 +48,7 @@ func FilterOutNodesFromNotAutoscaledGroups(nodes []*apiv1.Node, cloudProvider cl for _, node := range nodes { // Exclude the virtual node here since it may have lots of resource and exceed the total resource limit - if isVirtualKubeletNode(node) { + if IsVirtualKubeletNode(node) { continue } nodeGroup, err := cloudProvider.NodeGroupForNode(node) diff --git a/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go b/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go index 13d08ae19a43..3261cf0452dd 100644 --- a/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go +++ b/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ca_context "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/klog/v2" @@ -65,10 +66,11 @@ func (o *bestEffortAtomicProvClass) Initialize( estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, injector *scheduling.HintingSimulator, + quotasTrackerFactory *resourcequotas.TrackerFactory, ) { o.autoscalingCtx = autoscalingCtx o.injector = injector - o.scaleUpOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig) + o.scaleUpOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig, quotasTrackerFactory) } // Provision returns success if there is, or has just been requested, sufficient capacity in the cluster for pods from ProvisioningRequest. 
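The unit tests in this patch all build the factory the same way; a minimal sketch of that wiring follows, with the quota name and the 8-CPU limit purely illustrative and the surrounding test fixtures (processors, clusterState, autoscalingCtx, newEstimatorBuilder) assumed from the test setup shown earlier.

// Test-only sketch: one fake quota that applies to every node and caps total CPU,
// served through NewFakeProvider and turned into a TrackerFactory for Initialize.
quota := &resourcequotas.FakeQuota{
	Name:        "global",
	AppliesToFn: resourcequotas.MatchEveryNode,
	LimitsVal:   map[string]int64{"cpu": 8},
}
trackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{
	QuotaProvider:            resourcequotas.NewFakeProvider([]resourcequotas.Quota{quota}),
	CustomResourcesProcessor: processors.CustomResourcesProcessor,
})
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&autoscalingCtx, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}, trackerFactory)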
diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go index b6cd4f42fee2..3243d9e4111c 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go @@ -35,6 +35,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -76,6 +77,7 @@ func (o *checkCapacityProvClass) Initialize( estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, schedulingSimulator *scheduling.HintingSimulator, + quotasTrackerFactory *resourcequotas.TrackerFactory, ) { o.autoscalingCtx = autoscalingCtx o.schedulingSimulator = schedulingSimulator diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go index a8907b8b45f5..f43d5f32eeb4 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go @@ -26,6 +26,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -39,7 +40,7 @@ type ProvisioningClass interface { Provision([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet, map[string]*framework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError) Initialize(*ca_context.AutoscalingContext, *ca_processors.AutoscalingProcessors, *clusterstate.ClusterStateRegistry, - estimator.EstimatorBuilder, taints.TaintConfig, *scheduling.HintingSimulator) + estimator.EstimatorBuilder, taints.TaintConfig, *scheduling.HintingSimulator, *resourcequotas.TrackerFactory) } // provReqOrchestrator is an orchestrator that contains orchestrators for all supported Provisioning Classes. 
@@ -66,12 +67,13 @@ func (o *provReqOrchestrator) Initialize( clusterStateRegistry *clusterstate.ClusterStateRegistry, estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, + quotasTrackerFactory *resourcequotas.TrackerFactory, ) { o.initialized = true o.autoscalingCtx = autoscalingCtx o.injector = scheduling.NewHintingSimulator() for _, mode := range o.provisioningClasses { - mode.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig, o.injector) + mode.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig, o.injector, quotasTrackerFactory) } } diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go index 730cdcab5e6a..b0af8961c387 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go @@ -42,6 +42,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -518,12 +519,15 @@ func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, no injector = provreq.NewFakePodsInjector(client, clocktesting.NewFakePassiveClock(now)) } + quotasTrackerFactory := resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{ + QuotaProvider: resourcequotas.NewFakeProvider(nil), + CustomResourcesProcessor: processors.CustomResourcesProcessor, + }) orchestrator := &provReqOrchestrator{ client: client, provisioningClasses: []ProvisioningClass{checkcapacity.New(client, injector), besteffortatomic.New(client)}, } - - orchestrator.Initialize(&autoscalingCtx, processors, clusterState, estimatorBuilder, taints.TaintConfig{}) + orchestrator.Initialize(&autoscalingCtx, processors, clusterState, estimatorBuilder, taints.TaintConfig{}, quotasTrackerFactory) return orchestrator, nodeInfos } diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go index 4d375c136963..b6bf5dcf2fd8 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go @@ -27,6 +27,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -50,16 +51,10 @@ func NewWrapperOrchestrator(provReqOrchestrator scaleup.Orchestrator) *WrapperOr } // Initialize initializes the orchestrator object with required fields. 
-func (o *WrapperOrchestrator) Initialize( - autoscalingCtx *ca_context.AutoscalingContext, - processors *ca_processors.AutoscalingProcessors, - clusterStateRegistry *clusterstate.ClusterStateRegistry, - estimatorBuilder estimator.EstimatorBuilder, - taintConfig taints.TaintConfig, -) { +func (o *WrapperOrchestrator) Initialize(autoscalingCtx *ca_context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, quotasTrackerFactory *resourcequotas.TrackerFactory) { o.autoscalingCtx = autoscalingCtx - o.podsOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig) - o.provReqOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig) + o.podsOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig, quotasTrackerFactory) + o.provReqOrchestrator.Initialize(autoscalingCtx, processors, clusterStateRegistry, estimatorBuilder, taintConfig, quotasTrackerFactory) } // ScaleUp run scaleUp function for regular pods of pods from ProvisioningRequest. diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go index ca102ce3ef81..81efc5d16bee 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/resourcequotas" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -77,13 +78,7 @@ func (f *fakeScaleUp) ScaleUp( return nil, errors.NewAutoscalerError(errors.InternalError, f.errorMsg) } -func (f *fakeScaleUp) Initialize( - autoscalingCtx *ca_context.AutoscalingContext, - processors *ca_processors.AutoscalingProcessors, - clusterStateRegistry *clusterstate.ClusterStateRegistry, - estimatorBuilder estimator.EstimatorBuilder, - taintConfig taints.TaintConfig, -) { +func (f *fakeScaleUp) Initialize(autoscalingCtx *ca_context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, quotasTrackerFactory *resourcequotas.TrackerFactory) { } func (f *fakeScaleUp) ScaleUpToNodeGroupMinSize( diff --git a/cluster-autoscaler/resourcequotas/provider.go b/cluster-autoscaler/resourcequotas/provider.go index 848c5734e5cd..67a1d046f11a 100644 --- a/cluster-autoscaler/resourcequotas/provider.go +++ b/cluster-autoscaler/resourcequotas/provider.go @@ -33,7 +33,7 @@ type CloudQuotasProvider struct { // Quotas returns the cloud provider's ResourceLimiter, which implements Quota interface. // // This acts as a compatibility layer with the legacy resource limits system.
func (p *CloudQuotasProvider) Quotas() ([]Quota, error) { rl, err := p.cloudProvider.GetResourceLimiter() if err != nil { @@ -48,3 +48,28 @@ func NewCloudQuotasProvider(cloudProvider cloudprovider.CloudProvider) *CloudQuo cloudProvider: cloudProvider, } } + +// CombinedQuotasProvider wraps other Providers and combines their quotas. +type CombinedQuotasProvider struct { + providers []Provider +} + +// NewCombinedQuotasProvider returns a new CombinedQuotasProvider. +func NewCombinedQuotasProvider(providers []Provider) *CombinedQuotasProvider { + return &CombinedQuotasProvider{ + providers: providers, + } +} + +// Quotas returns a union of quotas from all wrapped providers. +func (p *CombinedQuotasProvider) Quotas() ([]Quota, error) { + var allQuotas []Quota + for _, provider := range p.providers { + quotas, err := provider.Quotas() + if err != nil { + return nil, err + } + allQuotas = append(allQuotas, quotas...) + } + return allQuotas, nil +} diff --git a/cluster-autoscaler/resourcequotas/provider_test.go b/cluster-autoscaler/resourcequotas/provider_test.go index bc4003fd5942..4fd5e925fff8 100644 --- a/cluster-autoscaler/resourcequotas/provider_test.go +++ b/cluster-autoscaler/resourcequotas/provider_test.go @@ -17,6 +17,7 @@ limitations under the License. package resourcequotas import ( + "errors" "testing" "github.com/google/go-cmp/cmp" @@ -44,3 +45,59 @@ func TestCloudLimitersProvider(t *testing.T) { t.Errorf("Limits() mismatch (-want +got):\n%s", diff) } } + +func TestCombinedQuotasProvider(t *testing.T) { + q1 := &FakeQuota{Name: "quota1"} + q2 := &FakeQuota{Name: "quota2"} + q3 := &FakeQuota{Name: "quota3"} + providerErr := errors.New("test error") + + p1 := NewFakeProvider([]Quota{q1}) + p2 := NewFakeProvider([]Quota{q2, q3}) + pErr := NewFailingProvider(providerErr) + + testCases := []struct { + name string + providers []Provider + wantQuotas []Quota + wantErr error + }{ + { + name: "no providers", + providers: []Provider{}, + wantQuotas: nil, + }, + { + name: "one provider", + providers: []Provider{p1}, + wantQuotas: []Quota{q1}, + }, + { + name: "multiple providers", + providers: []Provider{p1, p2}, + wantQuotas: []Quota{q1, q2, q3}, + }, + { + name: "provider with error", + providers: []Provider{p1, pErr}, + wantQuotas: nil, + wantErr: providerErr, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + provider := NewCombinedQuotasProvider(tc.providers) + quotas, err := provider.Quotas() + + if !errors.Is(err, tc.wantErr) { + t.Errorf("Quotas() err mismatch: got %v, want %v", err, tc.wantErr) + } + if diff := cmp.Diff(tc.wantQuotas, quotas, cmp.Comparer(func(q1, q2 Quota) bool { + return q1.ID() == q2.ID() + })); diff != "" { + t.Errorf("Quotas() mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/cluster-autoscaler/resourcequotas/testutils.go b/cluster-autoscaler/resourcequotas/testutils.go index 8acdc43674a4..d1ed54b04377 100644 --- a/cluster-autoscaler/resourcequotas/testutils.go +++ b/cluster-autoscaler/resourcequotas/testutils.go @@ -54,20 +54,53 @@ func (f *fakeCustomResourcesProcessor) GetNodeResourceTargets(context *context.A func (f *fakeCustomResourcesProcessor) CleanUp() { } -type fakeQuota struct { - id string - appliesToFn func(*apiv1.Node) bool - limits resourceList +// FakeQuota is a simple implementation of Quota for testing. +type FakeQuota struct { + Name string + AppliesToFn func(*apiv1.Node) bool + LimitsVal map[string]int64 } -func (f *fakeQuota) ID() string { - return f.id +// ID returns the name of the quota. 
+func (f *FakeQuota) ID() string { + return f.Name } -func (f *fakeQuota) AppliesTo(node *apiv1.Node) bool { - return f.appliesToFn(node) +// AppliesTo reports whether the quota applies to the given node, as determined by AppliesToFn. +func (f *FakeQuota) AppliesTo(node *apiv1.Node) bool { + return f.AppliesToFn(node) } -func (f *fakeQuota) Limits() map[string]int64 { - return f.limits +// Limits returns the limits defined by the quota. +func (f *FakeQuota) Limits() map[string]int64 { + return f.LimitsVal +} + +// MatchEveryNode returns true for every passed node. +func MatchEveryNode(_ *apiv1.Node) bool { + return true +} + +// FakeProvider is a fake implementation of Provider for testing. +type FakeProvider struct { + quotas []Quota + err error +} + +// Quotas returns the quotas or error explicitly passed to the fake provider. +func (f *FakeProvider) Quotas() ([]Quota, error) { + if f.err != nil { + return nil, f.err + } + return f.quotas, nil +} + +// NewFakeProvider returns a new FakeProvider with hard-coded quotas. +func NewFakeProvider(quotas []Quota) *FakeProvider { + return &FakeProvider{quotas: quotas} +} + +// NewFailingProvider returns a new FakeProvider with an error. +func NewFailingProvider(err error) *FakeProvider { + return &FakeProvider{err: err} } diff --git a/cluster-autoscaler/resourcequotas/tracker.go b/cluster-autoscaler/resourcequotas/tracker.go index dd5fdf57acdc..7ae005ed1fba 100644 --- a/cluster-autoscaler/resourcequotas/tracker.go +++ b/cluster-autoscaler/resourcequotas/tracker.go @@ -36,7 +36,7 @@ type Quota interface { ID() string // AppliesTo returns true if the quota applies to the given node. AppliesTo(node *corev1.Node) bool // Limits returns the resource limits defined by the quota.
Limits() map[string]int64 } diff --git a/cluster-autoscaler/resourcequotas/tracker_test.go b/cluster-autoscaler/resourcequotas/tracker_test.go index 868308b7b810..2eaa91740ff8 100644 --- a/cluster-autoscaler/resourcequotas/tracker_test.go +++ b/cluster-autoscaler/resourcequotas/tracker_test.go @@ -39,10 +39,10 @@ func TestCheckDelta(t *testing.T) { wantExceeded bool }{ { name: "delta fits within limits", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 10, "memory": 1000, "nodes": 5}, }, }), @@ -56,7 +56,7 @@ name: "delta exceeds one resource limit", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 1, "memory": 1000, "nodes": 5}, }, }), @@ -71,10 +71,10 @@ wantExceeded: true, }, { name: "delta exceeds multiple resource limits", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 1, "memory": 300, "nodes": 5}, }, }), @@ -137,7 +137,7 @@ name: "no matching quotas", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return false }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return false }}, limitsLeft: resourceList{"cpu": 1, "memory": 100, "nodes": 1}, }, }), @@ -148,10 +148,10 @@ }, }, { name: "resource in limits but not in the node", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 4, "memory": 32 * units.GiB, "gpu": 2}, }, }), @@ -162,7 +162,7 @@ }, }, { name: "resource in the node but not in the limits", tracker: newTracker(&fakeCustomResourcesProcessor{NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget { return []customresources.CustomResourceTarget{ { @@ -172,7 +172,7 @@ } }}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 4, "memory": 32 * units.GiB}, }, }), @@ -216,7 +216,7 @@ name: "delta applied successfully", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }},
+ quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 10, "memory": 1000, "nodes": 5}, }, }), @@ -233,7 +233,7 @@ name: "partial delta calculated, nothing applied", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 3, "memory": 1000, "nodes": 5}, }, }), @@ -250,10 +250,10 @@ }, }, { name: "delta not applied because it exceeds limits", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 1, "memory": 100, "nodes": 5}, }, }), @@ -273,7 +273,7 @@ name: "applied delta results in zero limit", tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ { - quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + quota: &FakeQuota{Name: "limiter1", AppliesToFn: func(*apiv1.Node) bool { return true }}, limitsLeft: resourceList{"cpu": 2, "memory": 500, "nodes": 10}, }, }), diff --git a/cluster-autoscaler/resourcequotas/usage_test.go b/cluster-autoscaler/resourcequotas/usage_test.go index fc8160eccfb6..11da78f36e43 100644 --- a/cluster-autoscaler/resourcequotas/usage_test.go +++ b/cluster-autoscaler/resourcequotas/usage_test.go @@ -44,9 +44,9 @@ func TestCalculateUsages(t *testing.T) { test.BuildTestNode("n3", 3000, 8000), }, quotas: []Quota{ - &fakeQuota{ - id: "cluster-wide", - appliesToFn: includeAll, + &FakeQuota{ + Name: "cluster-wide", + AppliesToFn: includeAll, }, }, wantUsages: map[string]resourceList{ @@ -65,13 +65,13 @@ addLabel(test.BuildTestNode("n3", 3000, 8000), "pool", "a"), }, quotas: []Quota{ - &fakeQuota{ - id: "pool-a", - appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" }, + &FakeQuota{ + Name: "pool-a", + AppliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" }, }, - &fakeQuota{ - id: "pool-b", - appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" }, + &FakeQuota{ + Name: "pool-b", + AppliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" }, }, }, wantUsages: map[string]resourceList{ @@ -95,9 +95,9 @@ test.BuildTestNode("n3", 3000, 8000), }, quotas: []Quota{ - &fakeQuota{ - id: "cluster-wide", - appliesToFn: includeAll, + &FakeQuota{ + Name: "cluster-wide", + AppliesToFn: includeAll, }, }, nodeFilter: func(node *apiv1.Node) bool { return node.Name == "n2" }, @@ -115,9 +115,9 @@ test.BuildTestNode("n1", 1000, 2000), }, quotas: []Quota{ - &fakeQuota{ - id: "no-match", - appliesToFn: func(node *apiv1.Node) bool { return false }, + &FakeQuota{ + Name: "no-match", + AppliesToFn: func(node *apiv1.Node) bool { return false }, }, }, wantUsages: map[string]resourceList{ @@ -131,9 +131,9 @@ test.BuildTestNode("n2", 2000, 4000), }, quotas: []Quota{ - &fakeQuota{ - id: "cluster-wide", - appliesToFn:
includeAll, + &FakeQuota{ + Name: "cluster-wide", + AppliesToFn: includeAll, }, }, customTargets: map[string][]customresources.CustomResourceTarget{ @@ -158,13 +158,13 @@ func TestCalculateUsages(t *testing.T) { addLabel(test.BuildTestNode("n3", 3000, 8000), "pool", "a"), }, quotas: []Quota{ - &fakeQuota{ - id: "pool-a", - appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" }, + &FakeQuota{ + Name: "pool-a", + AppliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" }, }, - &fakeQuota{ - id: "pool-b", - appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" }, + &FakeQuota{ + Name: "pool-b", + AppliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" }, }, }, nodeFilter: func(node *apiv1.Node) bool { return node.Name == "n3" },
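
For reviewers, a minimal sketch (not part of the patch) of how the new quota plumbing is intended to compose. It uses only identifiers introduced in this diff; buildTrackerFactory, the quota name, and the CPU limit are made-up illustrations, and cloudProvider/processors stand in for values the caller already has:

    // buildTrackerFactory is a hypothetical helper showing the intended wiring.
    func buildTrackerFactory(cloudProvider cloudprovider.CloudProvider, processors *ca_processors.AutoscalingProcessors) *resourcequotas.TrackerFactory {
    	// A static quota built from the exported test fake; the name and limit are illustrative.
    	staticQuota := &resourcequotas.FakeQuota{
    		Name:        "example-cpu-cap",
    		AppliesToFn: resourcequotas.MatchEveryNode,
    		LimitsVal:   map[string]int64{"cpu": 16},
    	}
    	// Union of the legacy cloud-provider limits and the static quota.
    	provider := resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{
    		resourcequotas.NewCloudQuotasProvider(cloudProvider),
    		resourcequotas.NewFakeProvider([]resourcequotas.Quota{staticQuota}),
    	})
    	// The resulting factory is what ScaleUp orchestrators now receive via Initialize.
    	return resourcequotas.NewTrackerFactory(resourcequotas.TrackerOptions{
    		QuotaProvider:            provider,
    		CustomResourcesProcessor: processors.CustomResourcesProcessor,
    	})
    }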