Skip to content

Commit cfae8da

Browse files
committed
Change UpdateStateWithUnneededList logic to also process nodes that are currently under deletion
1 parent b8f4089 commit cfae8da

File tree

3 files changed

+31
-117
lines changed

3 files changed

+31
-117
lines changed

cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@ package latencytracker
33
import (
44
"time"
55

6+
apiv1 "k8s.io/api/core/v1"
67
"k8s.io/autoscaler/cluster-autoscaler/metrics"
78
"k8s.io/klog/v2"
89
)
910

1011
type LatencyTracker interface {
1112
ObserveDeletion(nodeName string, timestamp time.Time)
12-
UpdateStateWithUnneededList(list []NodeInfo, timestamp time.Time)
13+
UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time)
1314
}
1415
type NodeInfo struct {
15-
Name string
1616
UnneededSince time.Time
1717
Threshold time.Duration
1818
}
@@ -28,19 +28,34 @@ func NewNodeLatencyTracker() *NodeLatencyTracker {
2828
}
2929
}
3030

31-
func (t *NodeLatencyTracker) UpdateStateWithUnneededList(list []NodeInfo, timestamp time.Time) {
31+
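// UpdateStateWithUnneededList starts tracking nodes newly seen in list; tracked nodes absent from list and with no entry in currentlyInDeletion are reported to the scale-down deletion duration metric and dropped.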
32+
func (t *NodeLatencyTracker) UpdateStateWithUnneededList(
33+
list []*apiv1.Node,
34+
currentlyInDeletion map[string]bool,
35+
timestamp time.Time,
36+
) {
3237
currentSet := make(map[string]struct{}, len(list))
33-
for _, info := range list {
34-
currentSet[info.Name] = struct{}{}
35-
_, exists := t.nodes[info.Name]
36-
if !exists {
37-
t.nodes[info.Name] = NodeInfo{
38-
Name: info.Name,
39-
UnneededSince: info.UnneededSince,
40-
Threshold: info.Threshold,
38+
for _, node := range list {
39+
currentSet[node.Name] = struct{}{}
40+
41+
if _, exists := t.nodes[node.Name]; !exists {
42+
t.nodes[node.Name] = NodeInfo{
43+
UnneededSince: timestamp,
44+
Threshold: 0,
45+
}
46+
klog.V(2).Infof("Started tracking unneeded node %s at %v", node.Name, timestamp)
47+
}
48+
}
49+
50+
for name, info := range t.nodes {
51+
if _, stillUnneeded := currentSet[name]; !stillUnneeded {
52+
if _, inDeletion := currentlyInDeletion[name]; !inDeletion {
53+
duration := timestamp.Sub(info.UnneededSince)
54+
metrics.UpdateScaleDownNodeDeletionDuration("false", duration-info.Threshold)
55+
delete(t.nodes, name)
56+
klog.V(2).Infof("Node %q reported as deleted/missing (unneeded for %s, threshold %s)",
57+
name, duration, info.Threshold)
4158
}
42-
klog.V(2).Infof("Started tracking unneeded node %s at %v with ScaleDownUnneededTime=%v",
43-
info.Name, info.UnneededSince, info.Threshold)
4459
}
4560
}
4661
}

cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker_test.go

Lines changed: 0 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -17,53 +17,10 @@ limitations under the License.
1717
package latencytracker
1818

1919
import (
20-
"sync"
2120
"testing"
2221
"time"
2322
)
2423

25-
func TestUpdateStateWithUnneededList_AddsNewNodes(t *testing.T) {
26-
tracker := NewNodeLatencyTracker()
27-
now := time.Now()
28-
node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}
29-
30-
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
31-
32-
if _, ok := tracker.nodes["node1"]; !ok {
33-
t.Errorf("expected node1 to be tracked, but was not")
34-
}
35-
}
36-
37-
func TestUpdateStateWithUnneededList_DoesNotDuplicate(t *testing.T) {
38-
tracker := NewNodeLatencyTracker()
39-
now := time.Now()
40-
node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}
41-
42-
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
43-
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now.Add(time.Minute))
44-
45-
if len(tracker.nodes) != 1 {
46-
t.Errorf("expected 1 tracked node, got %d", len(tracker.nodes))
47-
}
48-
}
49-
50-
func TestObserveDeletion_RemovesNode(t *testing.T) {
51-
tracker := NewNodeLatencyTracker()
52-
now := time.Now()
53-
node := NodeInfo{
54-
Name: "node1",
55-
UnneededSince: now.Add(-10 * time.Minute),
56-
Threshold: 5 * time.Minute,
57-
}
58-
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
59-
60-
tracker.ObserveDeletion("node1", now)
61-
62-
if _, ok := tracker.nodes["node1"]; ok {
63-
t.Errorf("expected node1 removed after ObserveDeletion")
64-
}
65-
}
66-
6724
func TestObserveDeletion_NoOpIfNodeNotTracked(t *testing.T) {
6825
tracker := NewNodeLatencyTracker()
6926
now := time.Now()
@@ -74,51 +31,3 @@ func TestObserveDeletion_NoOpIfNodeNotTracked(t *testing.T) {
7431
t.Errorf("expected no nodes tracked, got %d", len(tracker.nodes))
7532
}
7633
}
77-
78-
func TestConcurrentUpdatesAndDeletions(t *testing.T) {
79-
tracker := NewNodeLatencyTracker()
80-
now := time.Now()
81-
82-
node := NodeInfo{
83-
Name: "node1",
84-
UnneededSince: now,
85-
Threshold: 2 * time.Minute,
86-
}
87-
88-
var wg sync.WaitGroup
89-
stop := make(chan struct{})
90-
91-
wg.Add(1)
92-
go func() {
93-
defer wg.Done()
94-
for {
95-
select {
96-
case <-stop:
97-
return
98-
default:
99-
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, time.Now())
100-
}
101-
}
102-
}()
103-
104-
wg.Add(1)
105-
go func() {
106-
defer wg.Done()
107-
for {
108-
select {
109-
case <-stop:
110-
return
111-
default:
112-
tracker.ObserveDeletion("node1", time.Now())
113-
}
114-
}
115-
}()
116-
117-
time.Sleep(50 * time.Millisecond)
118-
close(stop)
119-
wg.Wait()
120-
121-
if len(tracker.nodes) > 1 {
122-
t.Errorf("expected at most 1 tracked node, got %d", len(tracker.nodes))
123-
}
124-
}

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*api
131131
podDestinations = filterOutOngoingDeletions(podDestinations, deletions)
132132
scaleDownCandidates = filterOutOngoingDeletions(scaleDownCandidates, deletions)
133133
p.categorizeNodes(asMap(nodeNames(podDestinations)), scaleDownCandidates)
134+
if p.nodeLatencyTracker != nil {
135+
p.nodeLatencyTracker.UpdateStateWithUnneededList(p.unneededNodes.AsList(), deletions, p.latestUpdate)
136+
}
134137
p.rs.DropOldHints()
135138
p.actuationInjector.DropOldHints()
136139
return nil
@@ -310,19 +313,6 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
310313
}
311314
}
312315
p.unneededNodes.Update(removableList, p.latestUpdate)
313-
if p.nodeLatencyTracker != nil {
314-
var unneededList []latencytracker.NodeInfo
315-
for _, n := range p.unneededNodes.AsList() {
316-
if threshold, ok := p.unneededNodes.GetUnneededTimeForNode(p.context, n.Name); ok {
317-
unneededList = append(unneededList, latencytracker.NodeInfo{
318-
Name: n.Name,
319-
UnneededSince: p.latestUpdate,
320-
Threshold: threshold,
321-
})
322-
}
323-
}
324-
p.nodeLatencyTracker.UpdateStateWithUnneededList(unneededList, p.latestUpdate)
325-
}
326316
if unremovableCount > 0 {
327317
klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
328318
}

0 commit comments

Comments
 (0)