Skip to content

Commit 9c8b78a

Browse files
enxebreelmiko
authored andcommitted
Get replicas always from API server for cluster-autoscaler CAPI provider
When getting Replicas() the local struct in the scalable resource might be stale. To mitigate possible side effects, we want always get a fresh replicas. This is one in a series of PR to mitigate kubernetes#3104
1 parent f1407a1 commit 9c8b78a

File tree

8 files changed

+235
-17
lines changed

8 files changed

+235
-17
lines changed

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/klog"
2728
"k8s.io/utils/pointer"
2829
)
2930

@@ -75,8 +76,22 @@ func (r machineDeploymentScalableResource) Nodes() ([]string, error) {
7576
return result, nil
7677
}
7778

78-
func (r machineDeploymentScalableResource) Replicas() int32 {
79-
return pointer.Int32PtrDerefOr(r.machineDeployment.Spec.Replicas, 0)
79+
func (r machineDeploymentScalableResource) Replicas() (int32, error) {
80+
freshMachineDeployment, err := r.controller.getMachineDeployment(r.machineDeployment.Namespace, r.machineDeployment.Name, metav1.GetOptions{})
81+
if err != nil {
82+
return 0, err
83+
}
84+
85+
if freshMachineDeployment == nil {
86+
return 0, fmt.Errorf("unknown machineDeployment %s", r.machineDeployment.Name)
87+
}
88+
89+
if freshMachineDeployment.Spec.Replicas == nil {
90+
klog.Warningf("MachineDeployment %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineDeployment.Name)
91+
}
92+
// If no value for replicas on the MachineSet spec, fallback to the status
93+
// TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas
94+
return pointer.Int32PtrDerefOr(freshMachineDeployment.Spec.Replicas, freshMachineDeployment.Status.Replicas), nil
8095
}
8196

8297
func (r machineDeploymentScalableResource) SetSize(nreplicas int32) error {

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/klog"
2728
"k8s.io/utils/pointer"
2829
)
2930

@@ -60,8 +61,23 @@ func (r machineSetScalableResource) Nodes() ([]string, error) {
6061
return r.controller.machineSetProviderIDs(r.machineSet)
6162
}
6263

63-
func (r machineSetScalableResource) Replicas() int32 {
64-
return pointer.Int32PtrDerefOr(r.machineSet.Spec.Replicas, 0)
64+
func (r machineSetScalableResource) Replicas() (int32, error) {
65+
freshMachineSet, err := r.controller.getMachineSet(r.machineSet.Namespace, r.machineSet.Name, metav1.GetOptions{})
66+
if err != nil {
67+
return 0, err
68+
}
69+
70+
if freshMachineSet == nil {
71+
return 0, fmt.Errorf("unknown machineSet %s", r.machineSet.Name)
72+
}
73+
74+
if freshMachineSet.Spec.Replicas == nil {
75+
klog.Warningf("MachineSet %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineSet.Name)
76+
}
77+
78+
// If no value for replicas on the MachineSet spec, fallback to the status
79+
// TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas
80+
return pointer.Int32PtrDerefOr(freshMachineSet.Spec.Replicas, freshMachineSet.Status.Replicas), nil
6581
}
6682

6783
func (r machineSetScalableResource) SetSize(nreplicas int32) error {
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package clusterapi
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
25+
)
26+
27+
func TestSetSize(t *testing.T) {
28+
initialReplicas := int32(1)
29+
updatedReplicas := int32(5)
30+
31+
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
32+
controller, stop := mustCreateTestController(t, testConfig)
33+
defer stop()
34+
35+
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
36+
if err != nil {
37+
t.Fatal(err)
38+
}
39+
40+
err = sr.SetSize(updatedReplicas)
41+
if err != nil {
42+
t.Fatal(err)
43+
}
44+
45+
// fetch machineSet
46+
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
47+
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
48+
if err != nil {
49+
t.Fatal(err)
50+
}
51+
52+
replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas")
53+
if err != nil {
54+
t.Fatal(err)
55+
}
56+
if !found {
57+
t.Fatal("spec.replicas not found")
58+
}
59+
60+
got := int32(replicas)
61+
if got != updatedReplicas {
62+
t.Errorf("expected %v, got: %v", updatedReplicas, got)
63+
}
64+
}
65+
66+
func TestReplicas(t *testing.T) {
67+
initialReplicas := int32(1)
68+
updatedReplicas := int32(5)
69+
70+
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
71+
controller, stop := mustCreateTestController(t, testConfig)
72+
defer stop()
73+
74+
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
75+
if err != nil {
76+
t.Fatal(err)
77+
}
78+
79+
i, err := sr.Replicas()
80+
if err != nil {
81+
t.Fatal(err)
82+
}
83+
84+
if i != initialReplicas {
85+
t.Errorf("expected %v, got: %v", initialReplicas, i)
86+
}
87+
88+
// fetch and update machineSet
89+
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
90+
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
91+
if err != nil {
92+
t.Fatal(err)
93+
}
94+
95+
if err := unstructured.SetNestedField(u.Object, int64(updatedReplicas), "spec", "replicas"); err != nil {
96+
t.Fatal(err)
97+
}
98+
99+
_, err = sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(u.GetNamespace()).
100+
Update(context.TODO(), u, metav1.UpdateOptions{})
101+
if err != nil {
102+
t.Fatal(err)
103+
}
104+
105+
i, err = sr.Replicas()
106+
if err != nil {
107+
t.Fatal(err)
108+
}
109+
110+
if i != updatedReplicas {
111+
t.Errorf("expected %v, got: %v", updatedReplicas, i)
112+
}
113+
}
114+
115+
func TestSetSizeAndReplicas(t *testing.T) {
116+
initialReplicas := int32(1)
117+
updatedReplicas := int32(5)
118+
119+
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
120+
controller, stop := mustCreateTestController(t, testConfig)
121+
defer stop()
122+
123+
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
124+
if err != nil {
125+
t.Fatal(err)
126+
}
127+
128+
i, err := sr.Replicas()
129+
if err != nil {
130+
t.Fatal(err)
131+
}
132+
133+
if i != initialReplicas {
134+
t.Errorf("expected %v, got: %v", initialReplicas, i)
135+
}
136+
137+
err = sr.SetSize(updatedReplicas)
138+
if err != nil {
139+
t.Fatal(err)
140+
}
141+
142+
i, err = sr.Replicas()
143+
if err != nil {
144+
t.Fatal(err)
145+
}
146+
147+
if i != updatedReplicas {
148+
t.Errorf("expected %v, got: %v", updatedReplicas, i)
149+
}
150+
}

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ func (ng *nodegroup) MaxSize() int {
5959
// (new nodes finish startup and registration or removed nodes are
6060
// deleted completely). Implementation required.
6161
func (ng *nodegroup) TargetSize() (int, error) {
62-
return int(ng.scalableResource.Replicas()), nil
62+
size, err := ng.scalableResource.Replicas()
63+
if err != nil {
64+
return 0, err
65+
}
66+
return int(size), nil
6367
}
6468

6569
// IncreaseSize increases the size of the node group. To delete a node
@@ -70,11 +74,17 @@ func (ng *nodegroup) IncreaseSize(delta int) error {
7074
if delta <= 0 {
7175
return fmt.Errorf("size increase must be positive")
7276
}
73-
size := int(ng.scalableResource.Replicas())
74-
if size+delta > ng.MaxSize() {
75-
return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.MaxSize())
77+
78+
size, err := ng.scalableResource.Replicas()
79+
if err != nil {
80+
return err
7681
}
77-
return ng.scalableResource.SetSize(int32(size + delta))
82+
intSize := int(size)
83+
84+
if intSize+delta > ng.MaxSize() {
85+
return fmt.Errorf("size increase too large - desired:%d max:%d", intSize+delta, ng.MaxSize())
86+
}
87+
return ng.scalableResource.SetSize(int32(intSize + delta))
7888
}
7989

8090
// DeleteNodes deletes nodes from this node group. Error is returned
@@ -104,7 +114,10 @@ func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error {
104114
// Step 2: if deleting len(nodes) would make the replica count
105115
// <= 0, then the request to delete that many nodes is bogus
106116
// and we fail fast.
107-
replicas := ng.scalableResource.Replicas()
117+
replicas, err := ng.scalableResource.Replicas()
118+
if err != nil {
119+
return err
120+
}
108121

109122
if replicas-int32(len(nodes)) <= 0 {
110123
return fmt.Errorf("unable to delete %d machines in %q, machine replicas are <= 0 ", len(nodes), ng.Id())
@@ -187,7 +200,11 @@ func (ng *nodegroup) Id() string {
187200

188201
// Debug returns a string containing all information regarding this node group.
189202
func (ng *nodegroup) Debug() string {
190-
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), ng.scalableResource.Replicas())
203+
replicas, err := ng.scalableResource.Replicas()
204+
if err != nil {
205+
return fmt.Sprintf("%s (min: %d, max: %d, replicas: %v)", ng.Id(), ng.MinSize(), ng.MaxSize(), err)
206+
}
207+
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), replicas)
191208
}
192209

193210
// Nodes returns a list of all nodes that belong to this node group.

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@ limitations under the License.
1717
package clusterapi
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"path"
2223
"sort"
2324
"strings"
2425
"testing"
2526
"time"
2627

28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2729
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2830
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
2931
"k8s.io/utils/pointer"
@@ -108,7 +110,7 @@ func TestNodeGroupNewNodeGroupConstructor(t *testing.T) {
108110
}
109111

110112
test := func(t *testing.T, tc testCase, testConfig *testConfig) {
111-
controller, stop := mustCreateTestController(t)
113+
controller, stop := mustCreateTestController(t, testConfig)
112114
defer stop()
113115

114116
ng, err := newNodeGroup(t, controller, testConfig)
@@ -429,12 +431,22 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
429431
switch v := (ng.scalableResource).(type) {
430432
case *machineSetScalableResource:
431433
testConfig.machineSet.Spec.Replicas = int32ptr(*testConfig.machineSet.Spec.Replicas + tc.targetSizeIncrement)
432-
if err := controller.machineSetInformer.Informer().GetStore().Add(newUnstructuredFromMachineSet(testConfig.machineSet)); err != nil {
434+
u := newUnstructuredFromMachineSet(testConfig.machineSet)
435+
if err := controller.machineSetInformer.Informer().GetStore().Add(u); err != nil {
433436
t.Fatalf("failed to add new machine: %v", err)
434437
}
438+
_, err := controller.dynamicclient.Resource(*controller.machineSetResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{})
439+
if err != nil {
440+
t.Fatalf("failed to updating machine: %v", err)
441+
}
435442
case *machineDeploymentScalableResource:
436443
testConfig.machineDeployment.Spec.Replicas = int32ptr(*testConfig.machineDeployment.Spec.Replicas + tc.targetSizeIncrement)
437-
if err := controller.machineDeploymentInformer.Informer().GetStore().Add(newUnstructuredFromMachineDeployment(testConfig.machineDeployment)); err != nil {
444+
u := newUnstructuredFromMachineDeployment(testConfig.machineDeployment)
445+
if err := controller.machineDeploymentInformer.Informer().GetStore().Add(u); err != nil {
446+
}
447+
_, err := controller.dynamicclient.Resource(*controller.machineDeploymentResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{})
448+
if err != nil {
449+
t.Fatalf("failed to updating machine: %v", err)
438450
}
439451
default:
440452
t.Errorf("unexpected type: %T", v)
@@ -450,6 +462,7 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
450462
if err != nil {
451463
t.Fatalf("unexpected error: %v", err)
452464
}
465+
453466
if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
454467
t.Errorf("initially expected %v, got %v", tc.initial, currReplicas)
455468
}

cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type scalableResource interface {
4242
SetSize(nreplicas int32) error
4343

4444
// Replicas returns the current replica count of the resource
45-
Replicas() int32
45+
Replicas() (int32, error)
4646

4747
// MarkMachineForDeletion marks machine for deletion
4848
MarkMachineForDeletion(machine *Machine) error

cluster-autoscaler/cloudprovider/clusterapi/machinedeployment_types.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ type MachineDeploymentSpec struct {
3636
}
3737

3838
// MachineDeploymentStatus is the internal autoscaler Schema for MachineDeploymentStatus
39-
type MachineDeploymentStatus struct{}
39+
type MachineDeploymentStatus struct {
40+
// Number of desired machines. Defaults to 1.
41+
// This is a pointer to distinguish between explicit zero and not specified.
42+
Replicas int32 `json:"replicas,omitempty"`
43+
}
4044

4145
// MachineDeployment is the internal autoscaler Schema for MachineDeployment
4246
type MachineDeployment struct {

cluster-autoscaler/cloudprovider/clusterapi/machineset_types.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ type MachineTemplateSpec struct {
6868
}
6969

7070
// MachineSetStatus is the internal autoscaler Schema for MachineSetStatus
71-
type MachineSetStatus struct{}
71+
type MachineSetStatus struct {
72+
// Replicas is the most recently observed number of replicas.
73+
Replicas int32 `json:"replicas"`
74+
}
7275

7376
// MachineSetList is the internal autoscaler Schema for MachineSetList
7477
type MachineSetList struct {

0 commit comments

Comments
 (0)