diff --git a/cluster-autoscaler/processors/podinjection/pod_group.go b/cluster-autoscaler/processors/podinjection/pod_group.go index fa4c4bccfa68..fc3faf63a100 100644 --- a/cluster-autoscaler/processors/podinjection/pod_group.go +++ b/cluster-autoscaler/processors/podinjection/pod_group.go @@ -36,7 +36,6 @@ func groupPods(pods []*apiv1.Pod, controllers []controller) map[types.UID]podGro for _, con := range controllers { podGroups[con.uid] = makePodGroup(con.desiredReplicas) } - for _, pod := range pods { for _, ownerRef := range pod.OwnerReferences { podGroups = updatePodGroups(pod, ownerRef, podGroups) diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor.go index fd4d78c0d432..480216b43dda 100644 --- a/cluster-autoscaler/processors/podinjection/pod_injection_processor.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor.go @@ -26,6 +26,7 @@ import ( podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff" "k8s.io/autoscaler/cluster-autoscaler/simulator/fake" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/klog/v2" ) @@ -61,9 +62,16 @@ func (p *PodInjectionPodListProcessor) Process(ctx *context.AutoscalingContext, } scheduledPods := podsFromNodeInfos(nodeInfos) - groupedPods := groupPods(append(scheduledPods, unschedulablePods...), controllers) - var podsToInject []*apiv1.Pod + allPods, err := ctx.AllPodLister().List() + if err != nil { + klog.Errorf("Failed to list all pods from all pod lister: %v", err) + return unschedulablePods, fmt.Errorf("failed to list all pods from all pod lister: %v", err) + } + schedulingGatedPods := kube_util.SchedulingGatedPods(allPods) + groupedPods := groupPods(append(append(scheduledPods, unschedulablePods...), schedulingGatedPods...), controllers) + + var podsToInject []*apiv1.Pod for _, 
groupedPod := range groupedPods { var fakePodCount = groupedPod.fakePodCount() fakePods := makeFakePods(groupedPod.ownerUid, groupedPod.sample, fakePodCount) diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go index 3882eb88cb7c..80a34188b41c 100644 --- a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go @@ -45,6 +45,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { scheduledPodRep1Copy1 := buildTestPod("default", "-scheduled-pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID), WithNodeName(node.Name)) podRep1Copy1 := buildTestPod("default", "pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID)) podRep1Copy2 := buildTestPod("default", "pod-rep1-2", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID)) + podRep1Copy3Gated := buildTestPod("default", "pod-rep1-3", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID)) + podRep1Copy3Gated = WithSchedulingGatedStatus(podRep1Copy3Gated) job1 := createTestJob("job-1", "default", 10, 10, 0) scheduledPodJob1Copy1 := buildTestPod("default", "scheduled-pod-job1-1", WithControllerOwnerRef(job1.Name, "Job", job1.UID), WithNodeName(node.Name)) @@ -55,6 +57,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { scheduledParallelStatefulsetPod := buildTestPod("default", "parallel-scheduled-pod-statefulset-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID), WithNodeName(node.Name)) parallelStatefulsetPodCopy1 := buildTestPod("default", "parallel-pod-statefulset1-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID)) parallelStatefulsetPodCopy2 := buildTestPod("default", "parallel-pod-statefulset1-2", 
WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID)) + parallelStatefulsetPodCopy3Gated := buildTestPod("default", "parallel-pod-statefulset1-3", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID)) + parallelStatefulsetPodCopy3Gated = WithSchedulingGatedStatus(parallelStatefulsetPodCopy3Gated) sequentialStatefulset := createTestStatefulset("sequential-statefulset-1", "default", appsv1.OrderedReadyPodManagement, 10) scheduledSequentialStatefulsetPod := buildTestPod("default", "sequential-scheduled-pod-statefulset-1", WithControllerOwnerRef(sequentialStatefulset.Name, "StatefulSet", sequentialStatefulset.UID), WithNodeName(node.Name)) @@ -72,6 +76,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { name string scheduledPods []*apiv1.Pod unschedulablePods []*apiv1.Pod + otherPods []*apiv1.Pod wantPods []*apiv1.Pod }{ { @@ -111,6 +116,20 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 7)..., ), }, + { + name: "Mix of controllers with scheduling gated pods", + scheduledPods: []*apiv1.Pod{scheduledPodRep1Copy1, scheduledPodJob1Copy1, scheduledParallelStatefulsetPod}, + unschedulablePods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2}, + otherPods: []*apiv1.Pod{parallelStatefulsetPodCopy3Gated, podRep1Copy3Gated}, + wantPods: append( + append( + append( + []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2}, + makeFakePods(replicaSet1.UID, scheduledPodRep1Copy1, 1)...), + makeFakePods(job1.UID, scheduledPodJob1Copy1, 7)...), + makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 6)..., + ), + }, } for _, tc := range testCases { @@ -119,9 +138,10 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { clusterSnapshot := 
testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16)) err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...)) assert.NoError(t, err) + allPodsLister := fakeAllPodsLister{podsToList: append(append(tc.scheduledPods, tc.unschedulablePods...), tc.otherPods...)} ctx := context.AutoscalingContext{ AutoscalingKubeClients: context.AutoscalingKubeClients{ - ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister), + ListerRegistry: kubernetes.NewListerRegistry(nil, nil, &allPodsLister, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister), }, ClusterSnapshot: clusterSnapshot, } @@ -433,3 +453,11 @@ func buildTestPod(namespace, name string, opts ...podOption) *apiv1.Pod { } type podOption func(*apiv1.Pod) + +type fakeAllPodsLister struct { + podsToList []*apiv1.Pod +} + +func (l *fakeAllPodsLister) List() ([]*apiv1.Pod, error) { + return l.podsToList, nil +} diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index b9be94b6e665..bcf321978fcb 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -204,6 +204,30 @@ func SchedulerUnprocessedPods(allPods []*apiv1.Pod, bypassedSchedulers map[strin return unprocessedPods } +// SchedulingGatedPods is a helper method that returns all pods which have a scheduling gate +// SchedulingGated pods are neither scheduled nor deleted by the implementation, and by definition are neither unschedulable nor unprocessed +func SchedulingGatedPods(allPods []*apiv1.Pod) []*apiv1.Pod { + var schedulingGatedPods []*apiv1.Pod + for _, pod := range allPods { + if pod != nil && isSchedulingGated(pod) { + schedulingGatedPods = append(schedulingGatedPods, pod) + } + } + return schedulingGatedPods +} + +// isSchedulingGated returns true when the PodScheduled condition is false with reason PodReasonSchedulingGated +func 
isSchedulingGated(pod *apiv1.Pod) bool { + if isScheduled(pod) || isDeleted(pod) { + return false + } + _, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) + if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonSchedulingGated { + return true + } + return false +} + // UnschedulablePods is a helper method that returns all unschedulable pods from given pod list. func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod { var unschedulablePods []*apiv1.Pod diff --git a/cluster-autoscaler/utils/test/test_utils.go b/cluster-autoscaler/utils/test/test_utils.go index 984924715317..b212c8610562 100644 --- a/cluster-autoscaler/utils/test/test_utils.go +++ b/cluster-autoscaler/utils/test/test_utils.go @@ -359,6 +359,24 @@ func BuildTestNode(name string, millicpuCapacity int64, memCapacity int64) *apiv return node } +// WithSchedulingGatedStatus upserts the condition with type PodScheduled to be of status false +// and reason PodReasonSchedulingGated +func WithSchedulingGatedStatus(pod *apiv1.Pod) *apiv1.Pod { + gatedPodCondition := apiv1.PodCondition{ + Type: apiv1.PodScheduled, + Status: apiv1.ConditionFalse, + Reason: apiv1.PodReasonSchedulingGated, + } + for index := range pod.Status.Conditions { + if pod.Status.Conditions[index].Type == apiv1.PodScheduled { + pod.Status.Conditions[index] = gatedPodCondition + return pod + } + } + pod.Status.Conditions = append(pod.Status.Conditions, gatedPodCondition) + return pod +} + // WithAllocatable adds specified milliCpu and memory to Allocatable of the node in-place. func WithAllocatable(node *apiv1.Node, millicpuAllocatable, memAllocatable int64) *apiv1.Node { node.Status.Allocatable[apiv1.ResourceCPU] = *resource.NewMilliQuantity(millicpuAllocatable, resource.DecimalSI)