From 86a98d11aefd6a5f8d664212abbb353ab9eb1af0 Mon Sep 17 00:00:00 2001
From: Olzhas Shaikenov
Date: Fri, 3 Oct 2025 12:04:07 +0000
Subject: [PATCH] feat(nodeScaleDownTimeTracker): add a new metric to track unprocessed nodes during scaleDown

---
 .../config/autoscaling_options.go          |   7 +
 cluster-autoscaler/config/flags/flags.go   |   2 +
 .../core/scaledown/planner/planner.go      |  67 +++++++
 .../core/scaledown/planner/planner_test.go | 174 ++++++++++++++++++
 cluster-autoscaler/metrics/metrics.go      |  17 ++
 5 files changed, 267 insertions(+)

diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index a144580989cd..47f76e1b2f2b 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -349,6 +349,13 @@ type AutoscalingOptions struct {
 	CapacitybufferControllerEnabled bool
 	// CapacitybufferPodInjectionEnabled tells if CA should inject fake pods for capacity buffers that are ready for provisioning
 	CapacitybufferPodInjectionEnabled bool
+	// LongestNodeScaleDownTimeTrackerEnabled is used to enable/disable tracking of the longest node ScaleDown evaluation time.
+	// We track all the nodes that were marked as unneeded but left unprocessed during the ScaleDown.
+	// If a node is unneeded but unprocessed in multiple consecutive iterations, we store only the earliest time that happened.
+	// The difference between the current time and the earliest time among all unprocessed nodes gives the longest evaluation time.
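+	// Example: if node A was first skipped at t=10s and node B at t=25s, then at t=60s
+	// the reported longest evaluation time is 60s-10s = 50s.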
+	LongestNodeScaleDownTimeTrackerEnabled bool
 }
 
 // KubeClientOptions specify options for kube client
diff --git a/cluster-autoscaler/config/flags/flags.go b/cluster-autoscaler/config/flags/flags.go
index 0f7209ebbb1b..7b04de783524 100644
--- a/cluster-autoscaler/config/flags/flags.go
+++ b/cluster-autoscaler/config/flags/flags.go
@@ -230,6 +230,7 @@ var (
 	nodeDeletionCandidateTTL          = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluster-Autoscaler deployment becomes inactive")
 	capacitybufferControllerEnabled   = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
 	capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
+	longestNodeScaleDownTimeTrackerEnabled = flag.Bool("longest-node-scaledown-timetracker-enabled", false, "Whether to track the longest time for which a node marked as unneeded has been left unprocessed during scale down")
 
 	// Deprecated flags
 	ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -414,6 +415,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		NodeDeletionCandidateTTL:          *nodeDeletionCandidateTTL,
 		CapacitybufferControllerEnabled:   *capacitybufferControllerEnabled,
 		CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
+		LongestNodeScaleDownTimeTrackerEnabled: *longestNodeScaleDownTimeTrackerEnabled,
 	}
 }
 
diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go
index 2e2263fe84e0..4fcec9ee53f7 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
+	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
@@ -76,6 +77,7 @@ type Planner struct {
 	cc                    controllerReplicasCalculator
 	scaleDownSetProcessor nodes.ScaleDownSetProcessor
 	scaleDownContext      *nodes.ScaleDownContext
+	longestNodeScaleDownT *longestNodeScaleDownTime
 }
 
 // New creates a new Planner object.
@@ -91,6 +93,11 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
 		unneededNodes.LoadFromExistingTaints(context.ListerRegistry, time.Now(), context.AutoscalingOptions.NodeDeletionCandidateTTL)
 	}
 
+	var longestNodeScaleDownT *longestNodeScaleDownTime
+	if context.AutoscalingOptions.LongestNodeScaleDownTimeTrackerEnabled {
+		longestNodeScaleDownT = newLongestNodeScaleDownTime()
+	}
+
 	return &Planner{
 		context:          context,
 		unremovableNodes: unremovable.NewNodes(),
@@ -104,6 +111,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
 		scaleDownSetProcessor: processors.ScaleDownSetProcessor,
 		scaleDownContext:      nodes.NewDefaultScaleDownContext(),
 		minUpdateInterval:     minUpdateInterval,
+		longestNodeScaleDownT: longestNodeScaleDownT,
 	}
 }
 
@@ -277,13 +285,16 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
 	}
 	p.nodeUtilizationMap = utilizationMap
 	timer := time.NewTimer(p.context.ScaleDownSimulationTimeout)
+	endedPrematurely := false
 	for i, node := range currentlyUnneededNodeNames {
 		if timedOut(timer) {
+			p.processUnneededNodes(currentlyUnneededNodeNames[i:], time.Now(), &endedPrematurely)
 			klog.Warningf("%d out of %d nodes skipped in scale down simulation due to timeout.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames))
 			break
 		}
 		if len(removableList)-atomicScaleDownNodesCount >= p.unneededNodesLimit() {
+			p.processUnneededNodes(currentlyUnneededNodeNames[i:], time.Now(), &endedPrematurely)
 			klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more. Total atomic scale down nodes: %d", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList), atomicScaleDownNodesCount)
 			break
 		}
@@ -306,6 +317,7 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
 			p.unremovableNodes.AddTimeout(unremovable, unremovableTimeout)
 		}
 	}
+	p.processUnneededNodes(nil, time.Now(), &endedPrematurely)
 	p.unneededNodes.Update(removableList, p.latestUpdate)
 	if unremovableCount > 0 {
 		klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
@@ -435,3 +447,58 @@ func timedOut(timer *time.Timer) bool {
 		return false
 	}
 }
+
+// processUnneededNodes records the nodes left unprocessed by the scale down simulation.
+// It is a no-op if the tracker is disabled (p.longestNodeScaleDownT is nil) or if endedPrematurely is
+// already true; a nil node list means all unneeded nodes were simulated, so the reported time is 0.
+func (p *Planner) processUnneededNodes(currentlyUnneededNodeNames []string, currentTime time.Time, endedPrematurely *bool) {
+	if p.longestNodeScaleDownT == nil || *endedPrematurely {
+		return
+	}
+	*endedPrematurely = true
+	p.longestNodeScaleDownT.update(currentlyUnneededNodeNames, currentTime)
+}
+
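+// longestNodeScaleDownTime tracks, for each node that is currently unneeded but was left
+// unprocessed by the scale down simulation, the first time the node was skipped.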
+type longestNodeScaleDownTime struct {
+	defaultTime             time.Time
+	nodeNamesWithTimeStamps map[string]time.Time
+}
+
+func newLongestNodeScaleDownTime() *longestNodeScaleDownTime {
+	return &longestNodeScaleDownTime{}
+}
+
+func (l *longestNodeScaleDownTime) get(nodeName string) time.Time {
+	if t, ok := l.nodeNamesWithTimeStamps[nodeName]; ok {
+		return t
+	}
+	return l.defaultTime
+}
+
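+// update replaces the tracked set with nodeNames, keeping the earliest timestamp of nodes that
+// stayed unprocessed, and reports the longest elapsed time. For example, update([n1 n2], t=5s)
+// followed by update([n2 n3], t=9s) keeps n2 at 5s, adds n3 at 9s, drops n1 and reports 4s.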
+func (l *longestNodeScaleDownTime) update(nodeNames []string, currentTime time.Time) {
+	// a nil slice means all unneeded nodes were processed, so report 0
+	if nodeNames == nil {
+		l.nodeNamesWithTimeStamps = make(map[string]time.Time)
+		metrics.ObserveLongestNodeScaleDownTime(0)
+		return
+	}
+	newNodes := make(map[string]time.Time, len(nodeNames))
+	l.defaultTime = currentTime
+	minimumTime := l.defaultTime
+	for _, nodeName := range nodeNames {
+		// a node already present in nodeNamesWithTimeStamps keeps its earlier timestamp;
+		// a node seen for the first time gets the current time (l.defaultTime)
+		valueFromPrevIter := l.get(nodeName)
+		newNodes[nodeName] = valueFromPrevIter
+		if minimumTime.Compare(valueFromPrevIter) > 0 {
+			minimumTime = valueFromPrevIter
+		}
+	}
+	l.nodeNamesWithTimeStamps = newNodes
+	metrics.ObserveLongestNodeScaleDownTime(currentTime.Sub(minimumTime))
+}
diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go b/cluster-autoscaler/core/scaledown/planner/planner_test.go
index 051a5f591645..f27f9744ba06 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner_test.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -1035,6 +1035,180 @@ func TestNodesToDelete(t *testing.T) {
 	}
 }
 
+func TestLongestNodeScaleDownTime(t *testing.T) {
+	testCases := []struct {
+		name               string
+		timedOutNodesCount int
+		nodes              int
+		unneededNodes1     []string
+		unneededNodes2     []string
+		run                int
+	}{
+		{
+			name:               "Test to check the functionality of planner's longestNodeScaleDownT",
+			timedOutNodesCount: 4,
+			nodes:              6,
+			unneededNodes1:     []string{"n0", "n1", "n2", "n3"},
+			unneededNodes2:     []string{"n2", "n3", "n4", "n5"},
+			run:                2,
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			nodes := make([]*apiv1.Node, tc.nodes)
+			for i := 0; i < tc.nodes; i++ {
+				nodes[i] = BuildTestNode(fmt.Sprintf("n%d", i), 1000, 10)
+			}
+			provider := testprovider.NewTestCloudProviderBuilder().Build()
+			provider.AddNodeGroup("ng1", 0, 0, 0)
+			for _, node := range nodes {
+				provider.AddNode("ng1", node)
+			}
+			context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
+				NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
+					ScaleDownUnneededTime: 1 * time.Minute,
+				},
+				ScaleDownSimulationTimeout:             1 * time.Hour,
+				MaxScaleDownParallelism:                10,
+				LongestNodeScaleDownTimeTrackerEnabled: true,
+			}, &fake.Clientset{}, nil, provider, nil, nil)
+			assert.NoError(t, err)
+			clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
+			deleteOptions := options.NodeDeleteOptions{}
+			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+
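+			// start stays at the zero time, the expected first-seen timestamp; timestamp is advanced manually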
+			var start time.Time
+			var timestamp time.Time
+
+			// take nodes "n0" - "n3" as unneeded
+			unneededNodeNames := tc.unneededNodes1
+			p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
+			for i := 0; i < tc.run; i++ {
+				timestamp = timestamp.Add(1 * time.Second)
+				// try to update the map for unneeded nodes with a later timestamp than the one already stored
+				p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
+				assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), tc.timedOutNodesCount)
+				for _, val := range p.longestNodeScaleDownT.nodeNamesWithTimeStamps {
+					// check that the stored timestamp for all unneeded nodes is still the zero time
+					assert.Equal(t, val, start)
+				}
+			}
+			// take nodes "n2" - "n5" as unneeded
+			unneededNodeNames = tc.unneededNodes2
+			// timestamp is 2s now
+			p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
+			// n0 and n1 fall back to the default time, n2 and n3 keep the start time, n4 and n5 get the current timestamp
+			assert.Equal(t, p.longestNodeScaleDownT.get("n0"), p.longestNodeScaleDownT.defaultTime)
+			assert.Equal(t, p.longestNodeScaleDownT.get("n1"), p.longestNodeScaleDownT.defaultTime)
+			assert.Equal(t, p.longestNodeScaleDownT.get("n2"), start)
+			assert.Equal(t, p.longestNodeScaleDownT.get("n3"), start)
+			assert.Equal(t, p.longestNodeScaleDownT.get("n4"), timestamp)
+			assert.Equal(t, p.longestNodeScaleDownT.get("n5"), timestamp)
+		})
+	}
+}
+
+func TestLongestNodeScaleDownTimeWithTimeout(t *testing.T) {
+	testCases := []struct {
+		name                string
+		nodes               []*apiv1.Node
+		actuationStatus     *fakeActuationStatus
+		eligible            []string
+		maxParallel         int
+		isSimulationTimeout bool
+		unprocessedNodes    int
+		isFlagEnabled       bool
+	}{
+		{
+			name: "Unneeded node limit is exceeded",
+			nodes: []*apiv1.Node{
+				BuildTestNode("n1", 1000, 10),
+				BuildTestNode("n2", 1000, 10),
+				BuildTestNode("n3", 1000, 10),
+			},
+			actuationStatus:     &fakeActuationStatus{},
+			eligible:            []string{"n1", "n2"},
+			maxParallel:         0,
+			isSimulationTimeout: false,
+			// maxParallel=0 forces p.unneededNodesLimit() to be 0, so we break at the second check inside p.categorizeNodes() right away
+			unprocessedNodes: 2,
+			isFlagEnabled:    true,
+		},
+		{
+			name: "Simulation timeout is hit",
+			nodes: []*apiv1.Node{
+				BuildTestNode("n1", 1000, 10),
+				BuildTestNode("n2", 1000, 10),
+				BuildTestNode("n3", 1000, 10),
+			},
+			actuationStatus:     &fakeActuationStatus{},
+			eligible:            []string{"n1", "n2"},
+			maxParallel:         1,
+			isSimulationTimeout: true,
+			// the first node is simulated; for the second one the timeout is triggered
+			unprocessedNodes: 1,
+			isFlagEnabled:    true,
+		},
+		{
+			name: "LongestNodeScaleDownTimeTrackerEnabled flag is disabled",
+			nodes: []*apiv1.Node{
+				BuildTestNode("n1", 1000, 10),
+				BuildTestNode("n2", 1000, 10),
+				BuildTestNode("n3", 1000, 10),
+			},
+			actuationStatus:     &fakeActuationStatus{},
+			eligible:            []string{"n1", "n2"},
+			maxParallel:         1,
+			isSimulationTimeout: false,
+			isFlagEnabled:       false,
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
+			provider := testprovider.NewTestCloudProviderBuilder().Build()
+			provider.AddNodeGroup("ng1", 0, 0, 0)
+			for _, node := range tc.nodes {
+				provider.AddNode("ng1", node)
+			}
+			context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
+				NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
+					ScaleDownUnneededTime: 10 * time.Minute,
+				},
+				ScaleDownSimulationTimeout:             1 * time.Second,
+				MaxScaleDownParallelism:                tc.maxParallel,
+				LongestNodeScaleDownTimeTrackerEnabled: tc.isFlagEnabled,
+			}, &fake.Clientset{}, registry, provider, nil, nil)
+			assert.NoError(t, err)
+			clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, nil)
+			deleteOptions := options.NodeDeleteOptions{}
+			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+			p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
+			if tc.isSimulationTimeout {
+				context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
+				rs := &fakeRemovalSimulator{
+					nodes: tc.nodes,
+					sleep: 2 * time.Second,
+				}
+				p.rs = rs
+			}
+			assert.NoError(t, p.UpdateClusterState(tc.nodes, tc.nodes, &fakeActuationStatus{}, time.Now()))
+			if !tc.isFlagEnabled {
+				// if the flag is disabled, p.longestNodeScaleDownT is not initialized
+				assert.Nil(t, p.longestNodeScaleDownT)
+			} else {
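+				// every node skipped by the simulation should be tracked with a timestamp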
+				assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), tc.unprocessedNodes)
+			}
+		})
+	}
+}
+
 func sizedNodeGroup(id string, size int, atomic bool) cloudprovider.NodeGroup {
 	ng := testprovider.NewTestNodeGroup(id, 10000, 0, size, true, false, "n1-standard-2", nil, nil)
 	ng.SetOptions(&config.NodeGroupAutoscalingOptions{
diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go
index 6739ebb4d2b7..0dd2bf99690c 100644
--- a/cluster-autoscaler/metrics/metrics.go
+++ b/cluster-autoscaler/metrics/metrics.go
@@ -425,6 +425,14 @@ var (
 			Buckets: k8smetrics.ExponentialBuckets(1, 2, 6), // 1, 2, 4, ..., 32
 		}, []string{"instance_type", "cpu_count", "namespace_count"},
 	)
+
+	longestLastScaleDownEvalDuration = k8smetrics.NewGauge(
+		&k8smetrics.GaugeOpts{
+			Namespace: caNamespace,
+			Name:      "longest_scale_down_eval_seconds",
+			Help:      "Longest time, in seconds, for which a node marked as unneeded has been left unprocessed by the scale down simulation.",
+		},
+	)
 )
 
 // RegisterAll registers all metrics.
@@ -461,6 +469,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) {
 	legacyregistry.MustRegister(nodeTaintsCount)
 	legacyregistry.MustRegister(inconsistentInstancesMigsCount)
 	legacyregistry.MustRegister(binpackingHeterogeneity)
+	legacyregistry.MustRegister(longestLastScaleDownEvalDuration)
 
 	if emitPerNodeGroupMetrics {
 		legacyregistry.MustRegister(nodesGroupMinNodes)
@@ -748,3 +757,11 @@ func UpdateInconsistentInstancesMigsCount(migCount int) {
 func ObserveBinpackingHeterogeneity(instanceType, cpuCount, namespaceCount string, pegCount int) {
 	binpackingHeterogeneity.WithLabelValues(instanceType, cpuCount, namespaceCount).Observe(float64(pegCount))
 }
+
+// ObserveLongestNodeScaleDownTime records the longest time that has passed since a node was
+// first left unprocessed during scale down. If a node stays unneeded but unprocessed across
+// consecutive iterations, only its earliest timestamp is kept; the reported value is 0 when
+// categorizeNodes() neither timed out nor exceeded p.unneededNodesLimit().
+func ObserveLongestNodeScaleDownTime(duration time.Duration) {
+	longestLastScaleDownEvalDuration.Set(duration.Seconds())
+}