5 changes: 5 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -349,6 +349,11 @@ type AutoscalingOptions struct {
CapacitybufferControllerEnabled bool
// CapacitybufferPodInjectionEnabled tells if CA should inject fake pods for capacity buffers that are ready for provisioning
CapacitybufferPodInjectionEnabled bool
// LongestNodeScaleDownTimeTrackerEnabled is used to enable/disable tracking of the longest node ScaleDown evaluation time.
// We track all the nodes that were marked as unneeded but were left unprocessed during ScaleDown.
// If a node is unneeded but unprocessed multiple times consecutively, we store only the earliest time this happened.
// The difference between the current time and the earliest time among all unprocessed nodes gives the longest evaluation time.
LongestNodeScaleDownTimeTrackerEnabled bool
}

// KubeClientOptions specify options for kube client
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/flags/flags.go
@@ -230,6 +230,7 @@ var (
nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluster-Autoscaler deployment becomes inactive")
capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
longestNodeScaleDownTimeTrackerEnabled = flag.Bool("longest-node-scaledown-timetracker-enabled", false, "Whether to track the longest node ScaleDown evaluation time")

// Deprecated flags
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -414,6 +415,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeDeletionCandidateTTL: *nodeDeletionCandidateTTL,
CapacitybufferControllerEnabled: *capacitybufferControllerEnabled,
CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
LongestNodeScaleDownTimeTrackerEnabled: *longestNodeScaleDownTimeTrackerEnabled,
}
}

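For reference, a minimal example of how the new option would be switched on from the Cluster Autoscaler command line (a sketch; any other arguments a real deployment would pass are omitted):

cluster-autoscaler --longest-node-scaledown-timetracker-enabled=true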
62 changes: 62 additions & 0 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -30,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
@@ -76,6 +77,7 @@ type Planner struct {
cc controllerReplicasCalculator
scaleDownSetProcessor nodes.ScaleDownSetProcessor
scaleDownContext *nodes.ScaleDownContext
longestNodeScaleDownT *longestNodeScaleDownTime
}

// New creates a new Planner object.
@@ -91,6 +93,11 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
unneededNodes.LoadFromExistingTaints(context.ListerRegistry, time.Now(), context.AutoscalingOptions.NodeDeletionCandidateTTL)
}

var longestNodeScaleDownTime *longestNodeScaleDownTime
if context.AutoscalingOptions.LongestNodeScaleDownTimeTrackerEnabled {
longestNodeScaleDownTime = newLongestNodeScaleDownTime()
}

return &Planner{
context: context,
unremovableNodes: unremovable.NewNodes(),
@@ -104,6 +111,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
scaleDownContext: nodes.NewDefaultScaleDownContext(),
minUpdateInterval: minUpdateInterval,
longestNodeScaleDownT: longestNodeScaleDownTime,
}
}

@@ -277,13 +285,16 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
}
p.nodeUtilizationMap = utilizationMap
timer := time.NewTimer(p.context.ScaleDownSimulationTimeout)
endedPrematurely := false

for i, node := range currentlyUnneededNodeNames {
if timedOut(timer) {
p.processUnneededNodes(currentlyUnneededNodeNames[i:], time.Now(), &endedPrematurely)
klog.Warningf("%d out of %d nodes skipped in scale down simulation due to timeout.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames))
break
}
if len(removableList)-atomicScaleDownNodesCount >= p.unneededNodesLimit() {
p.processUnneededNodes(currentlyUnneededNodeNames[i:], time.Now(), &endedPrematurely)
klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more. Total atomic scale down nodes: %d", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList), atomicScaleDownNodesCount)
break
}
@@ -306,6 +317,7 @@
p.unremovableNodes.AddTimeout(unremovable, unremovableTimeout)
}
}
p.processUnneededNodes(nil, time.Now(), &endedPrematurely)
p.unneededNodes.Update(removableList, p.latestUpdate)
if unremovableCount > 0 {
klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
@@ -435,3 +447,53 @@ func timedOut(timer *time.Timer) bool {
return false
}
}

func (p *Planner) processUnneededNodes(currentlyUnneededNodeNames []string, currentTime time.Time, endedPrematurely *bool) {
// If p.longestNodeScaleDownT is nil, LongestNodeScaleDownTimeTracker is disabled; if endedPrematurely is already
// true, the longest duration has already been recorded in this iteration. In both cases there is nothing to do.
// If endedPrematurely is still false at the final call, all unneeded nodes were successfully simulated and the
// reported longest evaluation time is 0.
if p.longestNodeScaleDownT == nil || *endedPrematurely {
return
}
*endedPrematurely = true
p.longestNodeScaleDownT.update(currentlyUnneededNodeNames, currentTime)
}

type longestNodeScaleDownTime struct {
defaultTime time.Time
nodeNamesWithTimeStamps map[string]time.Time
}

func newLongestNodeScaleDownTime() *longestNodeScaleDownTime {
return &longestNodeScaleDownTime{}
}

func (l *longestNodeScaleDownTime) get(nodeName string) time.Time {
if t, ok := l.nodeNamesWithTimeStamps[nodeName]; ok {
return t
}
return l.defaultTime
}

func (l *longestNodeScaleDownTime) update(nodeNames []string, currentTime time.Time) {
// if all nodes were processed, report a time of 0
if nodeNames == nil {
l.nodeNamesWithTimeStamps = make(map[string]time.Time)
metrics.ObserveLongestNodeScaleDownTime(0)
return
}
newNodes := make(map[string]time.Time, len(nodeNames))
l.defaultTime = currentTime
minimumTime := l.defaultTime
for _, nodeName := range nodeNames {
// if a node is not yet in nodeNamesWithTimeStamps, use the current time
// if a node is already in nodeNamesWithTimeStamps, carry over the value from the previous iteration
valueFromPrevIter := l.get(nodeName)
newNodes[nodeName] = valueFromPrevIter
if minimumTime.Compare(valueFromPrevIter) > 0 {
minimumTime = valueFromPrevIter
}
}
l.nodeNamesWithTimeStamps = newNodes
metrics.ObserveLongestNodeScaleDownTime(currentTime.Sub(minimumTime))
}
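To make the bookkeeping above concrete, here is a minimal standalone sketch of the same earliest-timestamp logic (the tracker/update names and the main function are illustrative, not part of the patch; the real code reports the duration through metrics.ObserveLongestNodeScaleDownTime instead of returning it):

package main

import (
	"fmt"
	"time"
)

// tracker mirrors longestNodeScaleDownTime: for every node that is still
// unprocessed it keeps the earliest time at which it was first skipped.
type tracker struct {
	seen map[string]time.Time
}

func (t *tracker) update(unprocessed []string, now time.Time) time.Duration {
	next := make(map[string]time.Time, len(unprocessed))
	earliest := now
	for _, n := range unprocessed {
		first, ok := t.seen[n]
		if !ok {
			first = now // skipped for the first time in this iteration
		}
		next[n] = first
		if first.Before(earliest) {
			earliest = first
		}
	}
	t.seen = next // nodes processed this iteration drop out of the map
	return now.Sub(earliest) // the value the gauge would report
}

func main() {
	tr := &tracker{}
	t0 := time.Now()
	fmt.Println(tr.update([]string{"n1", "n2"}, t0))                  // 0s: both nodes new
	fmt.Println(tr.update([]string{"n2", "n3"}, t0.Add(time.Minute))) // 1m0s: n2 still carries t0
	fmt.Println(tr.update([]string{"n3"}, t0.Add(2*time.Minute)))     // 1m0s: n3 carries t0+1m
}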
172 changes: 172 additions & 0 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -1035,6 +1035,178 @@ func TestNodesToDelete(t *testing.T) {
}
}

func TestLongestNodeScaleDownTime(t *testing.T) {
testCases := []struct {
name string
timedOutNodesCount int
nodes int
unneededNodes1 []string
unneededNodes2 []string
run int
}{
{
name: "Unprocessed unneeded nodes keep the earliest timestamp across iterations",
timedOutNodesCount: 4,
nodes: 6,
unneededNodes1: []string{"n0", "n1", "n2", "n3"},
unneededNodes2: []string{"n2", "n3", "n4", "n5"},
run: 2,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
nodes := make([]*apiv1.Node, tc.nodes)
for i := 0; i < tc.nodes; i++ {
nodes[i] = BuildTestNode(fmt.Sprintf("n%d", i), 1000, 10)
}
provider := testprovider.NewTestCloudProviderBuilder().Build()
provider.AddNodeGroup("ng1", 0, 0, 0)
for _, node := range nodes {
provider.AddNode("ng1", node)
}
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 1 * time.Minute,
},
ScaleDownSimulationTimeout: 1 * time.Hour,
MaxScaleDownParallelism: 10,
LongestNodeScaleDownTimeTrackerEnabled: true,
}, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)

var start time.Time
var timestamp time.Time

// take nodes "n0" - "n3" as unneeded
unneededNodeNames := tc.unneededNodes1
p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
for i := 0; i < tc.run; i++ {
timestamp = timestamp.Add(1 * time.Second)
// try to update the map for unneeded nodes with a greater timestamp than the one already present
p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), tc.timedOutNodesCount)
for _, val := range p.longestNodeScaleDownT.nodeNamesWithTimeStamps {
// check that the timestamp for all unneeded nodes is still the zero start time
assert.Equal(t, val, start)
}
}
// take nodes "n2" - "n5" as unneeded
unneededNodeNames = tc.unneededNodes2
// timestamp is 2s now
p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
// check that for n0 and n1 we get the default time, for n2 and n3 the start time, and for n4 and n5 the current timestamp
assert.Equal(t, p.longestNodeScaleDownT.get("n0"), p.longestNodeScaleDownT.defaultTime)
assert.Equal(t, p.longestNodeScaleDownT.get("n1"), p.longestNodeScaleDownT.defaultTime)
assert.Equal(t, p.longestNodeScaleDownT.get("n2"), start)
assert.Equal(t, p.longestNodeScaleDownT.get("n3"), start)
assert.Equal(t, p.longestNodeScaleDownT.get("n4"), timestamp)
assert.Equal(t, p.longestNodeScaleDownT.get("n5"), timestamp)
})
}
}

func TestLongestNodeScaleDownTimeWithTimeout(t *testing.T) {
testCases := []struct {
name string
nodes []*apiv1.Node
actuationStatus *fakeActuationStatus
eligible []string
maxParallel int
isSimulationTimeout bool
unprocessedNodes int
isFlagEnabled bool
}{
{
name: "Unneeded node limit is exceeded",
nodes: []*apiv1.Node{
BuildTestNode("n1", 1000, 10),
BuildTestNode("n2", 1000, 10),
BuildTestNode("n3", 1000, 10),
},
actuationStatus: &fakeActuationStatus{},
eligible: []string{"n1", "n2"},
maxParallel: 0,
isSimulationTimeout: false,
// maxParallel=0 forces p.unneededNodesLimit() to be 0, so we will break in the second check inside p.categorizeNodes() right away
unprocessedNodes: 2,
isFlagEnabled: true,
},
{
name: "Simulation timeout is hit",
nodes: []*apiv1.Node{
BuildTestNode("n1", 1000, 10),
BuildTestNode("n2", 1000, 10),
BuildTestNode("n3", 1000, 10),
},
actuationStatus: &fakeActuationStatus{},
eligible: []string{"n1", "n2"},
maxParallel: 1,
isSimulationTimeout: true,
// the first node is simulated; by the second, the simulation timeout has been hit
unprocessedNodes: 1,
isFlagEnabled: true,
},
{
name: "LongestNodeScaleDownTimeTracker flag is disabled",
nodes: []*apiv1.Node{
BuildTestNode("n1", 1000, 10),
BuildTestNode("n2", 1000, 10),
BuildTestNode("n3", 1000, 10),
},
actuationStatus: &fakeActuationStatus{},
eligible: []string{"n1", "n2"},
maxParallel: 1,
isSimulationTimeout: false,
isFlagEnabled: false,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
provider := testprovider.NewTestCloudProviderBuilder().Build()
provider.AddNodeGroup("ng1", 0, 0, 0)
for _, node := range tc.nodes {
provider.AddNode("ng1", node)
}
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 10 * time.Minute,
},
ScaleDownSimulationTimeout: 1 * time.Second,
MaxScaleDownParallelism: tc.maxParallel,
LongestNodeScaleDownTimeTrackerEnabled: tc.isFlagEnabled,
}, &fake.Clientset{}, registry, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
if tc.isSimulationTimeout {
context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
rs := &fakeRemovalSimulator{
nodes: tc.nodes,
sleep: 2 * time.Second,
}
p.rs = rs
}
assert.NoError(t, p.UpdateClusterState(tc.nodes, tc.nodes, &fakeActuationStatus{}, time.Now()))
if !tc.isFlagEnabled {
// if flag is disabled p.longestNodeScaleDownT is not initialized
assert.Nil(t, p.longestNodeScaleDownT)
} else {
assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), tc.unprocessedNodes)
}
})
}
}

func sizedNodeGroup(id string, size int, atomic bool) cloudprovider.NodeGroup {
ng := testprovider.NewTestNodeGroup(id, 10000, 0, size, true, false, "n1-standard-2", nil, nil)
ng.SetOptions(&config.NodeGroupAutoscalingOptions{
17 changes: 17 additions & 0 deletions cluster-autoscaler/metrics/metrics.go
@@ -425,6 +425,14 @@ var (
Buckets: k8smetrics.ExponentialBuckets(1, 2, 6), // 1, 2, 4, ..., 32
}, []string{"instance_type", "cpu_count", "namespace_count"},
)

longestLastScaleDownEvalDuration = k8smetrics.NewGauge(
&k8smetrics.GaugeOpts{
Namespace: caNamespace,
Name: "longest_scale_down_eval",
Help: "Longest node evaluation time during ScaleDown, in seconds.",
},
)
)

// RegisterAll registers all metrics.
@@ -461,6 +469,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) {
legacyregistry.MustRegister(nodeTaintsCount)
legacyregistry.MustRegister(inconsistentInstancesMigsCount)
legacyregistry.MustRegister(binpackingHeterogeneity)
legacyregistry.MustRegister(longestLastScaleDownEvalDuration)

if emitPerNodeGroupMetrics {
legacyregistry.MustRegister(nodesGroupMinNodes)
@@ -748,3 +757,11 @@ func UpdateInconsistentInstancesMigsCount(migCount int) {
func ObserveBinpackingHeterogeneity(instanceType, cpuCount, namespaceCount string, pegCount int) {
binpackingHeterogeneity.WithLabelValues(instanceType, cpuCount, namespaceCount).Observe(float64(pegCount))
}

// ObserveLongestNodeScaleDownTime records the longest time that has passed since a node was first left unprocessed.
// If a node is unneeded but left unprocessed multiple times consecutively, only the earliest timestamp is kept.
// We report the difference between the current time and the earliest timestamp among all unprocessed nodes in the
// current ScaleDown iteration. If we never timed out in categorizeNodes() and never exceeded p.unneededNodesLimit(),
// the reported value is 0.
func ObserveLongestNodeScaleDownTime(duration time.Duration) {
longestLastScaleDownEvalDuration.Set(duration.Seconds())
}
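For intuition, a minimal self-contained sketch of the same gauge semantics using the plain Prometheus client (client_golang here is only for illustration; the autoscaler itself goes through k8s.io/component-base/metrics as above, and the exposed name assumes caNamespace resolves to cluster_autoscaler as for the other metrics in this file):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	g := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "cluster_autoscaler",
		Name:      "longest_scale_down_eval",
		Help:      "Longest node evaluation time during ScaleDown, in seconds.",
	})
	prometheus.MustRegister(g)

	g.Set((2 * time.Minute).Seconds()) // two nodes stuck unprocessed for 2m -> 120
	g.Set(0)                           // all unneeded nodes processed again -> 0
	fmt.Println("gauge exposed as cluster_autoscaler_longest_scale_down_eval")
}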