Commit 41b9b45

Add UpdateThreshold method to NodeLatencyTracker and use it during RemovableAt
1 parent 4271a1a commit 41b9b45

4 files changed, +60 -12 lines changed
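The change threads a LatencyTracker into the unneeded-nodes bookkeeping so that, while RemovableAt evaluates each unneeded node, the node's current scale-down threshold (ScaleDownUnneededTime or ScaleDownUnreadyTime for its node group) is pushed into the tracker. A minimal, self-contained sketch of that flow, with the real cluster-autoscaler types replaced by local stand-ins (latencyTracker, recordingTracker and evaluateNode are illustrative names, not part of this commit):

package main

import (
    "fmt"
    "time"
)

// latencyTracker mirrors the two methods of the LatencyTracker interface used in this
// sketch; the real interface (see the diff below) also has UpdateStateWithUnneededList,
// omitted here for brevity.
type latencyTracker interface {
    UpdateThreshold(nodeName string, threshold time.Duration)
    ObserveDeletion(nodeName string, timestamp time.Time)
}

// recordingTracker is an illustrative implementation that remembers the last
// threshold reported for each node.
type recordingTracker struct {
    thresholds map[string]time.Duration
}

func (r *recordingTracker) UpdateThreshold(name string, d time.Duration) {
    r.thresholds[name] = d
}

func (r *recordingTracker) ObserveDeletion(name string, ts time.Time) {
    fmt.Printf("node %s deleted at %s, threshold was %s\n", name, ts.Format(time.RFC3339), r.thresholds[name])
}

// evaluateNode stands in for Nodes.unremovableReason: while deciding whether a node is
// removable, it forwards the node group's unneeded time to the tracker.
func evaluateNode(t latencyTracker, nodeName string, unneededTime time.Duration) {
    t.UpdateThreshold(nodeName, unneededTime)
}

func main() {
    tracker := &recordingTracker{thresholds: map[string]time.Duration{}}
    evaluateNode(tracker, "node-1", 10*time.Minute) // during RemovableAt
    tracker.ObserveDeletion("node-1", time.Now())   // later, when the node is actually deleted
}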

cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker.go

Lines changed: 12 additions & 0 deletions
@@ -11,6 +11,7 @@ import (
 type LatencyTracker interface {
     ObserveDeletion(nodeName string, timestamp time.Time)
     UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time)
+    UpdateThreshold(nodeName string, threshold time.Duration)
 }
 type NodeInfo struct {
     UnneededSince time.Time
@@ -75,6 +76,17 @@ func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Tim
     }
 }

+// UpdateThreshold updates the scale-down threshold for a tracked node.
+func (t *NodeLatencyTracker) UpdateThreshold(nodeName string, threshold time.Duration) {
+    if info, exists := t.nodes[nodeName]; exists {
+        info.Threshold = threshold
+        t.nodes[nodeName] = info
+        klog.V(2).Infof("Updated threshold for node %q to %s", nodeName, threshold)
+    } else {
+        klog.Warningf("Attempted to update threshold for unknown node %q", nodeName)
+    }
+}
+
 // GetTrackedNodes returns the names of all nodes currently tracked as unneeded.
 func (t *NodeLatencyTracker) GetTrackedNodes() []string {
     names := make([]string, 0, len(t.nodes))
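One detail worth noting in UpdateThreshold above: t.nodes appears to hold NodeInfo values rather than pointers, so the modified copy has to be written back into the map (the write-back would be redundant for pointer values). A standalone sketch of that pattern, with tracker and nodeInfo as illustrative local types rather than the real NodeLatencyTracker:

package main

import (
    "fmt"
    "time"
)

// nodeInfo and tracker are stand-ins for the NodeInfo / NodeLatencyTracker types above.
type nodeInfo struct {
    unneededSince time.Time
    threshold     time.Duration
}

type tracker struct {
    nodes map[string]nodeInfo // values, not pointers
}

func (t *tracker) updateThreshold(name string, threshold time.Duration) {
    if info, ok := t.nodes[name]; ok {
        info.threshold = threshold
        t.nodes[name] = info // write the modified copy back, mirroring t.nodes[nodeName] = info above
        return
    }
    fmt.Printf("unknown node %q, threshold not updated\n", name)
}

func main() {
    t := &tracker{nodes: map[string]nodeInfo{"n1": {unneededSince: time.Now()}}}
    t.updateThreshold("n1", 10*time.Minute)
    t.updateThreshold("n2", 10*time.Minute) // logs: unknown node "n2"
    fmt.Println(t.nodes["n1"].threshold)    // 10m0s
}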

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
     return &Planner{
         context:            context,
         unremovableNodes:   unremovable.NewNodes(),
-        unneededNodes:      unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder),
+        unneededNodes:      unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder, nlt),
         rs:                 simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, deleteOptions, drainabilityRules, true),
         actuationInjector:  scheduling.NewHintingSimulator(),
         eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),

cluster-autoscaler/core/scaledown/unneeded/nodes.go

Lines changed: 18 additions & 9 deletions
@@ -24,6 +24,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
     "k8s.io/autoscaler/cluster-autoscaler/metrics"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
@@ -37,11 +38,12 @@ import (

 // Nodes tracks the state of cluster nodes that are not needed.
 type Nodes struct {
-    sdtg              scaleDownTimeGetter
-    limitsFinder      *resource.LimitsFinder
-    cachedList        []*apiv1.Node
-    byName            map[string]*node
-    unneededTimeCache map[string]time.Duration
+    sdtg               scaleDownTimeGetter
+    limitsFinder       *resource.LimitsFinder
+    nodeLatencyTracker latencytracker.LatencyTracker
+    cachedList         []*apiv1.Node
+    byName             map[string]*node
+    unneededTimeCache  map[string]time.Duration
 }

 type node struct {
@@ -57,11 +59,12 @@ type scaleDownTimeGetter interface {
 }

 // NewNodes returns a new initialized Nodes object.
-func NewNodes(sdtg scaleDownTimeGetter, limitsFinder *resource.LimitsFinder) *Nodes {
+func NewNodes(sdtg scaleDownTimeGetter, limitsFinder *resource.LimitsFinder, nlt latencytracker.LatencyTracker) *Nodes {
     return &Nodes{
-        sdtg:              sdtg,
-        limitsFinder:      limitsFinder,
-        unneededTimeCache: make(map[string]time.Duration),
+        sdtg:               sdtg,
+        limitsFinder:       limitsFinder,
+        unneededTimeCache:  make(map[string]time.Duration),
+        nodeLatencyTracker: nlt,
     }
 }

@@ -200,6 +203,9 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDown
     if ready {
         // Check how long a ready node was underutilized.
         unneededTime, err := n.sdtg.GetScaleDownUnneededTime(nodeGroup)
+        if n.nodeLatencyTracker != nil {
+            n.nodeLatencyTracker.UpdateThreshold(node.Name, unneededTime)
+        }
         if err != nil {
             klog.Errorf("Error trying to get ScaleDownUnneededTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
             return simulator.UnexpectedError
@@ -210,6 +216,9 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDown
     } else {
         // Unready nodes may be deleted after a different time than underutilized nodes.
         unreadyTime, err := n.sdtg.GetScaleDownUnreadyTime(nodeGroup)
+        if n.nodeLatencyTracker != nil {
+            n.nodeLatencyTracker.UpdateThreshold(node.Name, unreadyTime)
+        }
         if err != nil {
             klog.Errorf("Error trying to get ScaleDownUnreadyTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
             return simulator.UnexpectedError
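Both call sites above guard the call with "if n.nodeLatencyTracker != nil", which keeps the tracker optional: the TestUpdate change below constructs Nodes with a nil tracker. A short self-contained sketch of the same guard in isolation (thresholdSink, reportThreshold and printSink are illustrative names, not part of the diff):

package main

import (
    "fmt"
    "time"
)

// thresholdSink is a stand-in for latencytracker.LatencyTracker.
type thresholdSink interface {
    UpdateThreshold(nodeName string, threshold time.Duration)
}

// reportThreshold mirrors the guarded calls in unremovableReason: a nil sink
// simply disables latency tracking.
func reportThreshold(sink thresholdSink, nodeName string, threshold time.Duration) {
    if sink != nil {
        sink.UpdateThreshold(nodeName, threshold)
    }
}

type printSink struct{}

func (printSink) UpdateThreshold(name string, d time.Duration) {
    fmt.Printf("threshold for %s is now %s\n", name, d)
}

func main() {
    reportThreshold(printSink{}, "node-1", 10*time.Minute)
    reportThreshold(nil, "node-2", 10*time.Minute) // no-op, no panic
}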

cluster-autoscaler/core/scaledown/unneeded/nodes_test.go

Lines changed: 29 additions & 2 deletions
@@ -99,7 +99,7 @@ func TestUpdate(t *testing.T) {
         tc := tc
         t.Run(tc.desc, func(t *testing.T) {
             t.Parallel()
-            nodes := NewNodes(nil, nil)
+            nodes := NewNodes(nil, nil, nil)
             nodes.Update(tc.initialNodes, initialTimestamp)
             nodes.Update(tc.finalNodes, finalTimestamp)
             wantNodes := len(tc.wantTimestamps)
@@ -201,7 +201,8 @@ func TestRemovableAt(t *testing.T) {
             ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{ScaleDownSimulationTimeout: 5 * time.Minute}, &fake.Clientset{}, registry, provider, nil, nil)
             assert.NoError(t, err)

-            n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{})
+            fakeTracker := NewFakeLatencyTracker()
+            n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{}, fakeTracker)
             n.Update(removableNodes, time.Now())
             gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, nodes.ScaleDownContext{
                 ActuationStatus: as,
@@ -211,6 +212,16 @@
             if len(gotDrainToRemove) != tc.numDrainToRemove || len(gotEmptyToRemove) != tc.numEmptyToRemove {
                 t.Errorf("%s: getNodesToRemove() return %d, %d, want %d, %d", tc.name, len(gotEmptyToRemove), len(gotDrainToRemove), tc.numEmptyToRemove, tc.numDrainToRemove)
             }
+            expectedThreshold := 0 * time.Second // matches fakeScaleDownTimeGetter
+            for _, node := range removableNodes {
+                nodeName := node.Node.Name
+                got, ok := fakeTracker.Observed[nodeName]
+                if !ok {
+                    t.Errorf("NodeLatencyTracker not called for node %s", nodeName)
+                } else if got != expectedThreshold {
+                    t.Errorf("NodeLatencyTracker called with %v for node %s, want %v", got, nodeName, expectedThreshold)
+                }
+            }
         })
     }
 }
@@ -245,3 +256,19 @@ func (f *fakeScaleDownTimeGetter) GetScaleDownUnneededTime(cloudprovider.NodeGro
 func (f *fakeScaleDownTimeGetter) GetScaleDownUnreadyTime(cloudprovider.NodeGroup) (time.Duration, error) {
     return 0 * time.Second, nil
 }
+
+type fakeLatencyTracker struct {
+    Observed map[string]time.Duration
+}
+
+func NewFakeLatencyTracker() *fakeLatencyTracker {
+    return &fakeLatencyTracker{Observed: make(map[string]time.Duration)}
+}
+
+func (f *fakeLatencyTracker) UpdateThreshold(nodeName string, threshold time.Duration) {
+    f.Observed[nodeName] = threshold
+}
+func (f *fakeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
+}
+func (f *fakeLatencyTracker) UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time) {
+}
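The fake above implements all three LatencyTracker methods by hand. A compile-time assertion like the following (not part of this commit, shown only as a common Go idiom for test doubles) would catch the fake drifting out of sync with the interface as it grows:

// Hypothetical addition to nodes_test.go, not in the diff:
var _ latencytracker.LatencyTracker = (*fakeLatencyTracker)(nil)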
