
Commit 519e89c

Add UpdateThreshold method to ndlt and use it during RemovableAt
1 parent 7824eae commit 519e89c


4 files changed (+60, -12 lines)

cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker.go

Lines changed: 12 additions & 0 deletions
@@ -11,6 +11,7 @@ import (
 type LatencyTracker interface {
     ObserveDeletion(nodeName string, timestamp time.Time)
     UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time)
+    UpdateThreshold(nodeName string, threshold time.Duration)
 }
 type NodeInfo struct {
     UnneededSince time.Time
@@ -75,6 +76,17 @@ func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Tim
     }
 }

+// UpdateThreshold updates the scale-down threshold for a tracked node.
+func (t *NodeLatencyTracker) UpdateThreshold(nodeName string, threshold time.Duration) {
+    if info, exists := t.nodes[nodeName]; exists {
+        info.Threshold = threshold
+        t.nodes[nodeName] = info
+        klog.V(2).Infof("Updated threshold for node %q to %s", nodeName, threshold)
+    } else {
+        klog.Warningf("Attempted to update threshold for unknown node %q", nodeName)
+    }
+}
+
 // GetTrackedNodes returns the names of all nodes currently tracked as unneeded.
 func (t *NodeLatencyTracker) GetTrackedNodes() []string {
     names := make([]string, 0, len(t.nodes))
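
For orientation, here is a minimal, self-contained sketch of the call pattern the extended interface supports: refresh the unneeded set, record each node's threshold while removability is evaluated, then report the deletion. The trackerAPI mirror, the driveTracker helper, and the 10/15-minute durations are illustrative assumptions, not code from this commit.

// Illustrative sketch only; trackerAPI mirrors the LatencyTracker interface above.
package example

import (
    "time"

    apiv1 "k8s.io/api/core/v1"
)

type trackerAPI interface {
    ObserveDeletion(nodeName string, timestamp time.Time)
    UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time)
    UpdateThreshold(nodeName string, threshold time.Duration)
}

// driveTracker shows the intended ordering of the three calls.
func driveTracker(tracker trackerAPI, unneeded []*apiv1.Node, now time.Time) {
    // Refresh the set of unneeded nodes first ...
    tracker.UpdateStateWithUnneededList(unneeded, map[string]bool{}, now)
    // ... then record the per-node threshold as it is resolved (assumed 10m here) ...
    for _, n := range unneeded {
        tracker.UpdateThreshold(n.Name, 10*time.Minute)
    }
    // ... and finally report the deletion once it happens (assumed 15m later).
    for _, n := range unneeded {
        tracker.ObserveDeletion(n.Name, now.Add(15*time.Minute))
    }
}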

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
     return &Planner{
         context: context,
         unremovableNodes: unremovable.NewNodes(),
-        unneededNodes: unneededNodes,
+        unneededNodes: unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder, nlt),
         rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, deleteOptions, drainabilityRules, true),
         actuationInjector: scheduling.NewHintingSimulator(),
         eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),

cluster-autoscaler/core/scaledown/unneeded/nodes.go

Lines changed: 18 additions & 9 deletions
@@ -25,6 +25,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
     "k8s.io/autoscaler/cluster-autoscaler/metrics"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
@@ -39,11 +40,12 @@ import (

 // Nodes tracks the state of cluster nodes that are not needed.
 type Nodes struct {
-    sdtg scaleDownTimeGetter
-    limitsFinder *resource.LimitsFinder
-    cachedList []*apiv1.Node
-    byName map[string]*node
-    unneededTimeCache map[string]time.Duration
+    sdtg scaleDownTimeGetter
+    limitsFinder *resource.LimitsFinder
+    nodeLatencyTracker latencytracker.LatencyTracker
+    cachedList []*apiv1.Node
+    byName map[string]*node
+    unneededTimeCache map[string]time.Duration
 }

 type node struct {
@@ -59,11 +61,12 @@ type scaleDownTimeGetter interface {
 }

 // NewNodes returns a new initialized Nodes object.
-func NewNodes(sdtg scaleDownTimeGetter, limitsFinder *resource.LimitsFinder) *Nodes {
+func NewNodes(sdtg scaleDownTimeGetter, limitsFinder *resource.LimitsFinder, nlt latencytracker.LatencyTracker) *Nodes {
     return &Nodes{
-        sdtg: sdtg,
-        limitsFinder: limitsFinder,
-        unneededTimeCache: make(map[string]time.Duration),
+        sdtg: sdtg,
+        limitsFinder: limitsFinder,
+        unneededTimeCache: make(map[string]time.Duration),
+        nodeLatencyTracker: nlt,
     }
 }

@@ -268,6 +271,9 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDown
     if ready {
         // Check how long a ready node was underutilized.
         unneededTime, err := n.sdtg.GetScaleDownUnneededTime(nodeGroup)
+        if n.nodeLatencyTracker != nil {
+            n.nodeLatencyTracker.UpdateThreshold(node.Name, unneededTime)
+        }
         if err != nil {
             klog.Errorf("Error trying to get ScaleDownUnneededTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
             return simulator.UnexpectedError
@@ -278,6 +284,9 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDown
     } else {
         // Unready nodes may be deleted after a different time than underutilized nodes.
         unreadyTime, err := n.sdtg.GetScaleDownUnreadyTime(nodeGroup)
+        if n.nodeLatencyTracker != nil {
+            n.nodeLatencyTracker.UpdateThreshold(node.Name, unreadyTime)
+        }
         if err != nil {
             klog.Errorf("Error trying to get ScaleDownUnreadyTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
             return simulator.UnexpectedError
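
The nil checks above make the tracker an optional dependency, which is why existing call sites can keep passing nil. Below is a standalone sketch of that guard pattern; the names unneededNodes, thresholdUpdater, and recordThreshold are illustrative and not taken from the repository.

// Standalone sketch of the optional-dependency (nil-guard) pattern.
package main

import (
    "fmt"
    "time"
)

type thresholdUpdater interface {
    UpdateThreshold(nodeName string, threshold time.Duration)
}

type unneededNodes struct {
    tracker thresholdUpdater // nil means latency tracking is disabled
}

func (u *unneededNodes) recordThreshold(nodeName string, d time.Duration) {
    // Mirrors the `if n.nodeLatencyTracker != nil` guard in unremovableReason.
    if u.tracker != nil {
        u.tracker.UpdateThreshold(nodeName, d)
    }
}

func main() {
    u := &unneededNodes{} // no tracker wired in, like NewNodes(nil, nil, nil)
    u.recordThreshold("node-1", 10*time.Minute)
    fmt.Println("nil tracker tolerated; threshold update skipped")
}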

cluster-autoscaler/core/scaledown/unneeded/nodes_test.go

Lines changed: 29 additions & 2 deletions
@@ -101,7 +101,7 @@ func TestUpdate(t *testing.T) {
         tc := tc
         t.Run(tc.desc, func(t *testing.T) {
             t.Parallel()
-            nodes := NewNodes(nil, nil)
+            nodes := NewNodes(nil, nil, nil)
             nodes.Update(tc.initialNodes, initialTimestamp)
             nodes.Update(tc.finalNodes, finalTimestamp)
             wantNodes := len(tc.wantTimestamps)
@@ -203,7 +203,8 @@ func TestRemovableAt(t *testing.T) {
             ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{ScaleDownSimulationTimeout: 5 * time.Minute}, &fake.Clientset{}, registry, provider, nil, nil)
             assert.NoError(t, err)

-            n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{})
+            fakeTracker := NewFakeLatencyTracker()
+            n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{}, fakeTracker)
             n.Update(removableNodes, time.Now())
             gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, nodes.ScaleDownContext{
                 ActuationStatus: as,
@@ -213,6 +214,16 @@ func TestRemovableAt(t *testing.T) {
             if len(gotDrainToRemove) != tc.numDrainToRemove || len(gotEmptyToRemove) != tc.numEmptyToRemove {
                 t.Errorf("%s: getNodesToRemove() return %d, %d, want %d, %d", tc.name, len(gotEmptyToRemove), len(gotDrainToRemove), tc.numEmptyToRemove, tc.numDrainToRemove)
             }
+            expectedThreshold := 0 * time.Second // matches fakeScaleDownTimeGetter
+            for _, node := range removableNodes {
+                nodeName := node.Node.Name
+                got, ok := fakeTracker.Observed[nodeName]
+                if !ok {
+                    t.Errorf("NodeLatencyTracker not called for node %s", nodeName)
+                } else if got != expectedThreshold {
+                    t.Errorf("NodeLatencyTracker called with %v for node %s, want %v", got, nodeName, expectedThreshold)
+                }
+            }
         })
     }
 }
@@ -342,3 +353,19 @@ func (f *fakeScaleDownTimeGetter) GetScaleDownUnneededTime(cloudprovider.NodeGro
 func (f *fakeScaleDownTimeGetter) GetScaleDownUnreadyTime(cloudprovider.NodeGroup) (time.Duration, error) {
     return 0 * time.Second, nil
 }
+
+type fakeLatencyTracker struct {
+    Observed map[string]time.Duration
+}
+
+func NewFakeLatencyTracker() *fakeLatencyTracker {
+    return &fakeLatencyTracker{Observed: make(map[string]time.Duration)}
+}
+
+func (f *fakeLatencyTracker) UpdateThreshold(nodeName string, threshold time.Duration) {
+    f.Observed[nodeName] = threshold
+}
+func (f *fakeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
+}
+func (f *fakeLatencyTracker) UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time) {
+}
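
An optional follow-up, not part of this commit: a compile-time assertion could keep the fake aligned with the interface it stands in for, assuming the latencytracker package is imported in the test file.

// Hypothetical addition: fails to compile if fakeLatencyTracker ever stops
// satisfying latencytracker.LatencyTracker.
var _ latencytracker.LatencyTracker = (*fakeLatencyTracker)(nil)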
