Skip to content

Commit b9e7969

Browse files
committed
feat(nodeScaleDownTimeTracker): add a new metric to track unprocessed nodes during scaleDown
1 parent 524270b commit b9e7969

File tree

5 files changed

+212
-0
lines changed

5 files changed

+212
-0
lines changed

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,11 @@ type AutoscalingOptions struct {
349349
CapacitybufferControllerEnabled bool
350350
// CapacitybufferPodInjectionEnabled tells if CA should injects fake pods for capacity buffers that are ready for provisioning
351351
CapacitybufferPodInjectionEnabled bool
352+
// LongestNodeScaleDownTimeTrackerEnabled is used to enabled/disable the tracking of longest node ScaleDown evaluation time.
353+
// We want to track all the nodes that were marked as unneeded, but were unprocessed during the ScaleDown.
354+
// If a node was unneeded, but unprocessed multiple times consecutively, we store only the earliest time it happened.
355+
// The difference between the current time and the earliest time among all unprocessed nodes will give the longest time
356+
LongestNodeScaleDownTimeTrackerEnabled bool
352357
}
353358

354359
// KubeClientOptions specify options for kube client

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ var (
230230
nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
231231
capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
232232
capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
233+
longestNodeScaleDownTimeTrackerEnabled = flag.Bool("longest-node-scaledown-timetracker-enabled", false, "Whether to track the eval time of longestNodeScaleDown")
233234

234235
// Deprecated flags
235236
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -414,6 +415,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
414415
NodeDeletionCandidateTTL: *nodeDeletionCandidateTTL,
415416
CapacitybufferControllerEnabled: *capacitybufferControllerEnabled,
416417
CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
418+
LongestNodeScaleDownTimeTrackerEnabled: *longestNodeScaleDownTimeTrackerEnabled,
417419
}
418420
}
419421

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
3131
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
3232
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
33+
"k8s.io/autoscaler/cluster-autoscaler/metrics"
3334
"k8s.io/autoscaler/cluster-autoscaler/processors"
3435
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
3536
"k8s.io/autoscaler/cluster-autoscaler/simulator"
@@ -76,6 +77,7 @@ type Planner struct {
7677
cc controllerReplicasCalculator
7778
scaleDownSetProcessor nodes.ScaleDownSetProcessor
7879
scaleDownContext *nodes.ScaleDownContext
80+
longestNodeScaleDownT longestNodeScaleDownTime
7981
}
8082

8183
// New creates a new Planner object.
@@ -104,6 +106,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
104106
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
105107
scaleDownContext: nodes.NewDefaultScaleDownContext(),
106108
minUpdateInterval: minUpdateInterval,
109+
longestNodeScaleDownT: *NewLongestNodeScaleDownTime(time.Now()),
107110
}
108111
}
109112

@@ -280,10 +283,12 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
280283

281284
for i, node := range currentlyUnneededNodeNames {
282285
if timedOut(timer) {
286+
p.longestNodeScaleDownT.update(currentlyUnneededNodeNames[i:], time.Now())
283287
klog.Warningf("%d out of %d nodes skipped in scale down simulation due to timeout.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames))
284288
break
285289
}
286290
if len(removableList)-atomicScaleDownNodesCount >= p.unneededNodesLimit() {
291+
p.longestNodeScaleDownT.update(currentlyUnneededNodeNames[i:], time.Now())
287292
klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more. Total atomic scale down nodes: %d", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList), atomicScaleDownNodesCount)
288293
break
289294
}
@@ -435,3 +440,36 @@ func timedOut(timer *time.Timer) bool {
435440
return false
436441
}
437442
}
443+
444+
type longestNodeScaleDownTime struct {
445+
defaultTime time.Time
446+
nodeNamesWithTimeStamps map[string]time.Time
447+
}
448+
449+
func NewLongestNodeScaleDownTime(currentTime time.Time) *longestNodeScaleDownTime {
450+
return &longestNodeScaleDownTime{defaultTime: currentTime}
451+
}
452+
453+
func (l *longestNodeScaleDownTime) get(nodeName string) time.Time {
454+
if _, ok := l.nodeNamesWithTimeStamps[nodeName]; ok {
455+
return l.nodeNamesWithTimeStamps[nodeName]
456+
}
457+
return l.defaultTime
458+
}
459+
460+
func (l *longestNodeScaleDownTime) update(nodeNames []string, currentTime time.Time) {
461+
newNodes := make(map[string]time.Time, len(nodeNames))
462+
l.defaultTime = currentTime
463+
minimumTime := l.defaultTime
464+
for _, nodeName := range nodeNames {
465+
// if a node is not in candidates use current time
466+
// if a node is already in candidates copy the last value
467+
valueFromPrevIter := l.get(nodeName)
468+
newNodes[nodeName] = valueFromPrevIter
469+
if minimumTime.Compare(valueFromPrevIter) > 0 {
470+
minimumTime = valueFromPrevIter
471+
}
472+
}
473+
l.nodeNamesWithTimeStamps = newNodes
474+
metrics.ObserveLongestNodeScaleDownTime(currentTime.Sub(minimumTime))
475+
}

cluster-autoscaler/core/scaledown/planner/planner_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,156 @@ func TestNodesToDelete(t *testing.T) {
10351035
}
10361036
}
10371037

1038+
func TestLongestLatestEvalTime(t *testing.T) {
1039+
testCases := []struct {
1040+
name string
1041+
timedOutNodesCount int
1042+
nodes int
1043+
unneededNodes1 []string
1044+
unneededNodes2 []string
1045+
run int
1046+
}{
1047+
{
1048+
name: "Test to check the functionality of planner's longestNodeScaleDownT",
1049+
timedOutNodesCount: 4,
1050+
nodes: 6,
1051+
unneededNodes1: []string{"n0", "n1", "n2", "n3"},
1052+
unneededNodes2: []string{"n2", "n3", "n4", "n5"},
1053+
run: 2,
1054+
},
1055+
}
1056+
for _, tc := range testCases {
1057+
tc := tc
1058+
t.Run(tc.name, func(t *testing.T) {
1059+
t.Parallel()
1060+
nodes := make([]*apiv1.Node, tc.nodes)
1061+
for i := 0; i < tc.nodes; i++ {
1062+
nodes[i] = BuildTestNode(fmt.Sprintf("n%d", i), 1000, 10)
1063+
}
1064+
provider := testprovider.NewTestCloudProviderBuilder().Build()
1065+
provider.AddNodeGroup("ng1", 0, 0, 0)
1066+
for _, node := range nodes {
1067+
provider.AddNode("ng1", node)
1068+
}
1069+
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
1070+
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
1071+
ScaleDownUnneededTime: 1 * time.Minute,
1072+
},
1073+
ScaleDownSimulationTimeout: 1 * time.Hour,
1074+
MaxScaleDownParallelism: 10,
1075+
}, &fake.Clientset{}, nil, provider, nil, nil)
1076+
assert.NoError(t, err)
1077+
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
1078+
deleteOptions := options.NodeDeleteOptions{}
1079+
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
1080+
1081+
var start time.Time
1082+
var timestamp time.Time
1083+
1084+
// take nodes "n0" - "n3" as unneeded
1085+
unneededNodeNames := tc.unneededNodes1
1086+
p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
1087+
for i := 0; i < tc.run; i++ {
1088+
timestamp = timestamp.Add(1 * time.Second)
1089+
// try to update the map for unneeded nodes with greater ts than the one already present
1090+
p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
1091+
assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), tc.timedOutNodesCount)
1092+
for _, val := range p.longestNodeScaleDownT.nodeNamesWithTimeStamps {
1093+
// check that timestamp for all unneeded nodes is still 0s
1094+
assert.Equal(t, val, start)
1095+
}
1096+
}
1097+
// take nodes "n2" - "n5" as unneeded
1098+
unneededNodeNames = tc.unneededNodes2
1099+
// timestamp is 2s now
1100+
p.longestNodeScaleDownT.update(unneededNodeNames, timestamp)
1101+
// check that for n0 and n1 we will get the default time, for n2 and n3 - start time, and for n4 and n5 - current ts
1102+
assert.Equal(t, p.longestNodeScaleDownT.get("n0"), p.longestNodeScaleDownT.defaultTime)
1103+
assert.Equal(t, p.longestNodeScaleDownT.get("n1"), p.longestNodeScaleDownT.defaultTime)
1104+
assert.Equal(t, p.longestNodeScaleDownT.get("n2"), start)
1105+
assert.Equal(t, p.longestNodeScaleDownT.get("n3"), start)
1106+
assert.Equal(t, p.longestNodeScaleDownT.get("n4"), timestamp)
1107+
assert.Equal(t, p.longestNodeScaleDownT.get("n5"), timestamp)
1108+
})
1109+
}
1110+
}
1111+
1112+
func TestLongestLatestEvalTimeWithTimeout(t *testing.T) {
1113+
testCases := []struct {
1114+
name string
1115+
nodes []*apiv1.Node
1116+
actuationStatus *fakeActuationStatus
1117+
eligible []string
1118+
maxParallel int
1119+
isSimulationTimeout bool
1120+
}{
1121+
{
1122+
name: "Unneeded node limit is exceeded",
1123+
nodes: []*apiv1.Node{
1124+
BuildTestNode("n1", 1000, 10),
1125+
BuildTestNode("n2", 1000, 10),
1126+
BuildTestNode("n3", 1000, 10),
1127+
},
1128+
actuationStatus: &fakeActuationStatus{},
1129+
eligible: []string{"n1", "n2"},
1130+
maxParallel: 0,
1131+
isSimulationTimeout: false,
1132+
},
1133+
{
1134+
name: "Simulation timeout is hit",
1135+
nodes: []*apiv1.Node{
1136+
BuildTestNode("n1", 1000, 10),
1137+
BuildTestNode("n2", 1000, 10),
1138+
BuildTestNode("n3", 1000, 10),
1139+
},
1140+
actuationStatus: &fakeActuationStatus{},
1141+
eligible: []string{"n1", "n2"},
1142+
maxParallel: 1,
1143+
isSimulationTimeout: true,
1144+
},
1145+
}
1146+
for _, tc := range testCases {
1147+
tc := tc
1148+
t.Run(tc.name, func(t *testing.T) {
1149+
t.Parallel()
1150+
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
1151+
provider := testprovider.NewTestCloudProviderBuilder().Build()
1152+
provider.AddNodeGroup("ng1", 0, 0, 0)
1153+
for _, node := range tc.nodes {
1154+
provider.AddNode("ng1", node)
1155+
}
1156+
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
1157+
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
1158+
ScaleDownUnneededTime: 10 * time.Minute,
1159+
},
1160+
ScaleDownSimulationTimeout: 1 * time.Second,
1161+
MaxScaleDownParallelism: tc.maxParallel,
1162+
}, &fake.Clientset{}, registry, provider, nil, nil)
1163+
assert.NoError(t, err)
1164+
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, nil)
1165+
deleteOptions := options.NodeDeleteOptions{}
1166+
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
1167+
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
1168+
if tc.isSimulationTimeout {
1169+
context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
1170+
rs := &fakeRemovalSimulator{
1171+
nodes: tc.nodes,
1172+
sleep: 2 * time.Second,
1173+
}
1174+
p.rs = rs
1175+
}
1176+
assert.NoError(t, p.UpdateClusterState(tc.nodes, tc.nodes, &fakeActuationStatus{}, time.Now()))
1177+
if tc.isSimulationTimeout {
1178+
// first node will be deleted and for the second timeout will be triggered
1179+
assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), 1)
1180+
} else {
1181+
// maxParallel=0 forces p.unneededNodesLimit() to be 0, so we will break in the second check inside p.categorizeNodes()
1182+
assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), 2)
1183+
}
1184+
})
1185+
}
1186+
}
1187+
10381188
func sizedNodeGroup(id string, size int, atomic bool) cloudprovider.NodeGroup {
10391189
ng := testprovider.NewTestNodeGroup(id, 10000, 0, size, true, false, "n1-standard-2", nil, nil)
10401190
ng.SetOptions(&config.NodeGroupAutoscalingOptions{

cluster-autoscaler/metrics/metrics.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,14 @@ var (
425425
Buckets: k8smetrics.ExponentialBuckets(1, 2, 6), // 1, 2, 4, ..., 32
426426
}, []string{"instance_type", "cpu_count", "namespace_count"},
427427
)
428+
429+
longestLastScaleDownEvalDuration = k8smetrics.NewGauge(
430+
&k8smetrics.GaugeOpts{
431+
Namespace: caNamespace,
432+
Name: "longest_scale_down_eval",
433+
Help: "Longest node evaluation time during ScaleDown.",
434+
},
435+
)
428436
)
429437

430438
// RegisterAll registers all metrics.
@@ -461,6 +469,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) {
461469
legacyregistry.MustRegister(nodeTaintsCount)
462470
legacyregistry.MustRegister(inconsistentInstancesMigsCount)
463471
legacyregistry.MustRegister(binpackingHeterogeneity)
472+
legacyregistry.MustRegister(longestLastScaleDownEvalDuration)
464473

465474
if emitPerNodeGroupMetrics {
466475
legacyregistry.MustRegister(nodesGroupMinNodes)
@@ -748,3 +757,11 @@ func UpdateInconsistentInstancesMigsCount(migCount int) {
748757
func ObserveBinpackingHeterogeneity(instanceType, cpuCount, namespaceCount string, pegCount int) {
749758
binpackingHeterogeneity.WithLabelValues(instanceType, cpuCount, namespaceCount).Observe(float64(pegCount))
750759
}
760+
761+
// ObserveLongestNodeScaleDownTime records the longest time that passed from the first leaving a node unprocessed till now.
762+
// If a node is unneeded, but unprocessed consecutively multiple times, we store only the earliest timestamp.
763+
// Here we report the difference between current time and the earliest time among all unprocessed nodes in current ScaleDown iteration
764+
// If we never timedOut in categorizeNodes() or never exceeded p.unneededNodesLimit(), this value will be 0
765+
func ObserveLongestNodeScaleDownTime(duration time.Duration) {
766+
longestLastScaleDownEvalDuration.Set(float64(duration))
767+
}

0 commit comments

Comments
 (0)