77 changes: 77 additions & 0 deletions cluster-autoscaler/resourcequotas/cache.go
@@ -0,0 +1,77 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourcequotas

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
cacontext "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
)

// nodeResourcesCache memoizes the resource list of a representative node per
// node group, so custom resource targets are computed at most once per group.
type nodeResourcesCache struct {
crp customresources.CustomResourcesProcessor
resources map[string]resourceList
}

func newNodeResourcesCache(crp customresources.CustomResourcesProcessor) *nodeResourcesCache {
return &nodeResourcesCache{
crp: crp,
resources: make(map[string]resourceList),
}
}

// nodeResources returns the resources provided by the given node, reusing a
// cached result for the node's group when available. A nil nodeGroup bypasses
// the cache and always recomputes.
func (nc *nodeResourcesCache) nodeResources(autoscalingCtx *cacontext.AutoscalingContext, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) {
if nodeGroup != nil {
if delta, ok := nc.resources[nodeGroup.Id()]; ok {
return delta, nil
}
}
delta, err := nodeResources(autoscalingCtx, nc.crp, node, nodeGroup)
if err != nil {
return nil, err
}
if nodeGroup != nil {
nc.resources[nodeGroup.Id()] = delta
}
return delta, nil
}

// nodeResources calculates the amount of resources that a node contains.
func nodeResources(autoscalingCtx *cacontext.AutoscalingContext, crp customresources.CustomResourcesProcessor, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) {
// TODO: storage?
nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(node)
nodeResources := resourceList{
string(corev1.ResourceCPU): nodeCPU,
string(corev1.ResourceMemory): nodeMemory,
ResourceNodes: 1,
}

resourceTargets, err := crp.GetNodeResourceTargets(autoscalingCtx, node, nodeGroup)
if err != nil {
return nil, fmt.Errorf("failed to get custom resources: %w", err)
}

for _, resourceTarget := range resourceTargets {
nodeResources[resourceTarget.ResourceType] = resourceTarget.ResourceCount
}

return nodeResources, nil
}
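Note for reviewers: a minimal usage sketch of the new cache (hypothetical wiring; crp, autoscalingCtx, node, and nodeGroup stand for values the caller already has in scope):

// Sketch only: the first lookup for a node group computes the resource list and
// stores it under nodeGroup.Id(); later lookups for the same group return the
// cached entry without calling crp.GetNodeResourceTargets again.
nc := newNodeResourcesCache(crp)
resources, err := nc.nodeResources(autoscalingCtx, node, nodeGroup)
if err != nil {
return err
}
cached, _ := nc.nodeResources(autoscalingCtx, node, nodeGroup) // served from nc.resources
// Calls with a nil nodeGroup bypass the cache and recompute every time.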
181 changes: 181 additions & 0 deletions cluster-autoscaler/resourcequotas/cache_test.go
@@ -0,0 +1,181 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourcequotas

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

type mockCustomResourcesProcessor struct {
mock.Mock
}

func (m *mockCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(_ *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, _ *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) {
return allNodes, readyNodes
}

func (m *mockCustomResourcesProcessor) GetNodeResourceTargets(autoscalingCtx *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]customresources.CustomResourceTarget, errors.AutoscalerError) {
args := m.Called(autoscalingCtx, node, nodeGroup)
return args.Get(0).([]customresources.CustomResourceTarget), nil
}

func (m *mockCustomResourcesProcessor) CleanUp() {
return
}

func TestNodeResourcesCache(t *testing.T) {
node := test.BuildTestNode("n1", 1000, 2000)
autoscalingCtx := &context.AutoscalingContext{}
ng1 := cptest.NewTestNodeGroup("ng1", 1, 10, 1, true, false, "n1-template", nil, nil)
ng2 := cptest.NewTestNodeGroup("ng2", 1, 10, 1, true, false, "n2-template", nil, nil)
resourceTargets := []customresources.CustomResourceTarget{
{ResourceType: "gpu", ResourceCount: 1},
}
wantResources := resourceList{"cpu": 1, "memory": 2000, "nodes": 1, "gpu": 1}

type nodeResourcesCall struct {
node *apiv1.Node
nodeGroup cloudprovider.NodeGroup
}

testCases := []struct {
name string
calls []nodeResourcesCall
setupCRPExpectations func(*mock.Mock)
}{
{
name: "cache hit",
calls: []nodeResourcesCall{
{node: node, nodeGroup: ng1},
{node: node, nodeGroup: ng1},
},
setupCRPExpectations: func(m *mock.Mock) {
m.On("GetNodeResourceTargets", autoscalingCtx, node, ng1).Return(resourceTargets, nil).Once()
},
},
{
name: "cache miss on different node group",
calls: []nodeResourcesCall{
{node: node, nodeGroup: ng1},
{node: node, nodeGroup: ng2},
},
setupCRPExpectations: func(m *mock.Mock) {
m.On("GetNodeResourceTargets", autoscalingCtx, node, ng1).Return(resourceTargets, nil).Once().
On("GetNodeResourceTargets", autoscalingCtx, node, ng2).Return(resourceTargets, nil).Once()
},
},
{
name: "no node group bypasses cache",
calls: []nodeResourcesCall{
{node: node, nodeGroup: nil},
{node: node, nodeGroup: nil},
},
setupCRPExpectations: func(m *mock.Mock) {
m.On("GetNodeResourceTargets", autoscalingCtx, node, nil).Return(resourceTargets, nil).Twice()
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockCRP := &mockCustomResourcesProcessor{}
tc.setupCRPExpectations(&mockCRP.Mock)
nc := newNodeResourcesCache(mockCRP)
for _, call := range tc.calls {
resources, err := nc.nodeResources(autoscalingCtx, call.node, call.nodeGroup)
if err != nil {
t.Fatalf("nodeResources unexpected error: %v", err)
}
if diff := cmp.Diff(wantResources, resources); diff != "" {
t.Errorf("nodeResources mismatch (-want, +got):\n%s", diff)
}
}
})
}
}

func TestNodeResources(t *testing.T) {
testCases := []struct {
name string
node *apiv1.Node
crp customresources.CustomResourcesProcessor
wantDelta resourceList
}{
{
name: "node just with CPU and memory",
node: test.BuildTestNode("test", 1000, 2048),
crp: &fakeCustomResourcesProcessor{},
wantDelta: resourceList{
"cpu": 1,
"memory": 2048,
"nodes": 1,
},
},
{
// nodes should not have milliCPUs in the capacity, so we round it up
// to the nearest integer.
name: "node just with CPU and memory, milli cores rounded up",
node: test.BuildTestNode("test", 2500, 4096),
crp: &fakeCustomResourcesProcessor{},
wantDelta: resourceList{
"cpu": 3,
"memory": 4096,
"nodes": 1,
},
},
{
name: "node with custom resources",
node: test.BuildTestNode("test", 1000, 2048),
crp: &fakeCustomResourcesProcessor{NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget {
return []customresources.CustomResourceTarget{
{
ResourceType: "gpu",
ResourceCount: 1,
},
}
}},
wantDelta: resourceList{
"cpu": 1,
"memory": 2048,
"gpu": 1,
"nodes": 1,
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := &context.AutoscalingContext{}
delta, err := nodeResources(ctx, tc.crp, tc.node, nil)
if err != nil {
t.Errorf("nodeResources: unexpected error: %v", err)
}
if diff := cmp.Diff(tc.wantDelta, delta); diff != "" {
t.Errorf("delta mismatch (-want +got):\n%s", diff)
}
})
}
}
19 changes: 10 additions & 9 deletions cluster-autoscaler/resourcequotas/factory.go
@@ -24,9 +24,9 @@ import (

// TrackerFactory builds quota trackers.
type TrackerFactory struct {
-crp customresources.CustomResourcesProcessor
-quotasProvider Provider
-usageCalculator *usageCalculator
+crp customresources.CustomResourcesProcessor
+quotasProvider Provider
+nodeFilter NodeFilter
}

// TrackerOptions stores configuration for quota tracking.
@@ -38,11 +38,10 @@ type TrackerOptions struct {

// NewTrackerFactory creates a new TrackerFactory.
func NewTrackerFactory(opts TrackerOptions) *TrackerFactory {
-uc := newUsageCalculator(opts.CustomResourcesProcessor, opts.NodeFilter)
return &TrackerFactory{
-crp: opts.CustomResourcesProcessor,
-quotasProvider: opts.QuotaProvider,
-usageCalculator: uc,
+crp: opts.CustomResourcesProcessor,
+quotasProvider: opts.QuotaProvider,
+nodeFilter: opts.NodeFilter,
}
}

@@ -56,7 +55,9 @@ func (f *TrackerFactory) NewQuotasTracker(autoscalingCtx *context.AutoscalingCon
if err != nil {
return nil, err
}
-usages, err := f.usageCalculator.calculateUsages(autoscalingCtx, nodes, quotas)
+nc := newNodeResourcesCache(f.crp)
+uc := newUsageCalculator(f.nodeFilter, nc)
+usages, err := uc.calculateUsages(autoscalingCtx, nodes, quotas)
if err != nil {
return nil, err
}
@@ -73,6 +74,6 @@ func (f *TrackerFactory) NewQuotasTracker(autoscalingCtx *context.AutoscalingCon
limitsLeft: limitsLeft,
})
}
-tracker := newTracker(f.crp, quotaStatuses)
+tracker := newTracker(quotaStatuses, nc)
return tracker, nil
}
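Note for reviewers: the factory no longer holds a long-lived usageCalculator; every NewQuotasTracker call builds one nodeResourcesCache and shares it between the usage calculator and the returned Tracker, so per-node-group resources computed while measuring current usage are reused by later quota checks and discarded with the tracker. Rough flow, as a sketch (assumes NewQuotasTracker takes the context plus the current nodes, as the hunk above suggests; trackerFactory and templateNode are placeholder names):

// Once per autoscaling iteration (sketch): builds a fresh nodeResourcesCache internally.
tracker, err := trackerFactory.NewQuotasTracker(autoscalingCtx, nodes)
if err != nil {
return err
}
// Later checks reuse the cached per-node-group resources through the tracker.
result, err := tracker.CheckDelta(autoscalingCtx, nodeGroup, templateNode, 3)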
37 changes: 5 additions & 32 deletions cluster-autoscaler/resourcequotas/tracker.go
@@ -17,13 +17,9 @@ limitations under the License.
package resourcequotas

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
)

const (
@@ -45,8 +41,8 @@ type resourceList map[string]int64

// Tracker tracks resource quotas.
type Tracker struct {
-crp customresources.CustomResourcesProcessor
quotaStatuses []*quotaStatus
+nodeCache *nodeResourcesCache
}

type quotaStatus struct {
@@ -55,10 +51,10 @@ type quotaStatus struct {
}

// newTracker creates a new Tracker.
-func newTracker(crp customresources.CustomResourcesProcessor, quotaStatuses []*quotaStatus) *Tracker {
+func newTracker(quotaStatuses []*quotaStatus, nodeCache *nodeResourcesCache) *Tracker {
return &Tracker{
-crp: crp,
quotaStatuses: quotaStatuses,
+nodeCache: nodeCache,
}
}

@@ -67,7 +63,7 @@ func newTracker(crp customresources.CustomResourcesProcessor, quotaStatuses []*q
func (t *Tracker) ApplyDelta(
autoscalingCtx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int,
) (*CheckDeltaResult, error) {
-delta, err := nodeResources(autoscalingCtx, t.crp, node, nodeGroup)
+delta, err := t.nodeCache.nodeResources(autoscalingCtx, node, nodeGroup)
if err != nil {
return nil, err
}
@@ -100,8 +96,7 @@ func (t *Tracker) ApplyDelta(
func (t *Tracker) CheckDelta(
autoscalingCtx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int,
) (*CheckDeltaResult, error) {
-// TODO: cache deltas
-delta, err := nodeResources(autoscalingCtx, t.crp, node, nodeGroup)
+delta, err := t.nodeCache.nodeResources(autoscalingCtx, node, nodeGroup)
if err != nil {
return nil, err
}
@@ -174,25 +169,3 @@ type ExceededQuota struct {
ID string
ExceededResources []string
}

-// nodeResources calculates the amount of resources that will be used from the cluster when creating a node.
-func nodeResources(autoscalingCtx *context.AutoscalingContext, crp customresources.CustomResourcesProcessor, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) {
-// TODO: storage?
-nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(node)
-nodeResources := resourceList{
-string(corev1.ResourceCPU): nodeCPU,
-string(corev1.ResourceMemory): nodeMemory,
-ResourceNodes: 1,
-}
-
-resourceTargets, err := crp.GetNodeResourceTargets(autoscalingCtx, node, nodeGroup)
-if err != nil {
-return nil, fmt.Errorf("failed to get custom resources: %w", err)
-}
-
-for _, resourceTarget := range resourceTargets {
-nodeResources[resourceTarget.ResourceType] = resourceTarget.ResourceCount
-}
-
-return nodeResources, nil
-}