Skip to content

Commit 1ae89b9

Browse files
authored
Merge pull request kubernetes#3177 from elmiko/issue/3104
Fix stale replicas issue with cluster-autoscaler CAPI provider
2 parents 995591a + be6edb4 commit 1ae89b9

File tree

9 files changed

+344
-68
lines changed

9 files changed

+344
-68
lines changed

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"os"
2323
"strings"
24+
"sync"
2425

2526
corev1 "k8s.io/api/core/v1"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -65,6 +66,7 @@ type machineController struct {
6566
machineSetResource *schema.GroupVersionResource
6667
machineResource *schema.GroupVersionResource
6768
machineDeploymentResource *schema.GroupVersionResource
69+
accessLock sync.Mutex
6870
}
6971

7072
type machineSetFilterFunc func(machineSet *MachineSet) error

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/klog"
2728
"k8s.io/utils/pointer"
2829
)
2930

@@ -75,8 +76,22 @@ func (r machineDeploymentScalableResource) Nodes() ([]string, error) {
7576
return result, nil
7677
}
7778

78-
func (r machineDeploymentScalableResource) Replicas() int32 {
79-
return pointer.Int32PtrDerefOr(r.machineDeployment.Spec.Replicas, 0)
79+
func (r machineDeploymentScalableResource) Replicas() (int32, error) {
80+
freshMachineDeployment, err := r.controller.getMachineDeployment(r.machineDeployment.Namespace, r.machineDeployment.Name, metav1.GetOptions{})
81+
if err != nil {
82+
return 0, err
83+
}
84+
85+
if freshMachineDeployment == nil {
86+
return 0, fmt.Errorf("unknown machineDeployment %s", r.machineDeployment.Name)
87+
}
88+
89+
if freshMachineDeployment.Spec.Replicas == nil {
90+
klog.Warningf("MachineDeployment %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineDeployment.Name)
91+
}
92+
// If no value for replicas on the MachineDeployment spec, fall back to the status
93+
// TODO: Remove this fallback once defaulting is implemented for MachineDeployment Replicas
94+
return pointer.Int32PtrDerefOr(freshMachineDeployment.Spec.Replicas, freshMachineDeployment.Status.Replicas), nil
8095
}
8196

8297
func (r machineDeploymentScalableResource) SetSize(nreplicas int32) error {

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/klog"
2728
"k8s.io/utils/pointer"
2829
)
2930

@@ -60,8 +61,23 @@ func (r machineSetScalableResource) Nodes() ([]string, error) {
6061
return r.controller.machineSetProviderIDs(r.machineSet)
6162
}
6263

63-
func (r machineSetScalableResource) Replicas() int32 {
64-
return pointer.Int32PtrDerefOr(r.machineSet.Spec.Replicas, 0)
64+
func (r machineSetScalableResource) Replicas() (int32, error) {
65+
freshMachineSet, err := r.controller.getMachineSet(r.machineSet.Namespace, r.machineSet.Name, metav1.GetOptions{})
66+
if err != nil {
67+
return 0, err
68+
}
69+
70+
if freshMachineSet == nil {
71+
return 0, fmt.Errorf("unknown machineSet %s", r.machineSet.Name)
72+
}
73+
74+
if freshMachineSet.Spec.Replicas == nil {
75+
klog.Warningf("MachineSet %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineSet.Name)
76+
}
77+
78+
// If no value for replicas on the MachineSet spec, fall back to the status
79+
// TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas
80+
return pointer.Int32PtrDerefOr(freshMachineSet.Spec.Replicas, freshMachineSet.Status.Replicas), nil
6581
}
6682

6783
func (r machineSetScalableResource) SetSize(nreplicas int32) error {
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package clusterapi
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
25+
)
26+
27+
func TestSetSize(t *testing.T) {
28+
initialReplicas := int32(1)
29+
updatedReplicas := int32(5)
30+
31+
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
32+
controller, stop := mustCreateTestController(t, testConfig)
33+
defer stop()
34+
35+
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
36+
if err != nil {
37+
t.Fatal(err)
38+
}
39+
40+
err = sr.SetSize(updatedReplicas)
41+
if err != nil {
42+
t.Fatal(err)
43+
}
44+
45+
// fetch machineSet
46+
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
47+
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
48+
if err != nil {
49+
t.Fatal(err)
50+
}
51+
52+
replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas")
53+
if err != nil {
54+
t.Fatal(err)
55+
}
56+
if !found {
57+
t.Fatal("spec.replicas not found")
58+
}
59+
60+
got := int32(replicas)
61+
if got != updatedReplicas {
62+
t.Errorf("expected %v, got: %v", updatedReplicas, got)
63+
}
64+
}
65+
66+
func TestReplicas(t *testing.T) {
67+
initialReplicas := int32(1)
68+
updatedReplicas := int32(5)
69+
70+
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
71+
controller, stop := mustCreateTestController(t, testConfig)
72+
defer stop()
73+
74+
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
75+
if err != nil {
76+
t.Fatal(err)
77+
}
78+
79+
i, err := sr.Replicas()
80+
if err != nil {
81+
t.Fatal(err)
82+
}
83+
84+
if i != initialReplicas {
85+
t.Errorf("expected %v, got: %v", initialReplicas, i)
86+
}
87+
88+
// fetch and update machineSet
89+
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
90+
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
91+
if err != nil {
92+
t.Fatal(err)
93+
}
94+
95+
if err := unstructured.SetNestedField(u.Object, int64(updatedReplicas), "spec", "replicas"); err != nil {
96+
t.Fatal(err)
97+
}
98+
99+
_, err = sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(u.GetNamespace()).
100+
Update(context.TODO(), u, metav1.UpdateOptions{})
101+
if err != nil {
102+
t.Fatal(err)
103+
}
104+
105+
i, err = sr.Replicas()
106+
if err != nil {
107+
t.Fatal(err)
108+
}
109+
110+
if i != updatedReplicas {
111+
t.Errorf("expected %v, got: %v", updatedReplicas, i)
112+
}
113+
}
114+
115+
func TestSetSizeAndReplicas(t *testing.T) {
116+
initialReplicas := int32(1)
117+
updatedReplicas := int32(5)
118+
119+
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
120+
controller, stop := mustCreateTestController(t, testConfig)
121+
defer stop()
122+
123+
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
124+
if err != nil {
125+
t.Fatal(err)
126+
}
127+
128+
i, err := sr.Replicas()
129+
if err != nil {
130+
t.Fatal(err)
131+
}
132+
133+
if i != initialReplicas {
134+
t.Errorf("expected %v, got: %v", initialReplicas, i)
135+
}
136+
137+
err = sr.SetSize(updatedReplicas)
138+
if err != nil {
139+
t.Fatal(err)
140+
}
141+
142+
i, err = sr.Replicas()
143+
if err != nil {
144+
t.Fatal(err)
145+
}
146+
147+
if i != updatedReplicas {
148+
t.Errorf("expected %v, got: %v", updatedReplicas, i)
149+
}
150+
}

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ func (ng *nodegroup) MaxSize() int {
5959
// (new nodes finish startup and registration or removed nodes are
6060
// deleted completely). Implementation required.
6161
func (ng *nodegroup) TargetSize() (int, error) {
62-
return int(ng.scalableResource.Replicas()), nil
62+
size, err := ng.scalableResource.Replicas()
63+
if err != nil {
64+
return 0, err
65+
}
66+
return int(size), nil
6367
}
6468

6569
// IncreaseSize increases the size of the node group. To delete a node
@@ -70,18 +74,37 @@ func (ng *nodegroup) IncreaseSize(delta int) error {
7074
if delta <= 0 {
7175
return fmt.Errorf("size increase must be positive")
7276
}
73-
size := int(ng.scalableResource.Replicas())
74-
if size+delta > ng.MaxSize() {
75-
return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.MaxSize())
77+
78+
size, err := ng.scalableResource.Replicas()
79+
if err != nil {
80+
return err
7681
}
77-
return ng.scalableResource.SetSize(int32(size + delta))
82+
intSize := int(size)
83+
84+
if intSize+delta > ng.MaxSize() {
85+
return fmt.Errorf("size increase too large - desired:%d max:%d", intSize+delta, ng.MaxSize())
86+
}
87+
return ng.scalableResource.SetSize(int32(intSize + delta))
7888
}
7989

8090
// DeleteNodes deletes nodes from this node group. Error is returned
8191
// either on failure or if the given node doesn't belong to this node
8292
// group. This function should wait until node group size is updated.
8393
// Implementation required.
8494
func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error {
95+
ng.machineController.accessLock.Lock()
96+
defer ng.machineController.accessLock.Unlock()
97+
98+
replicas, err := ng.scalableResource.Replicas()
99+
if err != nil {
100+
return err
101+
}
102+
103+
// If we are already at minSize, we bail early.
104+
if int(replicas) <= ng.MinSize() {
105+
return fmt.Errorf("min size reached, nodes will not be deleted")
106+
}
107+
85108
// Step 1: Verify all nodes belong to this node group.
86109
for _, node := range nodes {
87110
actualNodeGroup, err := ng.machineController.nodeGroupForNode(node)
@@ -99,12 +122,10 @@ func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error {
99122
}
100123

101124
// Step 2: if deleting len(nodes) would make the replica count
102-
// <= 0, then the request to delete that many nodes is bogus
125+
// < minSize, then the request to delete that many nodes is bogus
103126
// and we fail fast.
104-
replicas := ng.scalableResource.Replicas()
105-
106-
if replicas-int32(len(nodes)) <= 0 {
107-
return fmt.Errorf("unable to delete %d machines in %q, machine replicas are <= 0 ", len(nodes), ng.Id())
127+
if replicas-int32(len(nodes)) < int32(ng.MinSize()) {
128+
return fmt.Errorf("unable to delete %d machines in %q, machine replicas are %q, minSize is %q ", len(nodes), ng.Id(), replicas, ng.MinSize())
108129
}
109130

110131
// Step 3: annotate the corresponding machine that it is a
@@ -184,7 +205,11 @@ func (ng *nodegroup) Id() string {
184205

185206
// Debug returns a string containing all information regarding this node group.
186207
func (ng *nodegroup) Debug() string {
187-
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), ng.scalableResource.Replicas())
208+
replicas, err := ng.scalableResource.Replicas()
209+
if err != nil {
210+
return fmt.Sprintf("%s (min: %d, max: %d, replicas: %v)", ng.Id(), ng.MinSize(), ng.MaxSize(), err)
211+
}
212+
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), replicas)
188213
}
189214

190215
// Nodes returns a list of all nodes that belong to this node group.

0 commit comments

Comments
 (0)