5 changes: 5 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -349,6 +349,11 @@ type AutoscalingOptions struct {
CapacitybufferControllerEnabled bool
// CapacitybufferPodInjectionEnabled tells if CA should inject fake pods for capacity buffers that are ready for provisioning
CapacitybufferPodInjectionEnabled bool
// LongestNodeScaleDownTimeTrackerEnabled is used to enable/disable tracking of the longest node ScaleDown evaluation time.
// We want to track all the nodes that were marked as unneeded but were left unprocessed during the ScaleDown.
// If a node was unneeded but unprocessed multiple times consecutively, we store only the earliest time it happened.
// The difference between the current time and the earliest time among all unprocessed nodes gives the longest time.
LongestNodeScaleDownTimeTrackerEnabled bool
}

// KubeClientOptions specify options for kube client
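The comment on LongestNodeScaleDownTimeTrackerEnabled describes the bookkeeping in prose; below is a minimal standalone sketch of that idea. It is illustrative only and not part of this PR: the helper names (unprocessedSince, record, longest) are assumptions, but the logic mirrors the comment — keep the earliest timestamp per unprocessed node and report the gap between now and the oldest one.

```go
package main

import (
	"fmt"
	"time"
)

// unprocessedSince maps a node name to the earliest time it was left
// unprocessed by the scale-down simulation (illustrative only).
type unprocessedSince map[string]time.Time

// record keeps the earliest timestamp for nodes skipped in this iteration
// and drops nodes that are no longer skipped.
func record(prev unprocessedSince, skipped []string, now time.Time) unprocessedSince {
	next := make(unprocessedSince, len(skipped))
	for _, name := range skipped {
		if t, ok := prev[name]; ok {
			next[name] = t // seen before: keep the earliest timestamp
		} else {
			next[name] = now // first time skipped: start counting from now
		}
	}
	return next
}

// longest returns the time since the oldest recorded timestamp, or 0 if
// every unneeded node was processed.
func longest(cur unprocessedSince, now time.Time) time.Duration {
	var d time.Duration
	for _, t := range cur {
		if since := now.Sub(t); since > d {
			d = since
		}
	}
	return d
}

func main() {
	start := time.Now()
	state := record(nil, []string{"n1", "n2"}, start)
	state = record(state, []string{"n1"}, start.Add(30*time.Second))
	fmt.Println(longest(state, start.Add(60*time.Second))) // 1m0s: n1 unprocessed since start
}
```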
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/flags/flags.go
@@ -230,6 +230,7 @@ var (
nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluster-Autoscaler deployment becomes inactive")
capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
longestNodeScaleDownTimeTrackerEnabled = flag.Bool("longest-node-scaledown-timetracker-enabled", false, "Whether to track the longest node ScaleDown evaluation time")

// Deprecated flags
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -414,6 +415,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeDeletionCandidateTTL: *nodeDeletionCandidateTTL,
CapacitybufferControllerEnabled: *capacitybufferControllerEnabled,
CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
LongestNodeScaleDownTimeTrackerEnabled: *longestNodeScaleDownTimeTrackerEnabled,
}
}

62 changes: 62 additions & 0 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -30,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
@@ -76,6 +77,7 @@ type Planner struct {
cc controllerReplicasCalculator
scaleDownSetProcessor nodes.ScaleDownSetProcessor
scaleDownContext *nodes.ScaleDownContext
longestNodeScaleDownT *longestNodeScaleDownTime
}

// New creates a new Planner object.
@@ -91,6 +93,11 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
unneededNodes.LoadFromExistingTaints(context.ListerRegistry, time.Now(), context.AutoscalingOptions.NodeDeletionCandidateTTL)
}

var longestNodeScaleDownTime *longestNodeScaleDownTime
if context.AutoscalingOptions.LongestNodeScaleDownTimeTrackerEnabled {
longestNodeScaleDownTime = newLongestNodeScaleDownTime()
}

return &Planner{
context: context,
unremovableNodes: unremovable.NewNodes(),
@@ -104,6 +111,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
scaleDownContext: nodes.NewDefaultScaleDownContext(),
minUpdateInterval: minUpdateInterval,
longestNodeScaleDownT: longestNodeScaleDownTime,
}
}

@@ -277,13 +285,16 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
}
p.nodeUtilizationMap = utilizationMap
timer := time.NewTimer(p.context.ScaleDownSimulationTimeout)
endedPrematurely := false

for i, node := range currentlyUnneededNodeNames {
if timedOut(timer) {
p.processUnneededNodes(currentlyUnneededNodeNames[i:], time.Now(), &endedPrematurely)
klog.Warningf("%d out of %d nodes skipped in scale down simulation due to timeout.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames))
break
}
if len(removableList)-atomicScaleDownNodesCount >= p.unneededNodesLimit() {
p.processUnneededNodes(currentlyUnneededNodeNames[i:], time.Now(), &endedPrematurely)
klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more. Total atomic scale down nodes: %d", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList), atomicScaleDownNodesCount)
break
}
@@ -306,6 +317,7 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
p.unremovableNodes.AddTimeout(unremovable, unremovableTimeout)
}
}
p.processUnneededNodes(nil, time.Now(), &endedPrematurely)
p.unneededNodes.Update(removableList, p.latestUpdate)
if unremovableCount > 0 {
klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
@@ -435,3 +447,53 @@ func timedOut(timer *time.Timer) bool {
return false
}
}

func (p *Planner) processUnneededNodes(currentlyUnneededNodeNames []string, currentTime time.Time, endedPrematurely *bool) {
// If p.longestNodeScaleDownT is nil (tracking is disabled) or endedPrematurely is already true
// (the longest duration was already recorded in this iteration), there is nothing to do.
// If endedPrematurely is never set to true, all unneeded nodes were successfully simulated and the reported evaluation time is 0.
if p.longestNodeScaleDownT == nil || *endedPrematurely {
return
}
*endedPrematurely = true
p.longestNodeScaleDownT.update(currentlyUnneededNodeNames, currentTime)
}

type longestNodeScaleDownTime struct {
defaultTime time.Time

Review comment:
Let's maybe rename it to something more meaningful: evalTime, lastEvalTime, ...?

nodeNamesWithTimeStamps map[string]time.Time
}

func newLongestNodeScaleDownTime() *longestNodeScaleDownTime {
return &longestNodeScaleDownTime{}
}

func (l *longestNodeScaleDownTime) get(nodeName string) time.Time {
if _, ok := l.nodeNamesWithTimeStamps[nodeName]; ok {
return l.nodeNamesWithTimeStamps[nodeName]
}
return l.defaultTime
}

func (l *longestNodeScaleDownTime) update(nodeNames []string, currentTime time.Time) {
// if all nodes were processed we need to report time 0
if nodeNames == nil {
l.nodeNamesWithTimeStamps = make(map[string]time.Time)
metrics.ObserveLongestNodeScaleDownTime(0)
Review comment on lines +481 to +482:
You should report the biggest time (if there was any carry-over from the previous loop) before clearing it.
Also l.defaultTime should be updated here.
Please add a unit test for this scenario.

return
}
newNodes := make(map[string]time.Time, len(nodeNames))
l.defaultTime = currentTime

Review comment:
I think you override l.defaultTime too early.

Let's imagine a very simple scenario with one node:

| time | unneeded | reportedTime | nodeNamesWithTimeStamps |
| ---- | -------- | ------------ | ----------------------- |
| t    | []       | 0            |                         |
| t+1  | [n1]     | 1            | n1 : t                  |
| t+2  | [n1]     | 2            | n1 : t                  |
| t+3  | []       | 3            |                         |
| t+4  | []       | 0            |                         |

Seems like with your implementation you would at t+1 save n1: t+1 and report 0.

minimumTime := l.defaultTime
for _, nodeName := range nodeNames {
// if a node is not in nodeNamesWithTimeStamps use current time
// if a node is already in nodeNamesWithTimeStamps copy the last value
valueFromPrevIter := l.get(nodeName)
newNodes[nodeName] = valueFromPrevIter
if minimumTime.Compare(valueFromPrevIter) > 0 {

Review comment:
Nit: From what I see, Compare() is not needed here. Please use Before() or After().

minimumTime = valueFromPrevIter
}
}
l.nodeNamesWithTimeStamps = newNodes
metrics.ObserveLongestNodeScaleDownTime(currentTime.Sub(minimumTime))
}
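The two review comments above point out that the nil branch should report any carried-over duration before clearing the state, that defaultTime should also be advanced there, and that defaultTime is currently overwritten before it is read for newly skipped nodes. Below is a hedged sketch of one possible revision of update(); it reuses the type, the get() helper, and the metrics call from the diff above, and it is an assumption about how the fix might look rather than the final code.

```go
// Sketch only: assumes the imports and helpers already present in planner.go.
func (l *longestNodeScaleDownTime) update(nodeNames []string, currentTime time.Time) {
	if nodeNames == nil {
		// Report any carry-over from the previous loop before clearing the state,
		// so a node that stayed unprocessed until now is still reflected once.
		longest := time.Duration(0)
		for _, earliest := range l.nodeNamesWithTimeStamps {
			if d := currentTime.Sub(earliest); d > longest {
				longest = d
			}
		}
		metrics.ObserveLongestNodeScaleDownTime(longest)
		l.nodeNamesWithTimeStamps = make(map[string]time.Time)
		l.defaultTime = currentTime
		return
	}
	newNodes := make(map[string]time.Time, len(nodeNames))
	minimumTime := currentTime
	for _, nodeName := range nodeNames {
		// Keep the earliest timestamp for nodes that were already unprocessed;
		// nodes skipped for the first time inherit the previous defaultTime.
		valueFromPrevIter := l.get(nodeName)
		newNodes[nodeName] = valueFromPrevIter
		if valueFromPrevIter.Before(minimumTime) {
			minimumTime = valueFromPrevIter
		}
	}
	// Only now move defaultTime forward, so newly skipped nodes were attributed
	// to the previous evaluation time rather than to currentTime.
	l.defaultTime = currentTime
	l.nodeNamesWithTimeStamps = newNodes
	metrics.ObserveLongestNodeScaleDownTime(currentTime.Sub(minimumTime))
}
```

With this ordering, the reviewer's single-node scenario produces the expected sequence: 1 and 2 are reported while n1 stays unprocessed, the carry-over of 3 is reported when n1 is finally processed, and 0 afterwards.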
172 changes: 172 additions & 0 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -1035,6 +1035,178 @@ func TestNodesToDelete(t *testing.T) {
}
}

func TestLongestNodeScaleDownTime(t *testing.T) {
testCases := []struct {
name string
timedOutNodesCount int
nodes int
unneededNodes1 []string
unneededNodes2 []string
run int
}{
{
name: "Test to check the functionality of planner's longestNodeScaleDownT",
timedOutNodesCount: 4,
nodes: 6,
unneededNodes1: []string{"n0", "n1", "n2", "n3"},
unneededNodes2: []string{"n2", "n3", "n4", "n5"},
run: 2,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
nodes := make([]*apiv1.Node, tc.nodes)
for i := 0; i < tc.nodes; i++ {
nodes[i] = BuildTestNode(fmt.Sprintf("n%d", i), 1000, 10)
}
provider := testprovider.NewTestCloudProviderBuilder().Build()
provider.AddNodeGroup("ng1", 0, 0, 0)
for _, node := range nodes {
provider.AddNode("ng1", node)
}
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 1 * time.Minute,
},
ScaleDownSimulationTimeout: 1 * time.Hour,
MaxScaleDownParallelism: 10,
LongestNodeScaleDownTimeTrackerEnabled: true,
}, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
Review comment on lines +1060 to +1080:
Wouldn't it be easier to just create an instance with newLongestNodeScaleDownTime()?


var start time.Time
var timestamp time.Time

// take nodes "n0" - "n3" as unneeded
unneededNodeNames := tc.unneededNodes1

Review comment:
Why do you need the unneededNodeNames variable?

p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
for i := 0; i < tc.run; i++ {
timestamp = timestamp.Add(1 * time.Second)
// try to update the map for unneeded nodes with greater ts than the one already present
p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), tc.timedOutNodesCount)
for _, val := range p.longestNodeScaleDownT.nodeNamesWithTimeStamps {
// check that timestamp for all unneeded nodes is still 0s
assert.Equal(t, val, start)
}
}
// take nodes "n2" - "n5" as unneeded
unneededNodeNames = tc.unneededNodes2
// timestamp is 2s now
p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
// check that for n0 and n1 we will get the default time, for n2 and n3 - start time, and for n4 and n5 - current ts
assert.Equal(t, p.longestNodeScaleDownT.get("n0"), p.longestNodeScaleDownT.defaultTime)
assert.Equal(t, p.longestNodeScaleDownT.get("n1"), p.longestNodeScaleDownT.defaultTime)
assert.Equal(t, p.longestNodeScaleDownT.get("n2"), start)
assert.Equal(t, p.longestNodeScaleDownT.get("n3"), start)
assert.Equal(t, p.longestNodeScaleDownT.get("n4"), timestamp)
assert.Equal(t, p.longestNodeScaleDownT.get("n5"), timestamp)
})
}
}
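The review comment on update() asks for a unit test covering the carry-over scenario. A sketch of such a test is below; it assumes the revised update() behaviour sketched after planner.go above (carry-over reported and defaultTime advanced in the nil branch), so it is a proposal rather than a test of the code as currently written.

```go
// Sketch: relies on the testing, time and assert imports already used in this file.
func TestLongestNodeScaleDownTimeCarryOver(t *testing.T) {
	l := newLongestNodeScaleDownTime()
	start := time.Now()
	l.update(nil, start)                               // t: nothing unneeded, 0 reported, defaultTime = start
	l.update([]string{"n1"}, start.Add(1*time.Second)) // t+1: n1 skipped for the first time
	l.update([]string{"n1"}, start.Add(2*time.Second)) // t+2: n1 still skipped
	// The earliest timestamp for n1 should be preserved across iterations.
	assert.Equal(t, start, l.get("n1"))
	// t+3: n1 processed again; the carried-over duration is reported before the state is cleared.
	l.update(nil, start.Add(3*time.Second))
	assert.Empty(t, l.nodeNamesWithTimeStamps)
}
```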

func TestLongestNodeScaleDownTimeWithTimeout(t *testing.T) {
testCases := []struct {
name string
nodes []*apiv1.Node
actuationStatus *fakeActuationStatus
eligible []string
maxParallel int
isSimulationTimeout bool
unprocessedNodes int
isFlagEnabled bool
}{
{
name: "Unneeded node limit is exceeded",
nodes: []*apiv1.Node{
BuildTestNode("n1", 1000, 10),
BuildTestNode("n2", 1000, 10),
BuildTestNode("n3", 1000, 10),
},
actuationStatus: &fakeActuationStatus{},
eligible: []string{"n1", "n2"},
maxParallel: 0,
isSimulationTimeout: false,
// maxParallel=0 forces p.unneededNodesLimit() to be 0, so we will break in the second check inside p.categorizeNodes() right away
unprocessedNodes: 2,
isFlagEnabled: true,
},
{
name: "Simulation timeout is hit",
nodes: []*apiv1.Node{
BuildTestNode("n1", 1000, 10),
BuildTestNode("n2", 1000, 10),
BuildTestNode("n3", 1000, 10),
},
actuationStatus: &fakeActuationStatus{},
eligible: []string{"n1", "n2"},
maxParallel: 1,
isSimulationTimeout: true,
// first node will be deleted and for the second timeout will be triggered
unprocessedNodes: 1,
isFlagEnabled: true,
},
{
name: "LongestNodeScaleDownTimeTrackerEnabled flag is disabled",
nodes: []*apiv1.Node{
BuildTestNode("n1", 1000, 10),
BuildTestNode("n2", 1000, 10),
BuildTestNode("n3", 1000, 10),
},
actuationStatus: &fakeActuationStatus{},
eligible: []string{"n1", "n2"},
maxParallel: 1,
isSimulationTimeout: false,
isFlagEnabled: false,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
provider := testprovider.NewTestCloudProviderBuilder().Build()
provider.AddNodeGroup("ng1", 0, 0, 0)
for _, node := range tc.nodes {
provider.AddNode("ng1", node)
}
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 10 * time.Minute,
},
ScaleDownSimulationTimeout: 1 * time.Second,
MaxScaleDownParallelism: tc.maxParallel,
LongestNodeScaleDownTimeTrackerEnabled: tc.isFlagEnabled,
}, &fake.Clientset{}, registry, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
if tc.isSimulationTimeout {
context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
rs := &fakeRemovalSimulator{
nodes: tc.nodes,
sleep: 2 * time.Second,
}
p.rs = rs
}
assert.NoError(t, p.UpdateClusterState(tc.nodes, tc.nodes, &fakeActuationStatus{}, time.Now()))
if !tc.isFlagEnabled {
// if flag is disabled p.longestNodeScaleDownT is not initialized
assert.Nil(t, p.longestNodeScaleDownT)
} else {
assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), tc.unprocessedNodes)
}
})
}
}

func sizedNodeGroup(id string, size int, atomic bool) cloudprovider.NodeGroup {
ng := testprovider.NewTestNodeGroup(id, 10000, 0, size, true, false, "n1-standard-2", nil, nil)
ng.SetOptions(&config.NodeGroupAutoscalingOptions{
17 changes: 17 additions & 0 deletions cluster-autoscaler/metrics/metrics.go
@@ -425,6 +425,14 @@ var (
Buckets: k8smetrics.ExponentialBuckets(1, 2, 6), // 1, 2, 4, ..., 32
}, []string{"instance_type", "cpu_count", "namespace_count"},
)

longestLastScaleDownEvalDuration = k8smetrics.NewGauge(
&k8smetrics.GaugeOpts{
Namespace: caNamespace,
Name: "longest_scale_down_eval",
Help: "Longest node evaluation time during ScaleDown.",
Review comment on lines +432 to +433:
Please add a unit as a suffix (e.g. function_duration_seconds).
Have you checked what buckets are used by default and if they are fine for this use case?

},
)
)

// RegisterAll registers all metrics.
@@ -461,6 +469,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) {
legacyregistry.MustRegister(nodeTaintsCount)
legacyregistry.MustRegister(inconsistentInstancesMigsCount)
legacyregistry.MustRegister(binpackingHeterogeneity)
legacyregistry.MustRegister(longestLastScaleDownEvalDuration)

if emitPerNodeGroupMetrics {
legacyregistry.MustRegister(nodesGroupMinNodes)
@@ -748,3 +757,11 @@ func UpdateInconsistentInstancesMigsCount(migCount int) {
func ObserveBinpackingHeterogeneity(instanceType, cpuCount, namespaceCount string, pegCount int) {
binpackingHeterogeneity.WithLabelValues(instanceType, cpuCount, namespaceCount).Observe(float64(pegCount))
}

// ObserveLongestNodeScaleDownTime records the longest time that has passed since a node was first left unprocessed.
// If a node is unneeded but unprocessed multiple times consecutively, we store only the earliest timestamp.
// Here we report the difference between the current time and the earliest time among all unprocessed nodes in the current ScaleDown iteration.
// If we never timed out in categorizeNodes() or never exceeded p.unneededNodesLimit(), this value will be 0.
func ObserveLongestNodeScaleDownTime(duration time.Duration) {
longestLastScaleDownEvalDuration.Set(float64(duration))
}
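One way to address the unit-suffix comment above is to encode the unit in the metric name and report seconds explicitly; note that Set(float64(duration)) as written reports nanoseconds, since that is time.Duration's underlying unit, and that bucket configuration does not apply here because the metric is a Gauge rather than a Histogram. The sketch below is an assumption about one possible naming, not the final metric:

```go
// Sketch: a seconds-based variant of the gauge above (name and help text are assumptions).
var longestScaleDownEvalDurationSeconds = k8smetrics.NewGauge(
	&k8smetrics.GaugeOpts{
		Namespace: caNamespace,
		Name:      "longest_scale_down_eval_duration_seconds",
		Help:      "Longest node evaluation time during ScaleDown, in seconds.",
	},
)

// The observe helper would then convert explicitly instead of calling Set(float64(duration)).
func observeLongestNodeScaleDownTimeSeconds(duration time.Duration) {
	longestScaleDownEvalDurationSeconds.Set(duration.Seconds())
}
```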