
Commit 18a9125

Node removal latency metrics added

1 parent af53141

File tree

12 files changed: +324 -29 lines

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions

@@ -345,6 +345,8 @@ type AutoscalingOptions struct {
 	// NodeDeletionCandidateTTL is the maximum time a node can be marked as removable without being deleted.
 	// This is used to prevent nodes from being stuck in the removable state during if the CA deployment becomes inactive.
 	NodeDeletionCandidateTTL time.Duration
+	// NodeLatencyTrackingEnabled is used to enable/disable node latency tracking.
+	NodeLatencyTrackingEnabled bool
 }

 // KubeClientOptions specify options for kube client

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions

@@ -228,6 +228,7 @@ var (
 	clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
 	checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
 	nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
+	nodeLatencyTrackingEnabled = flag.Bool("enable-node-latency-tracking", false, "Whether logic for monitoring of node latency is enabled.")

 	// Deprecated flags
 	ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")

@@ -410,6 +411,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		ProactiveScaleupEnabled: *proactiveScaleupEnabled,
 		PodInjectionLimit: *podInjectionLimit,
 		NodeDeletionCandidateTTL: *nodeDeletionCandidateTTL,
+		NodeLatencyTrackingEnabled: *nodeLatencyTrackingEnabled,
 	}
 }
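The new flag is off by default, so node latency tracking is opt-in. A minimal, self-contained sketch of the flag-to-options plumbing shown above; this is a standalone illustration of the pattern, not the Cluster Autoscaler's actual flags package, and the autoscalingOptions type here is a stand-in:

package main

import (
    "flag"
    "fmt"
)

// Illustrative stand-in for config.AutoscalingOptions.
type autoscalingOptions struct {
    NodeLatencyTrackingEnabled bool
}

// Mirrors the flag declared in flags.go: a bool flag, false by default.
var nodeLatencyTrackingEnabled = flag.Bool("enable-node-latency-tracking", false,
    "Whether logic for monitoring of node latency is enabled.")

func main() {
    flag.Parse()
    // createAutoscalingOptions() does the equivalent copy in the real code path.
    opts := autoscalingOptions{NodeLatencyTrackingEnabled: *nodeLatencyTrackingEnabled}
    fmt.Printf("node latency tracking enabled: %v\n", opts.NodeLatencyTrackingEnabled)
}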

cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 7 additions & 1 deletion

@@ -27,6 +27,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	"k8s.io/autoscaler/cluster-autoscaler/core/utils"

@@ -58,6 +59,7 @@ const (
 type Actuator struct {
 	ctx *context.AutoscalingContext
 	nodeDeletionTracker *deletiontracker.NodeDeletionTracker
+	nodeLatencyTracker *latencytracker.NodeLatencyTracker
 	nodeDeletionScheduler *GroupDeletionScheduler
 	deleteOptions options.NodeDeleteOptions
 	drainabilityRules rules.Rules

@@ -78,7 +80,7 @@ type actuatorNodeGroupConfigGetter interface {
 }

 // NewActuator returns a new instance of Actuator.
-func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
+func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, nlt *latencytracker.NodeLatencyTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
 	ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
 	legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
 	var evictor Evictor

@@ -90,6 +92,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
 	return &Actuator{
 		ctx: ctx,
 		nodeDeletionTracker: ndt,
+		nodeLatencyTracker: nlt,
 		nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
 		budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
 		deleteOptions: deleteOptions,

@@ -324,6 +327,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
 	}

 	for _, node := range nodes {
+		if a.nodeLatencyTracker != nil {
+			a.nodeLatencyTracker.ObserveDeletion(node.Name, time.Now())
+		}
 		nodeInfo, err := clusterSnapshot.GetNodeInfo(node.Name)
 		if err != nil {
 			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerErrorf(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)}
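For reference, a hedged sketch of how a caller might thread the tracker through the new NewActuator parameter, gated on the flag added above. This wiring is not part of the diff shown here; the function and variable names are illustrative, and the nil check in deleteNodesAsync is what makes passing nil safe when the feature is disabled.

// Sketch only (not code from this commit): construct the tracker when the option is on,
// otherwise pass nil; deleteNodesAsync skips ObserveDeletion for a nil tracker.
func newActuatorForOptions(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver,
    ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions,
    drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
    var nlt *latencytracker.NodeLatencyTracker
    if ctx.AutoscalingOptions.NodeLatencyTrackingEnabled {
        nlt = latencytracker.NewNodeLatencyTracker()
    }
    return NewActuator(ctx, scaleStateNotifier, ndt, nlt, deleteOptions, drainabilityRules, configGetter)
}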

cluster-autoscaler/core/scaledown/actuation/actuator_test.go

Lines changed: 3 additions & 0 deletions

@@ -39,6 +39,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
 	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"

@@ -1279,6 +1280,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
 		nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
 		budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
 		configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
+		nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(),
 	}

 	var gotResult status.ScaleDownResult

@@ -1557,6 +1559,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
 		ctx: &ctx, nodeDeletionTracker: ndt,
 		nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
 		budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
+		nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(),
 	}

 	for _, nodes := range deleteNodes {
cluster-autoscaler/core/scaledown/latencytracker (new test file)

Lines changed: 134 additions & 0 deletions

/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package latencytracker

import (
    "sync"
    "testing"
    "time"
)

func TestUpdateStateWithUnneededList_AddsNewNodes(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()
    node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}

    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)

    tracker.Lock()
    defer tracker.Unlock()
    if _, ok := tracker.nodes["node1"]; !ok {
        t.Errorf("expected node1 to be tracked, but was not")
    }
}

func TestUpdateStateWithUnneededList_DoesNotDuplicate(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()
    node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}

    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now.Add(time.Minute))

    tracker.Lock()
    defer tracker.Unlock()
    if len(tracker.nodes) != 1 {
        t.Errorf("expected 1 tracked node, got %d", len(tracker.nodes))
    }
}

func TestObserveDeletion_RemovesNode(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()
    node := NodeInfo{
        Name:          "node1",
        UnneededSince: now.Add(-10 * time.Minute),
        Threshold:     5 * time.Minute,
    }
    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)

    tracker.ObserveDeletion("node1", now)

    tracker.Lock()
    defer tracker.Unlock()
    if _, ok := tracker.nodes["node1"]; ok {
        t.Errorf("expected node1 removed after ObserveDeletion")
    }
}

func TestObserveDeletion_NoOpIfNodeNotTracked(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()

    tracker.ObserveDeletion("node1", now)

    tracker.Lock()
    defer tracker.Unlock()
    if len(tracker.nodes) != 0 {
        t.Errorf("expected no nodes tracked, got %d", len(tracker.nodes))
    }
}

func TestConcurrentUpdatesAndDeletions(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()

    node := NodeInfo{
        Name:          "node1",
        UnneededSince: now,
        Threshold:     2 * time.Minute,
    }

    var wg sync.WaitGroup
    stop := make(chan struct{})

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case <-stop:
                return
            default:
                tracker.UpdateStateWithUnneededList([]NodeInfo{node}, time.Now())
            }
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case <-stop:
                return
            default:
                tracker.ObserveDeletion("node1", time.Now())
            }
        }
    }()

    time.Sleep(50 * time.Millisecond)
    close(stop)
    wg.Wait()

    tracker.Lock()
    defer tracker.Unlock()
    if len(tracker.nodes) > 1 {
        t.Errorf("expected at most 1 tracked node, got %d", len(tracker.nodes))
    }
}
cluster-autoscaler/core/scaledown/latencytracker (new file)

Lines changed: 66 additions & 0 deletions

package latencytracker

import (
    "sync"
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/metrics"

    "k8s.io/klog/v2"
)

// NodeInfo describes a single unneeded node tracked for deletion-latency measurement.
type NodeInfo struct {
    Name          string
    UnneededSince time.Time
    Threshold     time.Duration
}

// NodeLatencyTracker keeps the set of currently unneeded nodes and records how long
// each one stays unneeded before it is deleted.
type NodeLatencyTracker struct {
    sync.Mutex
    nodes map[string]NodeInfo
}

// NewNodeLatencyTracker creates a new tracker.
func NewNodeLatencyTracker() *NodeLatencyTracker {
    return &NodeLatencyTracker{
        nodes: make(map[string]NodeInfo),
    }
}

// UpdateStateWithUnneededList starts tracking any node in the list that is not tracked yet.
func (t *NodeLatencyTracker) UpdateStateWithUnneededList(list []NodeInfo, timestamp time.Time) {
    t.Lock()
    defer t.Unlock()

    currentSet := make(map[string]struct{}, len(list))
    for _, info := range list {
        currentSet[info.Name] = struct{}{}
        _, exists := t.nodes[info.Name]
        if !exists {
            t.nodes[info.Name] = NodeInfo{
                Name:          info.Name,
                UnneededSince: info.UnneededSince,
                Threshold:     info.Threshold,
            }
            klog.V(2).Infof("Started tracking unneeded node %s at %v with ScaleDownUnneededTime=%v",
                info.Name, info.UnneededSince, info.Threshold)
        }
    }
}

// ObserveDeletion is called by the actuator just before node deletion.
func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
    t.Lock()
    defer t.Unlock()

    if info, exists := t.nodes[nodeName]; exists {
        duration := timestamp.Sub(info.UnneededSince)

        klog.V(2).Infof(
            "Observing deletion for node %s, unneeded for %s (threshold was %s).",
            nodeName, duration, info.Threshold,
        )

        metrics.UpdateScaleDownNodeDeletionDuration("true", duration-info.Threshold)
        delete(t.nodes, nodeName)
    }
}
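Taken together, the tracker's public surface is NewNodeLatencyTracker, UpdateStateWithUnneededList, and ObserveDeletion. A small standalone sketch of the intended call sequence (node name and durations here are illustrative; in the real flow the planner supplies the unneeded list and the actuator reports the deletion):

package main

import (
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
)

func main() {
    tracker := latencytracker.NewNodeLatencyTracker()

    // Planner side: report the node as unneeded, together with its
    // ScaleDownUnneededTime threshold, on every planning iteration.
    unneededSince := time.Now().Add(-7 * time.Minute)
    tracker.UpdateStateWithUnneededList([]latencytracker.NodeInfo{
        {Name: "node-1", UnneededSince: unneededSince, Threshold: 5 * time.Minute},
    }, time.Now())

    // Actuator side: called just before the node is deleted; this records
    // the deletion-latency metric and stops tracking the node.
    tracker.ObserveDeletion("node-1", time.Now())
}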

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 17 additions & 1 deletion

@@ -26,6 +26,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"

@@ -76,10 +77,11 @@ type Planner struct {
 	cc controllerReplicasCalculator
 	scaleDownSetProcessor nodes.ScaleDownSetProcessor
 	scaleDownContext *nodes.ScaleDownContext
+	nodeLatencyTracker *latencytracker.NodeLatencyTracker
 }

 // New creates a new Planner object.
-func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner {
+func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, nlt *latencytracker.NodeLatencyTracker) *Planner {
 	resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
 	minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
 	if minUpdateInterval == 0*time.Nanosecond {

@@ -104,6 +106,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
 		scaleDownSetProcessor: processors.ScaleDownSetProcessor,
 		scaleDownContext: nodes.NewDefaultScaleDownContext(),
 		minUpdateInterval: minUpdateInterval,
+		nodeLatencyTracker: nlt,
 	}
 }

@@ -307,6 +310,19 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
 		}
 	}
 	p.unneededNodes.Update(removableList, p.latestUpdate)
+	if p.nodeLatencyTracker != nil {
+		var unneededList []latencytracker.NodeInfo
+		for _, n := range p.unneededNodes.AsList() {
+			if threshold, ok := p.unneededNodes.GetUnneededTimeForNode(p.context, n.Name); ok {
+				unneededList = append(unneededList, latencytracker.NodeInfo{
+					Name:          n.Name,
+					UnneededSince: p.latestUpdate,
+					Threshold:     threshold,
+				})
+			}
+		}
+		p.nodeLatencyTracker.UpdateStateWithUnneededList(unneededList, p.latestUpdate)
+	}
 	if unremovableCount > 0 {
 		klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
 	}
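Since ObserveDeletion measures against the UnneededSince timestamps recorded here, planner and actuator would need to share one tracker instance for the metric to be meaningful. A hedged sketch of that wiring; none of these variable names come from this diff, and all of the other arguments are assumed to already exist in the caller:

// Sketch only (not from this commit): a single tracker handed to both components,
// so timestamps recorded at planning time are the ones measured at actuation time.
nlt := latencytracker.NewNodeLatencyTracker()
sdPlanner := planner.New(autoscalingCtx, autoscalingProcessors, deleteOptions, drainabilityRules, nlt)
sdActuator := actuation.NewActuator(autoscalingCtx, scaleStateNotifier, ndt, nlt, deleteOptions, drainabilityRules, configGetter)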

cluster-autoscaler/core/scaledown/planner/planner_test.go

Lines changed: 4 additions & 3 deletions

@@ -32,6 +32,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"

@@ -503,7 +504,7 @@ func TestUpdateClusterState(t *testing.T) {
 	assert.NoError(t, err)
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
 	deleteOptions := options.NodeDeleteOptions{}
-	p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+	p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
 	p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
 	if tc.isSimulationTimeout {
 		context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second

@@ -699,7 +700,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
 	assert.NoError(t, err)
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
 	deleteOptions := options.NodeDeleteOptions{}
-	p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+	p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
 	p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
 	p.minUpdateInterval = tc.updateInterval
 	p.unneededNodes.Update(previouslyUnneeded, time.Now())

@@ -998,7 +999,7 @@ func TestNodesToDelete(t *testing.T) {
 	assert.NoError(t, err)
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
 	deleteOptions := options.NodeDeleteOptions{}
-	p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+	p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
 	p.latestUpdate = time.Now()
 	p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
 	p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))
