diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index a144580989cd..5a2c26b9cd7c 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -349,6 +349,8 @@ type AutoscalingOptions struct {
 	CapacitybufferControllerEnabled bool
 	// CapacitybufferPodInjectionEnabled tells if CA should inject fake pods for capacity buffers that are ready for provisioning
 	CapacitybufferPodInjectionEnabled bool
+	// NodeRemovalLatencyTrackingEnabled is used to enable/disable node removal latency tracking.
+	NodeRemovalLatencyTrackingEnabled bool
 }
 
 // KubeClientOptions specify options for kube client
diff --git a/cluster-autoscaler/config/flags/flags.go b/cluster-autoscaler/config/flags/flags.go
index 0f7209ebbb1b..d60ba2c33885 100644
--- a/cluster-autoscaler/config/flags/flags.go
+++ b/cluster-autoscaler/config/flags/flags.go
@@ -230,6 +230,7 @@ var (
 	nodeDeletionCandidateTTL          = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluster-Autoscaler deployment becomes inactive")
 	capacitybufferControllerEnabled   = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
 	capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
+	nodeRemovalLatencyTrackingEnabled = flag.Bool("enable-node-removal-latency-tracking", false, "Whether to track latency from when a node is marked unneeded until it is removed or needed again.")
 
 	// Deprecated flags
 	ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -414,6 +415,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		NodeDeletionCandidateTTL:          *nodeDeletionCandidateTTL,
 		CapacitybufferControllerEnabled:   *capacitybufferControllerEnabled,
 		CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
+		NodeRemovalLatencyTrackingEnabled: *nodeRemovalLatencyTrackingEnabled,
 	}
 }
 
diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go
index 55ef2e5a8fa6..198c02230281 100644
--- a/cluster-autoscaler/core/scaledown/actuation/actuator.go
+++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -58,6 +59,7 @@ const (
 type Actuator struct {
 	ctx                   *context.AutoscalingContext
 	nodeDeletionTracker   *deletiontracker.NodeDeletionTracker
+	nodeLatencyTracker    latencytracker.LatencyTracker
 	nodeDeletionScheduler *GroupDeletionScheduler
 	deleteOptions         options.NodeDeleteOptions
 	drainabilityRules     rules.Rules
@@ -78,7 +80,7 @@ type actuatorNodeGroupConfigGetter interface {
 }
 
 // NewActuator returns a new instance of Actuator.
-func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
+func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, nlt latencytracker.LatencyTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
 	ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
 	legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
 	var evictor Evictor
@@ -90,6 +92,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
 	return &Actuator{
 		ctx:                   ctx,
 		nodeDeletionTracker:   ndt,
+		nodeLatencyTracker:    nlt,
 		nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
 		budgetProcessor:       budgets.NewScaleDownBudgetProcessor(ctx),
 		deleteOptions:         deleteOptions,
@@ -346,10 +349,16 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
 
 		if force {
 			go a.nodeDeletionScheduler.scheduleForceDeletion(nodeInfo, nodeGroup, batchSize, drain)
+			if a.nodeLatencyTracker != nil {
+				a.nodeLatencyTracker.ObserveDeletion(node.Name, time.Now())
+			}
 			continue
 		}
 		go a.nodeDeletionScheduler.ScheduleDeletion(nodeInfo, nodeGroup, batchSize, drain)
+		if a.nodeLatencyTracker != nil {
+			a.nodeLatencyTracker.ObserveDeletion(node.Name, time.Now())
+		}
 	}
 }
 
diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
index c2b6788f6247..35be2939ef7c 100644
--- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
+++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -18,6 +18,7 @@ package actuation
 
 import (
 	"fmt"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -89,6 +90,21 @@ type startDeletionTestCase struct {
 	wantNodeDeleteResults map[string]status.NodeDeleteResult
 }
 
+// fakeLatencyTracker implements the LatencyTracker interface for tests.
+type fakeLatencyTracker struct {
+	ObservedNodes []string
+}
+
+// ObserveDeletion simply records the node name.
+func (f *fakeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
+	f.ObservedNodes = append(f.ObservedNodes, nodeName)
+}
+func (f *fakeLatencyTracker) UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time) {
+}
+func (f *fakeLatencyTracker) UpdateThreshold(nodeName string, threshold time.Duration) {}
+
+func (f *fakeLatencyTracker) GetTrackedNodes() []string { return nil }
+
 func getStartDeletionTestCases(ignoreDaemonSetsUtilization bool, force bool, suffix string) map[string]startDeletionTestCase {
 	toBeDeletedTaint := apiv1.Taint{Key: taints.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule}
 
@@ -1274,11 +1290,13 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
 	ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, 0*time.Second)
 	legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
 	evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig, fullDsEviction: force}
+	fakeNodeLatencyTracker := &fakeLatencyTracker{}
 	actuator := Actuator{
 		ctx: &ctx, nodeDeletionTracker: ndt,
		nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
 		budgetProcessor:       budgets.NewScaleDownBudgetProcessor(&ctx),
 		configGetter:          nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
+		nodeLatencyTracker:    fakeNodeLatencyTracker,
 	}
 
 	var gotResult status.ScaleDownResult
@@ -1375,6 +1393,19 @@ taintsLoop:
 	if diff := cmp.Diff(tc.wantNodeDeleteResults, nodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
 		t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
 	}
+	// Verify ObserveDeletion was called for all nodes that were actually deleted.
+	for _, expectedNode := range tc.wantDeletedNodes {
+		found := false
+		for _, observed := range fakeNodeLatencyTracker.ObservedNodes {
+			if observed == expectedNode {
+				found = true
+				break
+			}
+		}
+		if !found {
+			t.Errorf("Expected ObserveDeletion to be called for node %s, but it wasn't", expectedNode)
+		}
+	}
 }
 
 func TestStartDeletion(t *testing.T) {
@@ -1553,10 +1584,12 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
 			ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, deleteInterval)
 			legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
 			evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig}
+			fakeNodeLatencyTracker := &fakeLatencyTracker{}
 			actuator := Actuator{
 				ctx: &ctx, nodeDeletionTracker: ndt,
 				nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
 				budgetProcessor:       budgets.NewScaleDownBudgetProcessor(&ctx),
+				nodeLatencyTracker:    fakeNodeLatencyTracker,
 			}
 
 			for _, nodes := range deleteNodes {
@@ -1584,6 +1617,33 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
 			if diff := cmp.Diff(test.wantSuccessfulDeletion, gotDeletedNodes); diff != "" {
 				t.Errorf("Successful deletions per node group diff (-want +got):\n%s", diff)
 			}
+			for _, nodes := range deleteNodes {
+				for _, node := range nodes {
+					// Extract node group from node name.
+					parts := strings.Split(node.Name, "-")
+					if len(parts) < 3 {
+						continue
+					}
+					ngName := strings.Join(parts[:2], "-")
+
+					// Skip check if no successful deletions expected for this group.
+					if test.wantSuccessfulDeletion[ngName] == 0 {
+						continue
+					}
+
+					// Verify ObserveDeletion was called.
+					found := false
+					for _, observedNode := range fakeNodeLatencyTracker.ObservedNodes {
+						if observedNode == node.Name {
+							found = true
+							break
+						}
+					}
+					if !found {
+						t.Errorf("Expected ObserveDeletion to be called for node %s", node.Name)
+					}
+				}
+			}
 		})
 	}
 }
diff --git a/cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker.go b/cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker.go
new file mode 100644
index 000000000000..93a9c1514836
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker.go
@@ -0,0 +1,120 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package latencytracker
+
+import (
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/klog/v2"
+)
+
+// LatencyTracker defines the interface for tracking node removal latency.
+// Implementations record when nodes become unneeded, observe deletion events,
+// and accept per-node thresholds used when measuring node removal duration.
+type LatencyTracker interface {
+	ObserveDeletion(nodeName string, timestamp time.Time)
+	UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time)
+	UpdateThreshold(nodeName string, threshold time.Duration)
+	GetTrackedNodes() []string
+}
+
+type nodeInfo struct {
+	unneededSince time.Time
+	threshold     time.Duration
+}
+
+// NodeLatencyTracker is a concrete implementation of LatencyTracker.
+// It keeps track of nodes that are marked as unneeded, when they became unneeded,
+// and thresholds to adjust node removal latency metrics.
+type NodeLatencyTracker struct {
+	nodes map[string]nodeInfo
+}
+
+// NewNodeLatencyTracker creates a new tracker.
+func NewNodeLatencyTracker() *NodeLatencyTracker {
+	return &NodeLatencyTracker{
+		nodes: make(map[string]nodeInfo),
+	}
+}
+
+// UpdateStateWithUnneededList records newly unneeded nodes and reports nodes
+// that left the unneeded list without being deleted.
+func (t *NodeLatencyTracker) UpdateStateWithUnneededList(
+	list []*apiv1.Node,
+	currentlyInDeletion map[string]bool,
+	timestamp time.Time,
+) {
+	currentSet := make(map[string]struct{}, len(list))
+	for _, node := range list {
+		currentSet[node.Name] = struct{}{}
+
+		if _, exists := t.nodes[node.Name]; !exists {
+			t.nodes[node.Name] = nodeInfo{
+				unneededSince: timestamp,
+				threshold:     0,
+			}
+			klog.V(4).Infof("Started tracking unneeded node %s at %v", node.Name, timestamp)
+		}
+	}
+
+	for name, info := range t.nodes {
+		if _, stillUnneeded := currentSet[name]; !stillUnneeded {
+			if _, inDeletion := currentlyInDeletion[name]; !inDeletion {
+				duration := timestamp.Sub(info.unneededSince)
+				metrics.UpdateScaleDownNodeRemovalLatency(false, duration-info.threshold)
+				delete(t.nodes, name)
+				klog.V(4).Infof("Node %q is no longer unneeded and not being deleted (unneeded for %s, threshold %s)",
+					name, duration, info.threshold)
+			}
+		}
+	}
+}
+
+// ObserveDeletion is called by the actuator just before node deletion.
+func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
+	if info, exists := t.nodes[nodeName]; exists {
+		duration := timestamp.Sub(info.unneededSince)
+
+		klog.V(4).Infof(
+			"Observing deletion for node %s, unneeded for %s (threshold was %s).",
+			nodeName, duration, info.threshold,
+		)
+
+		metrics.UpdateScaleDownNodeRemovalLatency(true, duration-info.threshold)
+		delete(t.nodes, nodeName)
+	}
+}
+
+// UpdateThreshold updates the scale-down threshold for a tracked node.
+func (t *NodeLatencyTracker) UpdateThreshold(nodeName string, threshold time.Duration) {
+	if info, exists := t.nodes[nodeName]; exists {
+		info.threshold = threshold
+		t.nodes[nodeName] = info
+		klog.V(4).Infof("Updated threshold for node %q to %s", nodeName, threshold)
+	} else {
+		klog.Warningf("Attempted to update threshold for unknown node %q", nodeName)
+	}
+}
+
+// GetTrackedNodes returns the names of all nodes currently tracked as unneeded.
+func (t *NodeLatencyTracker) GetTrackedNodes() []string {
+	names := make([]string, 0, len(t.nodes))
+	for name := range t.nodes {
+		names = append(names, name)
+	}
+	return names
+}
diff --git a/cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker_test.go b/cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker_test.go
new file mode 100644
index 000000000000..45226a65ce89
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/latencytracker/node_latency_tracker_test.go
@@ -0,0 +1,144 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package latencytracker
+
+import (
+	"testing"
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func TestNodeLatencyTracker(t *testing.T) {
+	baseTime := time.Now()
+
+	tests := []struct {
+		name                string
+		setupNodes          map[string]nodeInfo
+		unneededList        []string
+		currentlyInDeletion map[string]bool
+		updateThresholds    map[string]time.Duration
+		observeDeletion     []string
+		wantTrackedNodes    []string
+		wantDeletionTimes   map[string]time.Duration
+	}{
+		{
+			name:                "add new unneeded nodes",
+			setupNodes:          map[string]nodeInfo{},
+			unneededList:        []string{"node1", "node2"},
+			currentlyInDeletion: map[string]bool{},
+			updateThresholds:    map[string]time.Duration{},
+			observeDeletion:     []string{},
+			wantTrackedNodes:    []string{"node1", "node2"},
+		},
+		{
+			name: "observe deletion with threshold",
+			setupNodes: map[string]nodeInfo{
+				"node1": {unneededSince: baseTime, threshold: 2 * time.Second},
+			},
+			unneededList:        []string{},
+			currentlyInDeletion: map[string]bool{},
+			updateThresholds:    map[string]time.Duration{},
+			observeDeletion:     []string{"node1"},
+			wantTrackedNodes:    []string{},
+			wantDeletionTimes: map[string]time.Duration{
+				"node1": 3 * time.Second, // simulate observation 5s after UnneededSince, threshold 2s
+			},
+		},
+		{
+			name: "remove unneeded node not in deletion",
+			setupNodes: map[string]nodeInfo{
+				"node1": {unneededSince: baseTime, threshold: 1 * time.Second},
+				"node2": {unneededSince: baseTime, threshold: 0},
+			},
+			unneededList:        []string{"node2"}, // node1 is removed from unneeded
+			currentlyInDeletion: map[string]bool{},
+			updateThresholds:    map[string]time.Duration{},
+			observeDeletion:     []string{},
+			wantTrackedNodes:    []string{"node2"},
+			wantDeletionTimes: map[string]time.Duration{
+				"node1": 5*time.Second - 1*time.Second, // assume current timestamp baseTime+5s
+			},
+		},
+		{
+			name: "update threshold",
+			setupNodes: map[string]nodeInfo{
+				"node1": {unneededSince: baseTime, threshold: 1 * time.Second},
+			},
+			unneededList:        []string{"node1"},
+			currentlyInDeletion: map[string]bool{},
+			updateThresholds: map[string]time.Duration{
+				"node1": 4 * time.Second,
+			},
+			observeDeletion:  []string{},
+			wantTrackedNodes: []string{"node1"},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tracker := NewNodeLatencyTracker()
+			for name, info := range tt.setupNodes {
+				tracker.nodes[name] = info
+			}
+
+			for node, threshold := range tt.updateThresholds {
+				tracker.UpdateThreshold(node, threshold)
+			}
+			unneededNodes := make([]*apiv1.Node, len(tt.unneededList))
+			for i, name := range tt.unneededList {
+				unneededNodes[i] = &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
+			}
+			// Simulate the current timestamp as baseTime + 5s.
+			currentTime := baseTime.Add(5 * time.Second)
+			tracker.UpdateStateWithUnneededList(unneededNodes, tt.currentlyInDeletion, currentTime)
+
+			// Observe deletions.
+			for _, node := range tt.observeDeletion {
+				tracker.ObserveDeletion(node, currentTime)
+			}
+
+			// Check tracked nodes.
+			gotTracked := tracker.GetTrackedNodes()
+			expectedMap := make(map[string]struct{})
+			for _, n := range tt.wantTrackedNodes {
+				expectedMap[n] = struct{}{}
+			}
+			for _, n := range gotTracked {
+				if _, ok := expectedMap[n]; !ok {
+					t.Errorf("unexpected tracked node %q", n)
+				}
+				delete(expectedMap, n)
+			}
+			for n := range expectedMap {
+				t.Errorf("expected node %q to be tracked, but was not", n)
+			}
+
+			for node, expectedDuration := range tt.wantDeletionTimes {
+				info, ok := tt.setupNodes[node]
+				if !ok {
+					continue
+				}
+				duration := currentTime.Sub(info.unneededSince) - info.threshold
+				if duration != expectedDuration {
+					t.Errorf("node %q expected deletion duration %v, got %v", node, expectedDuration, duration)
+				}
+			}
+		})
+	}
+}
diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go
index 2e2263fe84e0..f7259cb17c74 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner.go
@@ -26,6 +26,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
@@ -76,17 +77,18 @@ type Planner struct {
 	cc                    controllerReplicasCalculator
 	scaleDownSetProcessor nodes.ScaleDownSetProcessor
 	scaleDownContext      *nodes.ScaleDownContext
+	nodeLatencyTracker    latencytracker.LatencyTracker
 }
 
 // New creates a new Planner object.
-func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner {
+func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, nlt latencytracker.LatencyTracker) *Planner {
 	resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
 	minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
 	if minUpdateInterval == 0*time.Nanosecond {
 		minUpdateInterval = 1 * time.Nanosecond
 	}
-	unneededNodes := unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder)
+	unneededNodes := unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder, nlt)
 	if context.AutoscalingOptions.NodeDeletionCandidateTTL != 0 {
 		unneededNodes.LoadFromExistingTaints(context.ListerRegistry, time.Now(), context.AutoscalingOptions.NodeDeletionCandidateTTL)
 	}
@@ -104,6 +106,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
 		scaleDownSetProcessor: processors.ScaleDownSetProcessor,
 		scaleDownContext:      nodes.NewDefaultScaleDownContext(),
 		minUpdateInterval:     minUpdateInterval,
+		nodeLatencyTracker:    nlt,
 	}
 }
 
@@ -128,6 +131,9 @@ func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*api
 	podDestinations = filterOutOngoingDeletions(podDestinations, deletions)
 	scaleDownCandidates = filterOutOngoingDeletions(scaleDownCandidates, deletions)
 	p.categorizeNodes(asMap(nodeNames(podDestinations)), scaleDownCandidates)
+	if p.nodeLatencyTracker != nil {
+		p.nodeLatencyTracker.UpdateStateWithUnneededList(p.unneededNodes.AsList(), deletions, p.latestUpdate)
+	}
 	p.rs.DropOldHints()
 	p.actuationInjector.DropOldHints()
 	return nil
diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go b/cluster-autoscaler/core/scaledown/planner/planner_test.go
index 051a5f591645..50432930aa0c 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner_test.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
@@ -503,7 +504,7 @@ func TestUpdateClusterState(t *testing.T) {
 			assert.NoError(t, err)
 			clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
 			deleteOptions := options.NodeDeleteOptions{}
-			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
 			p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
 			if tc.isSimulationTimeout {
 				context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@@ -521,6 +522,13 @@ func TestUpdateClusterState(t *testing.T) {
 				assert.Equal(t, wantUnneeded[n.Name], p.unneededNodes.Contains(n.Name), []string{n.Name, "unneeded"})
 				assert.Equal(t, wantUnremovable[n.Name], p.unremovableNodes.Contains(n.Name), []string{n.Name, "unremovable"})
 			}
+			tracked := p.nodeLatencyTracker.GetTrackedNodes()
+			for _, name := range tc.wantUnneeded {
+				assert.Contains(t, tracked, name, "expected node in latency tracker")
+			}
+			for _, name := range tc.wantUnremovable {
+				assert.NotContains(t, tracked, name, "expected node not in latency tracker")
+			}
 		})
 	}
 }
@@ -699,7 +707,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
 			assert.NoError(t, err)
 			clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
 			deleteOptions := options.NodeDeleteOptions{}
-			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
 			p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
 			p.minUpdateInterval = tc.updateInterval
 			p.unneededNodes.Update(previouslyUnneeded, time.Now())
@@ -833,7 +841,7 @@ func TestNewPlannerWithExistingDeletionCandidateNodes(t *testing.T) {
 			assert.NoError(t, err)
 
 			deleteOptions := options.NodeDeleteOptions{}
-			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, nil)
 
 			p.unneededNodes.AsList()
 		})
@@ -1023,7 +1031,7 @@ func TestNodesToDelete(t *testing.T) {
 			assert.NoError(t, err)
 			clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
 			deleteOptions := options.NodeDeleteOptions{}
-			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+			p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
 			p.latestUpdate = time.Now()
 			p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
 			p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))
diff --git a/cluster-autoscaler/core/scaledown/unneeded/nodes.go b/cluster-autoscaler/core/scaledown/unneeded/nodes.go
index ba1ad8e4d7cc..83d2db057a74 100644
--- a/cluster-autoscaler/core/scaledown/unneeded/nodes.go
+++ b/cluster-autoscaler/core/scaledown/unneeded/nodes.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
@@ -39,10 +40,12 @@ import (
 
 // Nodes tracks the state of cluster nodes that are not needed.
 type Nodes struct {
-	sdtg         scaleDownTimeGetter
-	limitsFinder *resource.LimitsFinder
-	cachedList   []*apiv1.Node
-	byName       map[string]*node
+	sdtg               scaleDownTimeGetter
+	limitsFinder       *resource.LimitsFinder
+	nodeLatencyTracker latencytracker.LatencyTracker
+	cachedList         []*apiv1.Node
+	byName             map[string]*node
+	unneededTimeCache  map[string]time.Duration
 }
 
 type node struct {
@@ -58,10 +61,11 @@
 }
 
 // NewNodes returns a new initialized Nodes object.
-func NewNodes(sdtg scaleDownTimeGetter, limitsFinder *resource.LimitsFinder) *Nodes {
+func NewNodes(sdtg scaleDownTimeGetter, limitsFinder *resource.LimitsFinder, nlt latencytracker.LatencyTracker) *Nodes {
 	return &Nodes{
-		sdtg:         sdtg,
-		limitsFinder: limitsFinder,
+		sdtg:               sdtg,
+		limitsFinder:       limitsFinder,
+		nodeLatencyTracker: nlt,
 	}
 }
 
@@ -231,6 +235,9 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDown
 	if ready {
 		// Check how long a ready node was underutilized.
 		unneededTime, err := n.sdtg.GetScaleDownUnneededTime(nodeGroup)
+		if n.nodeLatencyTracker != nil {
+			n.nodeLatencyTracker.UpdateThreshold(node.Name, unneededTime)
+		}
 		if err != nil {
 			klog.Errorf("Error trying to get ScaleDownUnneededTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
 			return simulator.UnexpectedError
@@ -241,6 +248,9 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDown
 	} else {
 		// Unready nodes may be deleted after a different time than underutilized nodes.
 		unreadyTime, err := n.sdtg.GetScaleDownUnreadyTime(nodeGroup)
+		if n.nodeLatencyTracker != nil {
+			n.nodeLatencyTracker.UpdateThreshold(node.Name, unreadyTime)
+		}
 		if err != nil {
 			klog.Errorf("Error trying to get ScaleDownUnreadyTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
 			return simulator.UnexpectedError
diff --git a/cluster-autoscaler/core/scaledown/unneeded/nodes_test.go b/cluster-autoscaler/core/scaledown/unneeded/nodes_test.go
index 7a650854ed2f..ea551a6db10e 100644
--- a/cluster-autoscaler/core/scaledown/unneeded/nodes_test.go
+++ b/cluster-autoscaler/core/scaledown/unneeded/nodes_test.go
@@ -101,7 +101,7 @@
 		tc := tc
 		t.Run(tc.desc, func(t *testing.T) {
 			t.Parallel()
-			nodes := NewNodes(nil, nil)
+			nodes := NewNodes(nil, nil, nil)
 			nodes.Update(tc.initialNodes, initialTimestamp)
 			nodes.Update(tc.finalNodes, finalTimestamp)
 			wantNodes := len(tc.wantTimestamps)
@@ -203,7 +203,8 @@
 			ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{ScaleDownSimulationTimeout: 5 * time.Minute}, &fake.Clientset{}, registry, provider, nil, nil)
 			assert.NoError(t, err)
 
-			n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{})
+			fakeTracker := NewFakeLatencyTracker()
+			n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{}, fakeTracker)
 			n.Update(removableNodes, time.Now())
 			gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, nodeprocessors.ScaleDownContext{
 				ActuationStatus: as,
@@ -213,6 +214,16 @@
 			if len(gotDrainToRemove) != tc.numDrainToRemove || len(gotEmptyToRemove) != tc.numEmptyToRemove {
 				t.Errorf("%s: getNodesToRemove() return %d, %d, want %d, %d", tc.name, len(gotEmptyToRemove), len(gotDrainToRemove), tc.numEmptyToRemove, tc.numDrainToRemove)
 			}
+			expectedThreshold := 0 * time.Second // matches fakeScaleDownTimeGetter
+			for _, node := range removableNodes {
+				nodeName := node.Node.Name
+				got, ok := fakeTracker.Observed[nodeName]
+				if !ok {
+					t.Errorf("NodeLatencyTracker not called for node %s", nodeName)
+				} else if got != expectedThreshold {
+					t.Errorf("NodeLatencyTracker called with %v for node %s, want %v", got, nodeName, expectedThreshold)
+				}
+			}
 		})
 	}
 }
@@ -273,8 +284,9 @@ func TestNodeLoadFromExistingTaints(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			t.Parallel()
+			currentTime = time.Now()
 
-			nodes := NewNodes(nil, nil)
+			nodes := NewNodes(nil, nil, nil)
 			allNodeLister := kubernetes.NewTestNodeLister(nil)
 			allNodeLister.SetNodes(tc.allNodes)
@@ -339,3 +351,20 @@ func (f *fakeScaleDownTimeGetter) GetScaleDownUnneededTime(cloudprovider.NodeGro
 func (f *fakeScaleDownTimeGetter) GetScaleDownUnreadyTime(cloudprovider.NodeGroup) (time.Duration, error) {
 	return 0 * time.Second, nil
 }
+
+type fakeLatencyTracker struct {
+	Observed map[string]time.Duration
+}
+
+func NewFakeLatencyTracker() *fakeLatencyTracker {
+	return &fakeLatencyTracker{Observed: make(map[string]time.Duration)}
+}
+
+func (f *fakeLatencyTracker) UpdateThreshold(nodeName string, threshold time.Duration) {
+	f.Observed[nodeName] = threshold
+}
+func (f *fakeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
+}
+func (f *fakeLatencyTracker) UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time) {
+}
+func (f *fakeLatencyTracker) GetTrackedNodes() []string { return nil }
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 77e4e13bfea3..83c4a2a74129 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
 	scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
@@ -170,11 +171,15 @@ func NewStaticAutoscaler(
 
 	// TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext
 	// during the struct creation rather than here.
-	scaleDownPlanner := planner.New(autoscalingContext, processors, deleteOptions, drainabilityRules)
+	var nldt latencytracker.LatencyTracker
+	if autoscalingContext.AutoscalingOptions.NodeRemovalLatencyTrackingEnabled {
+		nldt = latencytracker.NewNodeLatencyTracker()
+	}
+	scaleDownPlanner := planner.New(autoscalingContext, processors, deleteOptions, drainabilityRules, nldt)
 	processorCallbacks.scaleDownPlanner = scaleDownPlanner
 
 	ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
-	scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
+	scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, nldt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
 	autoscalingContext.ScaleDownActuator = scaleDownActuator
 
 	if scaleUpOrchestrator == nil {
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index 02425eb2a996..6b1095aab87b 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -48,6 +48,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
@@ -165,7 +166,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {
 
 func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) {
 	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
-	ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor)
+	ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), nil, deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor)
 }
 
 type nodeGroup struct {
@@ -321,8 +322,8 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
 	if nodeDeletionTracker == nil {
 		nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
 	}
-	ctx.ScaleDownActuator = actuation.NewActuator(&ctx, clusterState, nodeDeletionTracker, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
-	sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules)
+	ctx.ScaleDownActuator = actuation.NewActuator(&ctx, clusterState, nodeDeletionTracker, nil, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
+	sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules, nil)
 
 	processorCallbacks.scaleDownPlanner = sdPlanner
 
@@ -378,6 +379,21 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup)
 	assert.NotNil(t, ng1)
 	assert.NotNil(t, provider)
+	// NodeLatencyTracker mock
+	nltMock := &latencytrackerMock{}
+	nltMock.On("ObserveDeletion",
+		"n2",
+		mock.MatchedBy(func(t time.Time) bool { return !t.IsZero() }),
+	).Return()
+	nltMock.On("UpdateStateWithUnneededList",
+		mock.MatchedBy(func(nodes []*apiv1.Node) bool { return true }),
+		mock.MatchedBy(func(m map[string]bool) bool { return true }),
+		mock.Anything,
+	).Return()
+	nltMock.On("UpdateThreshold",
+		"n2",
+		time.Minute,
+	).Return()
 
 	// Create context with mocked lister registry.
 	options := config.AutoscalingOptions{
@@ -410,7 +426,7 @@
 	}
 	processors := processorstest.NewTestProcessors(&context)
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
-	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nltMock)
 	suOrchestrator := orchestrator.New()
 	suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -678,7 +694,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
 	processors.ScaleStateNotifier.Register(clusterState)
 
-	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
 	suOrchestrator := orchestrator.New()
 	suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -822,7 +838,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
 	}
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
 
-	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
 	suOrchestrator := orchestrator.New()
 	suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -975,7 +991,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
 	// broken node failed to register in time
 	clusterState.UpdateNodes(nodes, nil, later)
 
-	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
 	suOrchestrator := orchestrator.New()
 	suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -1130,7 +1146,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
 	processors := processorstest.NewTestProcessors(&context)
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
 
-	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
 	suOrchestrator := orchestrator.New()
 	suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -1261,7 +1277,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
 	processors := processorstest.NewTestProcessors(&context)
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
 
-	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext: &context,
@@ -1359,7 +1375,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
 	processors := processorstest.NewTestProcessors(&context)
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
 
-	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext: &context,
@@ -1707,7 +1723,7 @@ func TestStaticAutoscalerRunOnceWithExistingDeletionCandidateNodes(t *testing.T)
 	}
 	processors := processorstest.NewTestProcessors(&context)
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
-	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
 	suOrchestrator := orchestrator.New()
 	suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -2467,7 +2483,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
 	csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)
 
 	// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
-	actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor)
+	actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), nil, options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor)
 	ctx.ScaleDownActuator = actuator
 
 	// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -3128,7 +3144,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
 	}
 }
 
-func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) (scaledown.Planner, scaledown.Actuator) {
+func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, nodeDeletionLatencyTracker latencytracker.LatencyTracker) (scaledown.Planner, scaledown.Actuator) {
 	ctx.MaxScaleDownParallelism = 10
 	ctx.MaxDrainParallelism = 1
 	ctx.NodeDeletionBatcherInterval = 0 * time.Second
@@ -3143,8 +3159,8 @@ func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_proce
 	if nodeDeletionTracker == nil {
 		nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
 	}
-	planner := planner.New(ctx, p, deleteOptions, nil)
-	actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor)
+	planner := planner.New(ctx, p, deleteOptions, nil, nodeDeletionLatencyTracker)
+	actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, nodeDeletionLatencyTracker, deleteOptions, nil, p.NodeGroupConfigProcessor)
 	return planner, actuator
 }
 
@@ -3260,13 +3276,13 @@ func buildStaticAutoscaler(t *testing.T, provider cloudprovider.CloudProvider, a
 	processors.ScaleDownNodeProcessor = cp
 
 	csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: 1}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)
-	actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
+	actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), nil, options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
 	ctx.ScaleDownActuator = actuator
 
 	deleteOptions := options.NewNodeDeleteOptions(ctx.AutoscalingOptions)
 	drainabilityRules := rules.Default(deleteOptions)
 
-	sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules)
+	sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules, nil)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext: &ctx,
@@ -3316,3 +3332,25 @@ func assertNodesSoftTaintsStatus(t *testing.T, fakeClient *fake.Clientset, nodes
 		assert.Equal(t, tainted, taints.HasDeletionCandidateTaint(newNode))
 	}
 }
+
+// latencytrackerMock implements LatencyTracker for mocking.
+type latencytrackerMock struct {
+	mock.Mock
+}
+
+func (m *latencytrackerMock) ObserveDeletion(nodeName string, timestamp time.Time) {
+	m.Called(nodeName, timestamp)
+}
+
+func (m *latencytrackerMock) UpdateStateWithUnneededList(list []*apiv1.Node, currentlyInDeletion map[string]bool, timestamp time.Time) {
+	m.Called(list, currentlyInDeletion, timestamp)
+}
+
+func (m *latencytrackerMock) UpdateThreshold(nodeName string, threshold time.Duration) {
+	m.Called(nodeName, threshold)
+}
+
+func (m *latencytrackerMock) GetTrackedNodes() []string {
+	args := m.Called()
+	return args.Get(0).([]string)
+}
diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go
index ebc5541c5edb..e5598c71d011 100644
--- a/cluster-autoscaler/metrics/metrics.go
+++ b/cluster-autoscaler/metrics/metrics.go
@@ -18,6 +18,7 @@ package metrics
 
 import (
 	"fmt"
+	"strconv"
 	"time"
 
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
@@ -427,6 +428,15 @@ var (
 			Buckets: k8smetrics.ExponentialBuckets(1, 2, 6), // 1, 2, 4, ..., 32
 		}, []string{"instance_type", "cpu_count", "namespace_count"},
 	)
+
+	scaleDownNodeRemovalLatency = k8smetrics.NewHistogramVec(
+		&k8smetrics.HistogramOpts{
+			Namespace: caNamespace,
+			Name:      "node_removal_latency_seconds",
+			Help:      "Latency from planning (node marked) to final outcome (deleted, aborted, rescued).",
+			Buckets:   k8smetrics.ExponentialBuckets(1, 2, 18), // 1, 2, 4, 8, ..., 131072 (~1.5 days)
+		}, []string{"deleted"},
+	)
 )
 
 // RegisterAll registers all metrics.
@@ -463,6 +473,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) {
 	legacyregistry.MustRegister(nodeTaintsCount)
 	legacyregistry.MustRegister(inconsistentInstancesMigsCount)
 	legacyregistry.MustRegister(binpackingHeterogeneity)
+	legacyregistry.MustRegister(scaleDownNodeRemovalLatency)
 
 	if emitPerNodeGroupMetrics {
 		legacyregistry.MustRegister(nodesGroupMinNodes)
@@ -750,3 +761,9 @@ func UpdateInconsistentInstancesMigsCount(migCount int) {
 func ObserveBinpackingHeterogeneity(instanceType, cpuCount, namespaceCount string, pegCount int) {
 	binpackingHeterogeneity.WithLabelValues(instanceType, cpuCount, namespaceCount).Observe(float64(pegCount))
 }
+
+// UpdateScaleDownNodeRemovalLatency records how long it took for a node to be
+// deleted, or needed again, after being marked unneeded.
+func UpdateScaleDownNodeRemovalLatency(deleted bool, duration time.Duration) {
+	scaleDownNodeRemovalLatency.WithLabelValues(strconv.FormatBool(deleted)).Observe(duration.Seconds())
+}
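
Usage note (reviewer sketch, not part of the diff): a minimal standalone program showing how the pieces introduced above fit together, using the real latencytracker package path and method set from this change. The node name, the durations, and the main wrapper are illustrative only. With --enable-node-removal-latency-tracking set, the planner feeds the unneeded list into the tracker on every loop, the unneeded-nodes logic pushes each node's ScaleDownUnneededTime/ScaleDownUnreadyTime as its threshold, and the actuator reports deletions; nodes that leave the unneeded set without being deleted are recorded under deleted="false".

package main

import (
	"fmt"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
)

func main() {
	tracker := latencytracker.NewNodeLatencyTracker()
	node := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

	// Planner loop: node-1 shows up in the unneeded list at t0.
	t0 := time.Now()
	tracker.UpdateStateWithUnneededList([]*apiv1.Node{node}, map[string]bool{}, t0)

	// Unneeded-nodes logic: the node group's ScaleDownUnneededTime (10m here,
	// illustrative) becomes the node's threshold; that much of the latency is
	// expected, so it is subtracted before the metric is observed.
	tracker.UpdateThreshold("node-1", 10*time.Minute)
	fmt.Println(tracker.GetTrackedNodes()) // [node-1]

	// Actuator: deletion is observed 12m after t0, so the histogram records
	// 12m - 10m = 2m under the label deleted="true", and tracking stops.
	tracker.ObserveDeletion("node-1", t0.Add(12*time.Minute))
	fmt.Println(tracker.GetTrackedNodes()) // []
}

On a live deployment the result surfaces through the new histogram, e.g. node_removal_latency_seconds_bucket{deleted="true", ...}, prefixed with the caNamespace value (presumably cluster_autoscaler_).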