
Commit 4172b84

Allow atomic scale down if number of candidates is equal to number of registered nodes
1 parent af53141 commit 4172b84

File tree: 9 files changed (+429, -111 lines)

cluster-autoscaler/cloudprovider/test/test_cloud_provider.go

Lines changed: 3 additions & 1 deletion
@@ -485,7 +485,9 @@ func (tng *TestNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
 	id := tng.id
 	tng.targetSize -= len(nodes)
 	tng.Unlock()
-	if tng.opts != nil && tng.opts.ZeroOrMaxNodeScaling && tng.targetSize != 0 {
+	allNodes, _ := tng.Nodes()
+	currentSize := len(allNodes)
+	if tng.opts != nil && tng.opts.ZeroOrMaxNodeScaling && tng.targetSize != 0 && currentSize != len(nodes) {
 		return fmt.Errorf("TestNodeGroup: attempted to partially scale down a node group that should be scaled down atomically")
 	}
 	for _, node := range nodes {
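
For orientation, the updated guard reads as a plain predicate: an atomic (ZeroOrMaxNodeScaling) group now rejects a delete request only when it would not reach zero and the request does not cover every node the group currently lists. A hedged, self-contained sketch of that predicate and its two interesting cases; the function and test names are made up for illustration and simplify away the locking and target-size bookkeeping in the real test provider:

package guard

import "testing"

// shouldReject mirrors the shape of the updated check in TestNodeGroup.DeleteNodes:
// reject only if the group is atomic, will not reach zero, and the request does
// not cover all nodes the group currently reports.
func shouldReject(zeroOrMax bool, targetSizeAfterDelete, listedNodes, requested int) bool {
	return zeroOrMax && targetSizeAfterDelete != 0 && listedNodes != requested
}

func TestAtomicDeleteGuard(t *testing.T) {
	// Deleting both registered nodes of a group whose target size has not reached
	// zero (one instance never registered) is now allowed.
	if shouldReject(true, 1, 2, 2) {
		t.Error("deleting every listed node should be allowed even if target size is not reached")
	}
	// A genuinely partial delete of an atomic group is still rejected.
	if !shouldReject(true, 1, 3, 2) {
		t.Error("partial scale down of an atomic group should still be rejected")
	}
}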

cluster-autoscaler/core/scaledown/budgets/budgets.go

Lines changed: 75 additions & 4 deletions
@@ -18,13 +18,16 @@ package budgets
 
 import (
 	"reflect"
+	"strconv"
 
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"
 
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/annotations"
 )
 
 // NodeGroupView is a subset of nodes from a given NodeGroup
@@ -65,6 +68,12 @@ func (bp *ScaleDownBudgetProcessor) CropNodes(as scaledown.ActuationStatus, empt
 	var err error
 	canOverflow := true
 	emptyToDelete, drainToDelete = []*NodeGroupView{}, []*NodeGroupView{}
+
+	allNodes, err := allNodes(bp.ctx.ClusterSnapshot)
+	if err != nil {
+		klog.Errorf("failed to read all nodes from the cluster snapshot for nodes cropping, err: %s", err)
+	}
+
 	for _, bucket := range emptyAtomic {
 		drainNodes := []*apiv1.Node{}
 		drainBucket, drainFound := drainAtomicMap[bucket.Group.Id()]
@@ -90,9 +99,22 @@ func (bp *ScaleDownBudgetProcessor) CropNodes(as scaledown.ActuationStatus, empt
 			continue
 		}
 		bucket.BatchSize = targetSize
-		if len(bucket.Nodes)+len(drainNodes) != targetSize {
+
+		// If available we consider only registered nodes for the scale down,
+		// excluding failed instances unable to register as K8s nodes. Waiting
+		// for such instances could block scale down indefinitely.
+		registeredNodes, err := bp.getAllRegisteredNodesForNodeGroup(allNodes, bucket.Group)
+		if err != nil {
+			klog.Errorf("failed to get registered nodes for node group %s: %v", bucket.Group.Id(), err)
+		}
+		currentSize := len(registeredNodes)
+		if len(bucket.Nodes) == currentSize {
+			bucket.BatchSize = currentSize
+		}
+
+		if len(bucket.Nodes)+len(drainNodes) != targetSize && len(bucket.Nodes)+len(drainNodes) != currentSize {
 			// We can't only partially scale down atomic group.
-			klog.Errorf("not scaling atomic group %v because not all nodes are candidates, target size: %v, empty: %v, drainable: %v", bucket.Group.Id(), targetSize, len(bucket.Nodes), len(drainNodes))
+			klog.Errorf("not scaling atomic group %v because not all nodes are candidates, target size: %v, current size: %v empty: %v, drainable: %v", bucket.Group.Id(), targetSize, currentSize, len(bucket.Nodes), len(drainNodes))
 			continue
 		}
 		emptyToDelete = append(emptyToDelete, bucket)
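
In other words, the branch above now treats an atomic group as fully covered in two cases: the candidates add up to the group's target size, or they add up to the number of nodes that actually registered, so instances that never became nodes cannot block the scale down forever. A small self-contained illustration with made-up numbers (the variable names are placeholders, not the real CropNodes state):

package main

import "fmt"

func main() {
	// Hypothetical atomic node group: target size 8, but two instances never
	// registered as Kubernetes nodes, so only 6 nodes exist in the snapshot.
	targetSize := 8
	currentSize := 6 // registered nodes, as counted via getAllRegisteredNodesForNodeGroup
	empty, drainable := 4, 2

	// The relaxed completeness check introduced above.
	complete := empty+drainable == targetSize || empty+drainable == currentSize
	fmt.Println(complete) // true: all 6 registered nodes are candidates, so the group scales down
}
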
@@ -127,9 +149,22 @@ func (bp *ScaleDownBudgetProcessor) CropNodes(as scaledown.ActuationStatus, empt
 			continue
 		}
 		bucket.BatchSize = targetSize
-		if len(bucket.Nodes) != targetSize {
+
+		// If available we consider only registered nodes for the scale down,
+		// excluding failed instances unable to register as K8s nodes. Waiting
+		// for such instances could block scale down indefinitely.
+		registeredNodes, err := bp.getAllRegisteredNodesForNodeGroup(allNodes, bucket.Group)
+		if err != nil {
+			klog.Errorf("Failed to get registered nodes for node group %s: %v", bucket.Group.Id(), err)
+		}
+		currentSize := len(registeredNodes)
+		if len(bucket.Nodes) == currentSize {
+			bucket.BatchSize = currentSize
+		}
+
+		if len(bucket.Nodes) != targetSize && len(bucket.Nodes) != currentSize {
 			// We can't only partially scale down atomic group.
-			klog.Errorf("not scaling atomic group %v because not all nodes are candidates, target size: %v, empty: none, drainable: %v", bucket.Group.Id(), targetSize, len(bucket.Nodes))
+			klog.Errorf("not scaling atomic group %v because not all nodes are candidates, target size: %v, current size: %v, empty: none, drainable: %v", bucket.Group.Id(), targetSize, currentSize, len(bucket.Nodes))
 			continue
 		}
 		drainToDelete = append(drainToDelete, bucket)
@@ -210,3 +245,39 @@ func (bp *ScaleDownBudgetProcessor) categorize(groups []*NodeGroupView) (individ
 	}
 	return individual, atomic
 }
+
+func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) {
+	nodeInfos, err := s.ListNodeInfos()
+	if err != nil {
+		// This should never happen, List() returns err only because scheduler interface requires it.
+		return nil, err
+	}
+	nodes := make([]*apiv1.Node, len(nodeInfos))
+	for i, ni := range nodeInfos {
+		nodes[i] = ni.Node()
+	}
+	return nodes, nil
+}
+
+func (bp *ScaleDownBudgetProcessor) getAllRegisteredNodesForNodeGroup(allNodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]*apiv1.Node, error) {
+	allNodesInNodeGroup, err := nodeGroup.Nodes()
+	if err != nil {
+		return nil, err
+	}
+	nodeByNodeName := map[string]cloudprovider.Instance{}
+	for _, node := range allNodesInNodeGroup {
+		nodeByNodeName[node.Id] = node
+	}
+	var registeredNodesForNodeGroup []*apiv1.Node
+	for _, node := range allNodes {
+		if val, ok := node.Annotations[annotations.NodeUpcomingAnnotation]; ok {
+			if res, ok := strconv.ParseBool(val); ok == nil && res {
+				continue
+			}
+		}
+		if _, ok := nodeByNodeName[node.Spec.ProviderID]; ok {
+			registeredNodesForNodeGroup = append(registeredNodesForNodeGroup, node)
+		}
+	}
+	return registeredNodesForNodeGroup, nil
+}
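
The second helper keeps a snapshot node only if it is not marked as an upcoming placeholder and its provider ID belongs to one of the node group's instances. A hedged re-implementation of that filter with plain maps; the annotation key, node names, and group membership below are invented for illustration and are not the real autoscaler constants:

package main

import (
	"fmt"
	"strconv"
)

// registeredNodes mimics the filtering in getAllRegisteredNodesForNodeGroup:
// skip nodes annotated as upcoming, keep nodes whose provider ID is one of the
// group's instances.
func registeredNodes(nodeAnnotations map[string]map[string]string, groupInstances map[string]bool, upcomingKey string) []string {
	var kept []string
	for providerID, ann := range nodeAnnotations {
		if val, ok := ann[upcomingKey]; ok {
			if upcoming, err := strconv.ParseBool(val); err == nil && upcoming {
				continue // placeholder for an instance that has not registered yet
			}
		}
		if groupInstances[providerID] {
			kept = append(kept, providerID)
		}
	}
	return kept
}

func main() {
	nodes := map[string]map[string]string{
		"ng1-n0": {},
		"ng1-n1": {"example.com/upcoming": "true"}, // excluded: marked upcoming
		"ng2-n0": {},                               // excluded: belongs to another group
	}
	group := map[string]bool{"ng1-n0": true, "ng1-n1": true}
	fmt.Println(registeredNodes(nodes, group, "example.com/upcoming")) // [ng1-n0]
}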

cluster-autoscaler/core/scaledown/budgets/budgets_test.go

Lines changed: 46 additions & 29 deletions
@@ -23,15 +23,18 @@ import (
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
+	"github.com/stretchr/testify/assert"
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
-	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/test"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	"k8s.io/client-go/kubernetes/fake"
 )
 
 func TestCropNodesToBudgets(t *testing.T) {
@@ -49,6 +52,7 @@ func TestCropNodesToBudgets(t *testing.T) {
 		drain                    []*NodeGroupView
 		wantEmpty                []*NodeGroupView
 		wantDrain                []*NodeGroupView
+		otherNodes               []*NodeGroupView
 	}{
 		"no nodes": {
 			empty: []*NodeGroupView{},
@@ -201,8 +205,8 @@ func TestCropNodesToBudgets(t *testing.T) {
 		// Empty and drain nodes together.
 		"empty&drain nodes within max limits, no deletions in progress": {
 			empty:     generateNodeGroupViewList(testNg, 0, 5),
-			drain:     generateNodeGroupViewList(testNg, 0, 5),
-			wantDrain: generateNodeGroupViewList(testNg, 0, 5),
+			drain:     generateNodeGroupViewList(testNg2, 0, 5),
+			wantDrain: generateNodeGroupViewList(testNg2, 0, 5),
 			wantEmpty: generateNodeGroupViewList(testNg, 0, 5),
 		},
 		"empty&drain atomic nodes within max limits, no deletions in progress": {
@@ -213,8 +217,8 @@ func TestCropNodesToBudgets(t *testing.T) {
 		},
 		"empty&drain nodes exceeding overall limit, no deletions in progress": {
 			empty:     generateNodeGroupViewList(testNg, 0, 8),
-			drain:     generateNodeGroupViewList(testNg, 0, 8),
-			wantDrain: generateNodeGroupViewList(testNg, 0, 2),
+			drain:     generateNodeGroupViewList(testNg2, 0, 8),
+			wantDrain: generateNodeGroupViewList(testNg2, 0, 2),
 			wantEmpty: generateNodeGroupViewList(testNg, 0, 8),
 		},
 		"empty&drain atomic nodes exceeding overall limit, no deletions in progress": {
@@ -241,11 +245,18 @@ func TestCropNodesToBudgets(t *testing.T) {
 			wantEmpty: generateNodeGroupViewList(atomic8, 0, 2),
 			wantDrain: generateNodeGroupViewList(atomic8, 2, 8),
 		},
-		"empty&drain atomic nodes in same group not matching node group size, no deletions in progress": {
+		"empty&drain atomic nodes in same group not matching registered node group size, no deletions in progress": {
+			empty:      generateNodeGroupViewList(atomic8, 0, 2),
+			drain:      generateNodeGroupViewList(atomic8, 2, 4),
+			wantEmpty:  []*NodeGroupView{},
+			wantDrain:  []*NodeGroupView{},
+			otherNodes: generateNodeGroupViewList(atomic8, 4, 8),
+		},
+		"empty&drain atomic nodes in same group matching registered node group size, no deletions in progress": {
 			empty:     generateNodeGroupViewList(atomic8, 0, 2),
 			drain:     generateNodeGroupViewList(atomic8, 2, 4),
-			wantEmpty: []*NodeGroupView{},
-			wantDrain: []*NodeGroupView{},
+			wantEmpty: generateNodeGroupViewList(atomic8, 0, 2),
+			wantDrain: generateNodeGroupViewList(atomic8, 2, 4),
 		},
 		"empty&drain atomic nodes exceeding drain limit, no deletions in progress": {
 			empty:     generateNodeGroupViewList(atomic4, 0, 4),
@@ -285,14 +296,14 @@ func TestCropNodesToBudgets(t *testing.T) {
 		},
 		"empty&drain nodes exceeding drain limit, no deletions in progress": {
 			empty:     generateNodeGroupViewList(testNg, 0, 2),
-			drain:     generateNodeGroupViewList(testNg, 0, 8),
-			wantDrain: generateNodeGroupViewList(testNg, 0, 5),
+			drain:     generateNodeGroupViewList(testNg2, 0, 8),
+			wantDrain: generateNodeGroupViewList(testNg2, 0, 5),
 			wantEmpty: generateNodeGroupViewList(testNg, 0, 2),
 		},
 		"empty&drain nodes with deletions in progress, 0 overall budget left": {
 			emptyDeletionsInProgress: 10,
 			empty:     generateNodeGroupViewList(testNg, 0, 5),
-			drain:     generateNodeGroupViewList(testNg, 0, 5),
+			drain:     generateNodeGroupViewList(testNg2, 0, 5),
 			wantEmpty: []*NodeGroupView{},
 			wantDrain: []*NodeGroupView{},
 		},
@@ -313,14 +324,14 @@ func TestCropNodesToBudgets(t *testing.T) {
 		"empty&drain nodes with deletions in progress, overall budget exceeded (shouldn't happen, just a sanity check)": {
 			emptyDeletionsInProgress: 50,
 			empty:     generateNodeGroupViewList(testNg, 0, 5),
-			drain:     generateNodeGroupViewList(testNg, 0, 5),
+			drain:     generateNodeGroupViewList(testNg2, 0, 5),
 			wantEmpty: []*NodeGroupView{},
 			wantDrain: []*NodeGroupView{},
 		},
 		"empty&drain nodes with deletions in progress, 0 drain budget left": {
 			drainDeletionsInProgress: 5,
 			empty:     generateNodeGroupViewList(testNg, 0, 5),
-			drain:     generateNodeGroupViewList(testNg, 0, 5),
+			drain:     generateNodeGroupViewList(testNg2, 0, 5),
 			wantEmpty: generateNodeGroupViewList(testNg, 0, 5),
 			wantDrain: []*NodeGroupView{},
 		},
@@ -334,33 +345,33 @@ func TestCropNodesToBudgets(t *testing.T) {
 		"empty&drain nodes with deletions in progress, drain budget exceeded (shouldn't happen, just a sanity check)": {
 			drainDeletionsInProgress: 9,
 			empty:     generateNodeGroupViewList(testNg, 0, 5),
-			drain:     generateNodeGroupViewList(testNg, 0, 5),
+			drain:     generateNodeGroupViewList(testNg2, 0, 5),
 			wantEmpty: generateNodeGroupViewList(testNg, 0, 1),
 			wantDrain: []*NodeGroupView{},
 		},
 		"empty&drain nodes with deletions in progress, overall budget exceeded, only empty nodes fit": {
 			emptyDeletionsInProgress: 5,
 			drainDeletionsInProgress: 3,
 			empty:     generateNodeGroupViewList(testNg, 0, 5),
-			drain:     generateNodeGroupViewList(testNg, 0, 2),
+			drain:     generateNodeGroupViewList(testNg2, 0, 2),
 			wantEmpty: generateNodeGroupViewList(testNg, 0, 2),
 			wantDrain: []*NodeGroupView{},
 		},
 		"empty&drain nodes with deletions in progress, overall budget exceeded, both empty&drain nodes fit": {
 			emptyDeletionsInProgress: 5,
 			drainDeletionsInProgress: 3,
 			empty:     generateNodeGroupViewList(testNg, 0, 1),
-			drain:     generateNodeGroupViewList(testNg, 0, 2),
+			drain:     generateNodeGroupViewList(testNg2, 0, 2),
 			wantEmpty: generateNodeGroupViewList(testNg, 0, 1),
-			wantDrain: generateNodeGroupViewList(testNg, 0, 1),
+			wantDrain: generateNodeGroupViewList(testNg2, 0, 1),
 		},
 		"empty&drain nodes with deletions in progress, drain budget exceeded": {
 			emptyDeletionsInProgress: 1,
 			drainDeletionsInProgress: 3,
 			empty:     generateNodeGroupViewList(testNg, 0, 4),
-			drain:     generateNodeGroupViewList(testNg, 0, 5),
+			drain:     generateNodeGroupViewList(testNg2, 0, 5),
 			wantEmpty: generateNodeGroupViewList(testNg, 0, 4),
-			wantDrain: generateNodeGroupViewList(testNg, 0, 2),
+			wantDrain: generateNodeGroupViewList(testNg2, 0, 2),
 		},
 		"empty&drain atomic nodes with deletions in progress, overall budget exceeded, only empty nodes fit": {
 			emptyDeletionsInProgress: 5,
@@ -423,23 +434,25 @@ func TestCropNodesToBudgets(t *testing.T) {
 			provider := testprovider.NewTestCloudProviderBuilder().WithOnScaleDown(func(nodeGroup string, node string) error {
 				return nil
 			}).Build()
-			for _, bucket := range append(tc.empty, tc.drain...) {
+			allNodes := []*apiv1.Node{}
+			for _, bucket := range append(append(tc.empty, tc.drain...), tc.otherNodes...) {
 				bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
 				provider.InsertNodeGroup(bucket.Group)
 				for _, node := range bucket.Nodes {
 					provider.AddNode(bucket.Group.Id(), node)
 				}
+				allNodes = append(allNodes, bucket.Nodes...)
 			}
 
-			ctx := &context.AutoscalingContext{
-				AutoscalingOptions: config.AutoscalingOptions{
-					MaxScaleDownParallelism:     10,
-					MaxDrainParallelism:         5,
-					NodeDeletionBatcherInterval: 0 * time.Second,
-					NodeDeleteDelayAfterTaint:   1 * time.Second,
-				},
-				CloudProvider: provider,
+			options := config.AutoscalingOptions{
+				MaxScaleDownParallelism:     10,
+				MaxDrainParallelism:         5,
+				NodeDeletionBatcherInterval: 0 * time.Second,
+				NodeDeleteDelayAfterTaint:   1 * time.Second,
 			}
+
+			ctx, err := test.NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
+			assert.NoError(t, err)
 			ndt := deletiontracker.NewNodeDeletionTracker(1 * time.Hour)
 			for i := 0; i < tc.emptyDeletionsInProgress; i++ {
 				ndt.StartDeletion("ng1", fmt.Sprintf("empty-node-%d", i))
@@ -455,7 +468,8 @@ func TestCropNodesToBudgets(t *testing.T) {
 				drainList = append(drainList, bucket.Nodes...)
 			}
 
-			budgeter := NewScaleDownBudgetProcessor(ctx)
+			clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, allNodes, nil)
+			budgeter := NewScaleDownBudgetProcessor(&ctx)
 			gotEmpty, gotDrain := budgeter.CropNodes(ndt, emptyList, drainList)
 			if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty(), transformNodeGroupView); diff != "" {
 				t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff)
@@ -510,6 +524,9 @@ func generateNodeGroupViewList(ng cloudprovider.NodeGroup, from, to int) []*Node
 func generateNode(name string) *apiv1.Node {
 	return &apiv1.Node{
 		ObjectMeta: metav1.ObjectMeta{Name: name},
+		Spec: apiv1.NodeSpec{
+			ProviderID: name,
+		},
 		Status: apiv1.NodeStatus{
 			Allocatable: apiv1.ResourceList{
 				apiv1.ResourceCPU: resource.MustParse("8"),
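
Setting Spec.ProviderID in the test helper matters because getAllRegisteredNodesForNodeGroup matches snapshot nodes to cloud provider instances via nodeByNodeName[node.Spec.ProviderID], and the test provider appears to report instance IDs equal to the node names. A minimal sketch of that matching; the node and group names are made up:

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A node built the way the test helper now builds them: ProviderID == Name.
	node := &apiv1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "ng1-node-0"},
		Spec:       apiv1.NodeSpec{ProviderID: "ng1-node-0"},
	}
	// Stand-in for the instance IDs the fake provider reports for the group.
	instanceIDs := map[string]bool{"ng1-node-0": true}

	// Without the ProviderID the lookup below would miss, the node would not be
	// counted as registered, and the registered size would come out as 0 in the tests.
	fmt.Println(instanceIDs[node.Spec.ProviderID]) // true
}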
