Skip to content

Commit f3e88d9

Browse files
committed
Cache node deltas in resource quotas
1 parent 4177783 commit f3e88d9

File tree

7 files changed

+307
-138
lines changed

7 files changed

+307
-138
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcequotas
18+
19+
import (
20+
"fmt"
21+
22+
corev1 "k8s.io/api/core/v1"
23+
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
24+
cacontext "k8s.io/autoscaler/cluster-autoscaler/context"
25+
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
26+
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
27+
)
28+
29+
// nodeResourcesCache memoizes per-node-group resource lists so that repeated
// quota checks against the same node group do not recompute node resources.
type nodeResourcesCache struct {
	// crp resolves custom (non CPU/memory/node-count) resource targets for a node.
	crp customresources.CustomResourcesProcessor
	// resources caches computed resource lists keyed by NodeGroup.Id().
	resources map[string]resourceList
}
33+
34+
func newNodeResourcesCache(crp customresources.CustomResourcesProcessor) *nodeResourcesCache {
35+
return &nodeResourcesCache{
36+
crp: crp,
37+
resources: make(map[string]resourceList),
38+
}
39+
}
40+
41+
func (nc *nodeResourcesCache) nodeResources(autoscalingCtx *cacontext.AutoscalingContext, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) {
42+
if nodeGroup != nil {
43+
if delta, ok := nc.resources[nodeGroup.Id()]; ok {
44+
return delta, nil
45+
}
46+
}
47+
delta, err := nodeResources(autoscalingCtx, nc.crp, node, nodeGroup)
48+
if err != nil {
49+
return nil, err
50+
}
51+
if nodeGroup != nil {
52+
nc.resources[nodeGroup.Id()] = delta
53+
}
54+
return delta, nil
55+
}
56+
57+
// nodeResources calculates the amount of resources that a node contains.
58+
func nodeResources(autoscalingCtx *cacontext.AutoscalingContext, crp customresources.CustomResourcesProcessor, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) {
59+
// TODO: storage?
60+
nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(node)
61+
nodeResources := resourceList{
62+
string(corev1.ResourceCPU): nodeCPU,
63+
string(corev1.ResourceMemory): nodeMemory,
64+
ResourceNodes: 1,
65+
}
66+
67+
resourceTargets, err := crp.GetNodeResourceTargets(autoscalingCtx, node, nodeGroup)
68+
if err != nil {
69+
return nil, fmt.Errorf("failed to get custom resources: %w", err)
70+
}
71+
72+
for _, resourceTarget := range resourceTargets {
73+
nodeResources[resourceTarget.ResourceType] = resourceTarget.ResourceCount
74+
}
75+
76+
return nodeResources, nil
77+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcequotas
18+
19+
import (
20+
"testing"
21+
22+
"github.com/google/go-cmp/cmp"
23+
"github.com/stretchr/testify/mock"
24+
apiv1 "k8s.io/api/core/v1"
25+
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
26+
cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
27+
"k8s.io/autoscaler/cluster-autoscaler/context"
28+
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
29+
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
30+
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
31+
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
32+
)
33+
34+
// mockCustomResourcesProcessor is a testify-based test double for
// customresources.CustomResourcesProcessor, used to stub and count
// GetNodeResourceTargets calls in the cache tests.
type mockCustomResourcesProcessor struct {
	mock.Mock
}
37+
38+
// FilterOutNodesWithUnreadyResources is a pass-through stub: the tests in
// this file never exercise readiness filtering, so it returns its inputs
// unchanged.
func (m *mockCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(_ *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, _ *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) {
	return allNodes, readyNodes
}
41+
42+
func (m *mockCustomResourcesProcessor) GetNodeResourceTargets(autoscalingCtx *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]customresources.CustomResourceTarget, errors.AutoscalerError) {
43+
args := m.Called(autoscalingCtx, node, nodeGroup)
44+
return args.Get(0).([]customresources.CustomResourceTarget), nil
45+
}
46+
47+
func (m *mockCustomResourcesProcessor) CleanUp() {
48+
return
49+
}
50+
51+
func TestNodeResourcesCache(t *testing.T) {
52+
node := test.BuildTestNode("n1", 1000, 2000)
53+
autoscalingCtx := &context.AutoscalingContext{}
54+
ng1 := cptest.NewTestNodeGroup("ng1", 1, 10, 1, true, false, "n1-template", nil, nil)
55+
ng2 := cptest.NewTestNodeGroup("ng2", 1, 10, 1, true, false, "n2-template", nil, nil)
56+
resourceTargets := []customresources.CustomResourceTarget{
57+
{ResourceType: "gpu", ResourceCount: 1},
58+
}
59+
wantResources := resourceList{"cpu": 1, "memory": 2000, "nodes": 1, "gpu": 1}
60+
61+
type nodeResourcesCall struct {
62+
node *apiv1.Node
63+
nodeGroup cloudprovider.NodeGroup
64+
}
65+
66+
testCases := []struct {
67+
name string
68+
calls []nodeResourcesCall
69+
setupCRPExpectations func(*mock.Mock)
70+
}{
71+
{
72+
name: "cache hit",
73+
calls: []nodeResourcesCall{
74+
{node: node, nodeGroup: ng1},
75+
{node: node, nodeGroup: ng1},
76+
},
77+
setupCRPExpectations: func(m *mock.Mock) {
78+
m.On("GetNodeResourceTargets", autoscalingCtx, node, ng1).Return(resourceTargets, nil).Once()
79+
},
80+
},
81+
{
82+
name: "cache miss on different node group",
83+
calls: []nodeResourcesCall{
84+
{node: node, nodeGroup: ng1},
85+
{node: node, nodeGroup: ng2},
86+
},
87+
setupCRPExpectations: func(m *mock.Mock) {
88+
m.On("GetNodeResourceTargets", autoscalingCtx, node, ng1).Return(resourceTargets, nil).Once().
89+
On("GetNodeResourceTargets", autoscalingCtx, node, ng2).Return(resourceTargets, nil).Once()
90+
},
91+
},
92+
{
93+
name: "no node group bypasses cache",
94+
calls: []nodeResourcesCall{
95+
{node: node, nodeGroup: nil},
96+
{node: node, nodeGroup: nil},
97+
},
98+
setupCRPExpectations: func(m *mock.Mock) {
99+
m.On("GetNodeResourceTargets", autoscalingCtx, node, nil).Return(resourceTargets, nil).Twice()
100+
},
101+
},
102+
}
103+
for _, tc := range testCases {
104+
t.Run(tc.name, func(t *testing.T) {
105+
mockCRP := &mockCustomResourcesProcessor{}
106+
tc.setupCRPExpectations(&mockCRP.Mock)
107+
nc := newNodeResourcesCache(mockCRP)
108+
for _, call := range tc.calls {
109+
resources, err := nc.nodeResources(autoscalingCtx, call.node, call.nodeGroup)
110+
if err != nil {
111+
t.Fatalf("nodeResources unexpected error: %v", err)
112+
}
113+
if diff := cmp.Diff(wantResources, resources); diff != "" {
114+
t.Errorf("nodeResources mismatch (-want, +got):\n%s", diff)
115+
}
116+
}
117+
})
118+
}
119+
}
120+
121+
func TestNodeResources(t *testing.T) {
122+
testCases := []struct {
123+
name string
124+
node *apiv1.Node
125+
crp customresources.CustomResourcesProcessor
126+
wantDelta resourceList
127+
}{
128+
{
129+
name: "node just with CPU and memory",
130+
node: test.BuildTestNode("test", 1000, 2048),
131+
crp: &fakeCustomResourcesProcessor{},
132+
wantDelta: resourceList{
133+
"cpu": 1,
134+
"memory": 2048,
135+
"nodes": 1,
136+
},
137+
},
138+
{
139+
// nodes should not have milliCPUs in the capacity, so we round it up
140+
// to the nearest integer.
141+
name: "node just with CPU and memory, milli cores rounded up",
142+
node: test.BuildTestNode("test", 2500, 4096),
143+
crp: &fakeCustomResourcesProcessor{},
144+
wantDelta: resourceList{
145+
"cpu": 3,
146+
"memory": 4096,
147+
"nodes": 1,
148+
},
149+
},
150+
{
151+
name: "node with custom resources",
152+
node: test.BuildTestNode("test", 1000, 2048),
153+
crp: &fakeCustomResourcesProcessor{NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget {
154+
return []customresources.CustomResourceTarget{
155+
{
156+
ResourceType: "gpu",
157+
ResourceCount: 1,
158+
},
159+
}
160+
}},
161+
wantDelta: resourceList{
162+
"cpu": 1,
163+
"memory": 2048,
164+
"gpu": 1,
165+
"nodes": 1,
166+
},
167+
},
168+
}
169+
for _, tc := range testCases {
170+
t.Run(tc.name, func(t *testing.T) {
171+
ctx := &context.AutoscalingContext{}
172+
delta, err := nodeResources(ctx, tc.crp, tc.node, nil)
173+
if err != nil {
174+
t.Errorf("nodeResources: unexpected error: %v", err)
175+
}
176+
if diff := cmp.Diff(tc.wantDelta, delta); diff != "" {
177+
t.Errorf("delta mismatch (-want +got):\n%s", diff)
178+
}
179+
})
180+
}
181+
}

cluster-autoscaler/resourcequotas/factory.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ import (
2424

2525
// TrackerFactory builds quota trackers.
2626
type TrackerFactory struct {
27-
crp customresources.CustomResourcesProcessor
28-
quotasProvider Provider
29-
usageCalculator *usageCalculator
27+
crp customresources.CustomResourcesProcessor
28+
quotasProvider Provider
29+
nodeFilter NodeFilter
3030
}
3131

3232
// TrackerOptions stores configuration for quota tracking.
@@ -38,11 +38,10 @@ type TrackerOptions struct {
3838

3939
// NewTrackerFactory creates a new TrackerFactory.
4040
func NewTrackerFactory(opts TrackerOptions) *TrackerFactory {
41-
uc := newUsageCalculator(opts.CustomResourcesProcessor, opts.NodeFilter)
4241
return &TrackerFactory{
43-
crp: opts.CustomResourcesProcessor,
44-
quotasProvider: opts.QuotaProvider,
45-
usageCalculator: uc,
42+
crp: opts.CustomResourcesProcessor,
43+
quotasProvider: opts.QuotaProvider,
44+
nodeFilter: opts.NodeFilter,
4645
}
4746
}
4847

@@ -56,7 +55,9 @@ func (f *TrackerFactory) NewQuotasTracker(autoscalingCtx *context.AutoscalingCon
5655
if err != nil {
5756
return nil, err
5857
}
59-
usages, err := f.usageCalculator.calculateUsages(autoscalingCtx, nodes, quotas)
58+
nc := newNodeResourcesCache(f.crp)
59+
uc := newUsageCalculator(f.nodeFilter, nc)
60+
usages, err := uc.calculateUsages(autoscalingCtx, nodes, quotas)
6061
if err != nil {
6162
return nil, err
6263
}
@@ -73,6 +74,6 @@ func (f *TrackerFactory) NewQuotasTracker(autoscalingCtx *context.AutoscalingCon
7374
limitsLeft: limitsLeft,
7475
})
7576
}
76-
tracker := newTracker(f.crp, quotaStatuses)
77+
tracker := newTracker(quotaStatuses, nc)
7778
return tracker, nil
7879
}

cluster-autoscaler/resourcequotas/tracker.go

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,9 @@ limitations under the License.
1717
package resourcequotas
1818

1919
import (
20-
"fmt"
21-
2220
corev1 "k8s.io/api/core/v1"
2321
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
2422
"k8s.io/autoscaler/cluster-autoscaler/context"
25-
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
26-
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
2723
)
2824

2925
const (
@@ -45,8 +41,8 @@ type resourceList map[string]int64
4541

4642
// Tracker tracks resource quotas.
4743
type Tracker struct {
48-
crp customresources.CustomResourcesProcessor
4944
quotaStatuses []*quotaStatus
45+
nodeCache *nodeResourcesCache
5046
}
5147

5248
type quotaStatus struct {
@@ -55,10 +51,10 @@ type quotaStatus struct {
5551
}
5652

5753
// newTracker creates a new Tracker.
58-
func newTracker(crp customresources.CustomResourcesProcessor, quotaStatuses []*quotaStatus) *Tracker {
54+
func newTracker(quotaStatuses []*quotaStatus, nodeCache *nodeResourcesCache) *Tracker {
5955
return &Tracker{
60-
crp: crp,
6156
quotaStatuses: quotaStatuses,
57+
nodeCache: nodeCache,
6258
}
6359
}
6460

@@ -67,7 +63,7 @@ func newTracker(crp customresources.CustomResourcesProcessor, quotaStatuses []*q
6763
func (t *Tracker) ApplyDelta(
6864
autoscalingCtx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int,
6965
) (*CheckDeltaResult, error) {
70-
delta, err := nodeResources(autoscalingCtx, t.crp, node, nodeGroup)
66+
delta, err := t.nodeCache.nodeResources(autoscalingCtx, node, nodeGroup)
7167
if err != nil {
7268
return nil, err
7369
}
@@ -100,8 +96,7 @@ func (t *Tracker) ApplyDelta(
10096
func (t *Tracker) CheckDelta(
10197
autoscalingCtx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int,
10298
) (*CheckDeltaResult, error) {
103-
// TODO: cache deltas
104-
delta, err := nodeResources(autoscalingCtx, t.crp, node, nodeGroup)
99+
delta, err := t.nodeCache.nodeResources(autoscalingCtx, node, nodeGroup)
105100
if err != nil {
106101
return nil, err
107102
}
@@ -174,25 +169,3 @@ type ExceededQuota struct {
174169
ID string
175170
ExceededResources []string
176171
}
177-
178-
// nodeResources calculates the amount of resources that will be used from the cluster when creating a node.
179-
func nodeResources(autoscalingCtx *context.AutoscalingContext, crp customresources.CustomResourcesProcessor, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) {
180-
// TODO: storage?
181-
nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(node)
182-
nodeResources := resourceList{
183-
string(corev1.ResourceCPU): nodeCPU,
184-
string(corev1.ResourceMemory): nodeMemory,
185-
ResourceNodes: 1,
186-
}
187-
188-
resourceTargets, err := crp.GetNodeResourceTargets(autoscalingCtx, node, nodeGroup)
189-
if err != nil {
190-
return nil, fmt.Errorf("failed to get custom resources: %w", err)
191-
}
192-
193-
for _, resourceTarget := range resourceTargets {
194-
nodeResources[resourceTarget.ResourceType] = resourceTarget.ResourceCount
195-
}
196-
197-
return nodeResources, nil
198-
}

0 commit comments

Comments (0)