Commit 30a2fc9

Node removal latency metrics added
1 parent 94637a2 commit 30a2fc9

12 files changed (+324, -29 lines)

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions
@@ -349,6 +349,8 @@ type AutoscalingOptions struct {
     CapacitybufferControllerEnabled bool
     // CapacitybufferPodInjectionEnabled tells if CA should injects fake pods for capacity buffers that are ready for provisioning
     CapacitybufferPodInjectionEnabled bool
+    // NodeLatencyTrackingEnabled is used to enable/disable node latency tracking.
+    NodeLatencyTrackingEnabled bool
 }
 
 // KubeClientOptions specify options for kube client

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions
@@ -230,6 +230,7 @@ var (
     nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
     capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
     capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
+    nodeLatencyTrackingEnabled = flag.Bool("enable-node-latency-tracking", false, "Whether logic for monitoring of node latency is enabled.")
 
     // Deprecated flags
     ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -414,6 +415,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
         NodeDeletionCandidateTTL:          *nodeDeletionCandidateTTL,
         CapacitybufferControllerEnabled:   *capacitybufferControllerEnabled,
         CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
+        NodeLatencyTrackingEnabled:        *nodeLatencyTrackingEnabled,
     }
 }
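
The hunks above only register the flag and copy it into AutoscalingOptions; the code that actually consumes NodeLatencyTrackingEnabled lives in other files of this commit that are not shown on this page. Purely as a hypothetical sketch of that gating (the helper name and package are made up): the Planner and Actuator below both nil-check the tracker, so passing nil disables tracking.

// Hypothetical helper, not part of the hunks shown here; it only illustrates
// how the new option could gate construction of a shared tracker.
package core

import (
    "k8s.io/autoscaler/cluster-autoscaler/config"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
)

// maybeNewNodeLatencyTracker returns a tracker when --enable-node-latency-tracking
// is set and nil otherwise; the scale-down Planner and Actuator tolerate nil.
func maybeNewNodeLatencyTracker(opts config.AutoscalingOptions) *latencytracker.NodeLatencyTracker {
    if !opts.NodeLatencyTrackingEnabled {
        return nil
    }
    return latencytracker.NewNodeLatencyTracker()
}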

cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 7 additions & 1 deletion
@@ -27,6 +27,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     "k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -58,6 +59,7 @@ const (
 type Actuator struct {
     ctx                   *context.AutoscalingContext
     nodeDeletionTracker   *deletiontracker.NodeDeletionTracker
+    nodeLatencyTracker    *latencytracker.NodeLatencyTracker
     nodeDeletionScheduler *GroupDeletionScheduler
     deleteOptions         options.NodeDeleteOptions
     drainabilityRules     rules.Rules
@@ -78,7 +80,7 @@ type actuatorNodeGroupConfigGetter interface {
 }
 
 // NewActuator returns a new instance of Actuator.
-func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
+func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, nlt *latencytracker.NodeLatencyTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
     ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
     legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
     var evictor Evictor
@@ -90,6 +92,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
     return &Actuator{
         ctx:                   ctx,
         nodeDeletionTracker:   ndt,
+        nodeLatencyTracker:    nlt,
         nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
         budgetProcessor:       budgets.NewScaleDownBudgetProcessor(ctx),
         deleteOptions:         deleteOptions,
@@ -324,6 +327,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
     }
 
     for _, node := range nodes {
+        if a.nodeLatencyTracker != nil {
+            a.nodeLatencyTracker.ObserveDeletion(node.Name, time.Now())
+        }
         nodeInfo, err := clusterSnapshot.GetNodeInfo(node.Name)
         if err != nil {
             nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerErrorf(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)}

cluster-autoscaler/core/scaledown/actuation/actuator_test.go

Lines changed: 3 additions & 0 deletions
@@ -39,6 +39,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/config"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     . "k8s.io/autoscaler/cluster-autoscaler/core/test"
     "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
@@ -1279,6 +1280,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
         nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
         budgetProcessor:       budgets.NewScaleDownBudgetProcessor(&ctx),
         configGetter:          nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
+        nodeLatencyTracker:    latencytracker.NewNodeLatencyTracker(),
     }
 
     var gotResult status.ScaleDownResult
@@ -1557,6 +1559,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
         ctx: &ctx, nodeDeletionTracker: ndt,
         nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
         budgetProcessor:       budgets.NewScaleDownBudgetProcessor(&ctx),
+        nodeLatencyTracker:    latencytracker.NewNodeLatencyTracker(),
     }
 
     for _, nodes := range deleteNodes {

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package latencytracker
+
+import (
+    "sync"
+    "testing"
+    "time"
+)
+
+func TestUpdateStateWithUnneededList_AddsNewNodes(t *testing.T) {
+    tracker := NewNodeLatencyTracker()
+    now := time.Now()
+    node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}
+
+    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
+
+    tracker.Lock()
+    defer tracker.Unlock()
+    if _, ok := tracker.nodes["node1"]; !ok {
+        t.Errorf("expected node1 to be tracked, but was not")
+    }
+}
+
+func TestUpdateStateWithUnneededList_DoesNotDuplicate(t *testing.T) {
+    tracker := NewNodeLatencyTracker()
+    now := time.Now()
+    node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}
+
+    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
+    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now.Add(time.Minute))
+
+    tracker.Lock()
+    defer tracker.Unlock()
+    if len(tracker.nodes) != 1 {
+        t.Errorf("expected 1 tracked node, got %d", len(tracker.nodes))
+    }
+}
+
+func TestObserveDeletion_RemovesNode(t *testing.T) {
+    tracker := NewNodeLatencyTracker()
+    now := time.Now()
+    node := NodeInfo{
+        Name:          "node1",
+        UnneededSince: now.Add(-10 * time.Minute),
+        Threshold:     5 * time.Minute,
+    }
+    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
+
+    tracker.ObserveDeletion("node1", now)
+
+    tracker.Lock()
+    defer tracker.Unlock()
+    if _, ok := tracker.nodes["node1"]; ok {
+        t.Errorf("expected node1 removed after ObserveDeletion")
+    }
+}
+
+func TestObserveDeletion_NoOpIfNodeNotTracked(t *testing.T) {
+    tracker := NewNodeLatencyTracker()
+    now := time.Now()
+
+    tracker.ObserveDeletion("node1", now)
+
+    tracker.Lock()
+    defer tracker.Unlock()
+    if len(tracker.nodes) != 0 {
+        t.Errorf("expected no nodes tracked, got %d", len(tracker.nodes))
+    }
+}
+
+func TestConcurrentUpdatesAndDeletions(t *testing.T) {
+    tracker := NewNodeLatencyTracker()
+    now := time.Now()
+
+    node := NodeInfo{
+        Name:          "node1",
+        UnneededSince: now,
+        Threshold:     2 * time.Minute,
+    }
+
+    var wg sync.WaitGroup
+    stop := make(chan struct{})
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for {
+            select {
+            case <-stop:
+                return
+            default:
+                tracker.UpdateStateWithUnneededList([]NodeInfo{node}, time.Now())
+            }
+        }
+    }()
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for {
+            select {
+            case <-stop:
+                return
+            default:
+                tracker.ObserveDeletion("node1", time.Now())
+            }
+        }
+    }()
+
+    time.Sleep(50 * time.Millisecond)
+    close(stop)
+    wg.Wait()
+
+    tracker.Lock()
+    defer tracker.Unlock()
+    if len(tracker.nodes) > 1 {
+        t.Errorf("expected at most 1 tracked node, got %d", len(tracker.nodes))
+    }
+}

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+package latencytracker
+
+import (
+    "sync"
+    "time"
+
+    "k8s.io/autoscaler/cluster-autoscaler/metrics"
+
+    "k8s.io/klog/v2"
+)
+
+type NodeInfo struct {
+    Name          string
+    UnneededSince time.Time
+    Threshold     time.Duration
+}
+
+type NodeLatencyTracker struct {
+    sync.Mutex
+    nodes map[string]NodeInfo
+}
+
+// NewNodeLatencyTracker creates a new tracker.
+func NewNodeLatencyTracker() *NodeLatencyTracker {
+    return &NodeLatencyTracker{
+        nodes: make(map[string]NodeInfo),
+    }
+}
+
+func (t *NodeLatencyTracker) UpdateStateWithUnneededList(list []NodeInfo, timestamp time.Time) {
+    t.Lock()
+    defer t.Unlock()
+
+    currentSet := make(map[string]struct{}, len(list))
+    for _, info := range list {
+        currentSet[info.Name] = struct{}{}
+        _, exists := t.nodes[info.Name]
+        if !exists {
+            t.nodes[info.Name] = NodeInfo{
+                Name:          info.Name,
+                UnneededSince: info.UnneededSince,
+                Threshold:     info.Threshold,
+            }
+            klog.V(2).Infof("Started tracking unneeded node %s at %v with ScaleDownUnneededTime=%v",
+                info.Name, info.UnneededSince, info.Threshold)
+        }
+    }
+}
+
+// ObserveDeletion is called by the actuator just before node deletion.
+func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
+    t.Lock()
+    defer t.Unlock()
+
+    if info, exists := t.nodes[nodeName]; exists {
+        duration := timestamp.Sub(info.UnneededSince)
+
+        klog.V(2).Infof(
+            "Observing deletion for node %s, unneeded for %s (threshold was %s).",
+            nodeName, duration, info.Threshold,
+        )
+
+        metrics.UpdateScaleDownNodeDeletionDuration("true", duration-info.Threshold)
+        delete(t.nodes, nodeName)
+    }
+}
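
Taken together, the planner registers nodes as they become unneeded and the actuator reports each node just before deletion. A minimal standalone sketch of that flow against the API added above (the node name and durations are invented for illustration):

package main

import (
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
)

func main() {
    tracker := latencytracker.NewNodeLatencyTracker()
    unneededSince := time.Now()

    // Planner side: report the current unneeded set once per loop iteration.
    // Only the first sighting of a node is recorded.
    tracker.UpdateStateWithUnneededList([]latencytracker.NodeInfo{
        {Name: "node-1", UnneededSince: unneededSince, Threshold: 10 * time.Minute},
    }, unneededSince)

    // Actuator side: report the node just before deleting it. The tracker logs
    // the unneeded duration and emits the deletion-latency metric.
    tracker.ObserveDeletion("node-1", unneededSince.Add(12*time.Minute))
}

ObserveDeletion calls metrics.UpdateScaleDownNodeDeletionDuration, which is defined in the metrics package changed by this commit but not shown in this view. Purely as an idea of shape, a histogram-backed helper could look like the sketch below; the metric name, label name, and buckets are assumptions, not necessarily what the commit uses:

package metrics

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// Sketch only: the real metric definition lives in the autoscaler's metrics
// package. Name, label, and buckets here are placeholders.
var scaleDownNodeDeletionDuration = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Namespace: "cluster_autoscaler",
        Name:      "node_deletion_duration_seconds",
        Help:      "Time between a node exceeding its unneeded threshold and its deletion being issued.",
        Buckets:   prometheus.ExponentialBuckets(1, 2, 16),
    },
    []string{"past_threshold"},
)

func init() {
    prometheus.MustRegister(scaleDownNodeDeletionDuration)
}

// UpdateScaleDownNodeDeletionDuration records the latency observed by the
// NodeLatencyTracker for a single deleted node.
func UpdateScaleDownNodeDeletionDuration(pastThreshold string, duration time.Duration) {
    scaleDownNodeDeletionDuration.WithLabelValues(pastThreshold).Observe(duration.Seconds())
}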

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 17 additions & 1 deletion
@@ -26,6 +26,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
@@ -76,10 +77,11 @@ type Planner struct {
     cc                    controllerReplicasCalculator
     scaleDownSetProcessor nodes.ScaleDownSetProcessor
     scaleDownContext      *nodes.ScaleDownContext
+    nodeLatencyTracker    *latencytracker.NodeLatencyTracker
 }
 
 // New creates a new Planner object.
-func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner {
+func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, nlt *latencytracker.NodeLatencyTracker) *Planner {
     resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
     minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
     if minUpdateInterval == 0*time.Nanosecond {
@@ -104,6 +106,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
         scaleDownSetProcessor: processors.ScaleDownSetProcessor,
         scaleDownContext:      nodes.NewDefaultScaleDownContext(),
         minUpdateInterval:     minUpdateInterval,
+        nodeLatencyTracker:    nlt,
     }
 }
 
@@ -307,6 +310,19 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
         }
     }
     p.unneededNodes.Update(removableList, p.latestUpdate)
+    if p.nodeLatencyTracker != nil {
+        var unneededList []latencytracker.NodeInfo
+        for _, n := range p.unneededNodes.AsList() {
+            if threshold, ok := p.unneededNodes.GetUnneededTimeForNode(p.context, n.Name); ok {
+                unneededList = append(unneededList, latencytracker.NodeInfo{
+                    Name:          n.Name,
+                    UnneededSince: p.latestUpdate,
+                    Threshold:     threshold,
+                })
+            }
+        }
+        p.nodeLatencyTracker.UpdateStateWithUnneededList(unneededList, p.latestUpdate)
+    }
     if unremovableCount > 0 {
         klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
     }

cluster-autoscaler/core/scaledown/planner/planner_test.go

Lines changed: 4 additions & 3 deletions
@@ -32,6 +32,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/config"
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
@@ -503,7 +504,7 @@ func TestUpdateClusterState(t *testing.T) {
         assert.NoError(t, err)
         clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
         deleteOptions := options.NodeDeleteOptions{}
-        p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+        p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
         p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
         if tc.isSimulationTimeout {
             context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@@ -699,7 +700,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
         assert.NoError(t, err)
         clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
         deleteOptions := options.NodeDeleteOptions{}
-        p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+        p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
         p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
         p.minUpdateInterval = tc.updateInterval
         p.unneededNodes.Update(previouslyUnneeded, time.Now())
@@ -998,7 +999,7 @@ func TestNodesToDelete(t *testing.T) {
         assert.NoError(t, err)
         clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
         deleteOptions := options.NodeDeleteOptions{}
-        p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+        p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
        p.latestUpdate = time.Now()
        p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
        p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))
