Skip to content

Commit a281a40

Browse files
Fix proactive scale up injecting fake pods for scheduling gated pods
1 parent 94637a2 commit a281a40

File tree

5 files changed

+110
-3
lines changed

5 files changed

+110
-3
lines changed

cluster-autoscaler/processors/podinjection/pod_group.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ limitations under the License.
1717
package podinjection
1818

1919
import (
20+
"fmt"
21+
2022
apiv1 "k8s.io/api/core/v1"
2123
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2224
"k8s.io/apimachinery/pkg/types"
25+
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
2326
)
2427

2528
type podGroup struct {
@@ -45,6 +48,33 @@ func groupPods(pods []*apiv1.Pod, controllers []controller) map[types.UID]podGro
4548
return podGroups
4649
}
4750

51+
func filterOutSchedulingGatedPods(groups map[types.UID]podGroup, allPods []*apiv1.Pod) (map[types.UID]podGroup, error) {
52+
if groups == nil {
53+
return nil, fmt.Errorf("Pod groups should not be nil")
54+
}
55+
if len(groups) > 0 {
56+
podsWithSchedulingGates := kube_util.SchedulingGatedPods(allPods)
57+
for _, pod := range podsWithSchedulingGates {
58+
if pod != nil {
59+
groups = removeSchedulingGatedPodFromGroups(groups, pod)
60+
}
61+
}
62+
}
63+
return groups, nil
64+
}
65+
66+
func removeSchedulingGatedPodFromGroups(groups map[types.UID]podGroup, pod *apiv1.Pod) map[types.UID]podGroup {
67+
for _, podOwnerRef := range pod.OwnerReferences {
68+
// SchedulingGated pods can't be unschedualable nor unprocessed nor scheduled so it is not expected
69+
// to have them as group sample nor in pod count, so decreasing desiredReplicas by one is enough
70+
if grp, found := groups[podOwnerRef.UID]; found {
71+
grp.desiredReplicas -= 1
72+
groups[podOwnerRef.UID] = grp
73+
}
74+
}
75+
return groups
76+
}
77+
4878
// updatePodGroups updates the pod group if ownerRef is the controller of the pod
4979
func updatePodGroups(pod *apiv1.Pod, ownerRef metav1.OwnerReference, podGroups map[types.UID]podGroup) map[types.UID]podGroup {
5080
if ownerRef.Controller == nil {

cluster-autoscaler/processors/podinjection/pod_injection_processor.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,17 @@ func (p *PodInjectionPodListProcessor) Process(ctx *context.AutoscalingContext,
6060
return unschedulablePods, fmt.Errorf("failed to list nodeInfos from cluster snapshot: %v", err)
6161
}
6262
scheduledPods := podsFromNodeInfos(nodeInfos)
63-
6463
groupedPods := groupPods(append(scheduledPods, unschedulablePods...), controllers)
65-
var podsToInject []*apiv1.Pod
6664

65+
allPods, err := ctx.AllPodLister().List()
66+
if err == nil {
67+
groupedPods, err = filterOutSchedulingGatedPods(groupedPods, allPods)
68+
}
69+
if err != nil {
70+
klog.Warningf("Pod injection processor failed to filter out scheduling gated pods with error: %v", err.Error())
71+
}
72+
73+
var podsToInject []*apiv1.Pod
6774
for _, groupedPod := range groupedPods {
6875
var fakePodCount = groupedPod.fakePodCount()
6976
fakePods := makeFakePods(groupedPod.ownerUid, groupedPod.sample, fakePodCount)

cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
4545
scheduledPodRep1Copy1 := buildTestPod("default", "-scheduled-pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID), WithNodeName(node.Name))
4646
podRep1Copy1 := buildTestPod("default", "pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
4747
podRep1Copy2 := buildTestPod("default", "pod-rep1-2", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
48+
podRep1Copy3Gated := buildTestPod("default", "pod-rep1-3", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
49+
podRep1Copy3Gated = WithSchedulingGatedStatus(podRep1Copy3Gated)
4850

4951
job1 := createTestJob("job-1", "default", 10, 10, 0)
5052
scheduledPodJob1Copy1 := buildTestPod("default", "scheduled-pod-job1-1", WithControllerOwnerRef(job1.Name, "Job", job1.UID), WithNodeName(node.Name))
@@ -55,6 +57,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
5557
scheduledParallelStatefulsetPod := buildTestPod("default", "parallel-scheduled-pod-statefulset-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID), WithNodeName(node.Name))
5658
parallelStatefulsetPodCopy1 := buildTestPod("default", "parallel-pod-statefulset1-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
5759
parallelStatefulsetPodCopy2 := buildTestPod("default", "parallel-pod-statefulset1-2", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
60+
parallelStatefulsetPodCopy3Gated := buildTestPod("default", "parallel-pod-statefulset1-3", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
61+
parallelStatefulsetPodCopy3Gated = WithSchedulingGatedStatus(parallelStatefulsetPodCopy3Gated)
5862

5963
sequentialStatefulset := createTestStatefulset("sequential-statefulset-1", "default", appsv1.OrderedReadyPodManagement, 10)
6064
scheduledSequentialStatefulsetPod := buildTestPod("default", "sequential-scheduled-pod-statefulset-1", WithControllerOwnerRef(sequentialStatefulset.Name, "StatefulSet", sequentialStatefulset.UID), WithNodeName(node.Name))
@@ -72,6 +76,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
7276
name string
7377
scheduledPods []*apiv1.Pod
7478
unschedulablePods []*apiv1.Pod
79+
otherPods []*apiv1.Pod
7580
wantPods []*apiv1.Pod
7681
}{
7782
{
@@ -111,6 +116,20 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
111116
makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 7)...,
112117
),
113118
},
119+
{
120+
name: "Mix of controllers with scheduling gated pods",
121+
scheduledPods: []*apiv1.Pod{scheduledPodRep1Copy1, scheduledPodJob1Copy1, scheduledParallelStatefulsetPod},
122+
unschedulablePods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
123+
otherPods: []*apiv1.Pod{parallelStatefulsetPodCopy3Gated, podRep1Copy3Gated},
124+
wantPods: append(
125+
append(
126+
append(
127+
[]*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
128+
makeFakePods(replicaSet1.UID, scheduledPodRep1Copy1, 1)...),
129+
makeFakePods(job1.UID, scheduledPodJob1Copy1, 7)...),
130+
makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 6)...,
131+
),
132+
},
114133
}
115134

116135
for _, tc := range testCases {
@@ -119,9 +138,10 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
119138
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
120139
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
121140
assert.NoError(t, err)
141+
allPodsLister := fakeAllPodsLister{podsToList: append(append(tc.scheduledPods, tc.unschedulablePods...), tc.otherPods...)}
122142
ctx := context.AutoscalingContext{
123143
AutoscalingKubeClients: context.AutoscalingKubeClients{
124-
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
144+
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, &allPodsLister, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
125145
},
126146
ClusterSnapshot: clusterSnapshot,
127147
}
@@ -433,3 +453,11 @@ func buildTestPod(namespace, name string, opts ...podOption) *apiv1.Pod {
433453
}
434454

435455
type podOption func(*apiv1.Pod)
456+
457+
type fakeAllPodsLister struct {
458+
podsToList []*apiv1.Pod
459+
}
460+
461+
func (l *fakeAllPodsLister) List() ([]*apiv1.Pod, error) {
462+
return l.podsToList, nil
463+
}

cluster-autoscaler/utils/kubernetes/listers.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,30 @@ func SchedulerUnprocessedPods(allPods []*apiv1.Pod, bypassedSchedulers map[strin
204204
return unprocessedPods
205205
}
206206

207+
// SchedulingGatedPods is a helper method that returns all pods which has scheduling gate
208+
// SchedulingGated pods are not scheduled nor deleted by the implementation and are not unschedulable nor unprocessed by definition
209+
func SchedulingGatedPods(allPods []*apiv1.Pod) []*apiv1.Pod {
210+
var schedulingGatedPods []*apiv1.Pod
211+
for _, pod := range allPods {
212+
if pod != nil && isSchedulingGated(pod) {
213+
schedulingGatedPods = append(schedulingGatedPods, pod)
214+
}
215+
}
216+
return schedulingGatedPods
217+
}
218+
219+
// isSchedulingGated returns true in case PodScheduled is false with reason PodReasonSchedulingGated
220+
func isSchedulingGated(pod *apiv1.Pod) bool {
221+
if isScheduled(pod) || isDeleted(pod) {
222+
return false
223+
}
224+
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
225+
if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonSchedulingGated {
226+
return true
227+
}
228+
return false
229+
}
230+
207231
// UnschedulablePods is a helper method that returns all unschedulable pods from given pod list.
208232
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
209233
var unschedulablePods []*apiv1.Pod

cluster-autoscaler/utils/test/test_utils.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,24 @@ func BuildTestNode(name string, millicpuCapacity int64, memCapacity int64) *apiv
359359
return node
360360
}
361361

362+
// WithSchedulingGatedStatus upserts the condition with type PodScheduled to be of status false
363+
// and reason PodReasonSchedulingGated
364+
func WithSchedulingGatedStatus(pod *apiv1.Pod) *apiv1.Pod {
365+
gatedPodCondition := apiv1.PodCondition{
366+
Type: apiv1.PodScheduled,
367+
Status: apiv1.ConditionFalse,
368+
Reason: apiv1.PodReasonSchedulingGated,
369+
}
370+
for index := range pod.Status.Conditions {
371+
if pod.Status.Conditions[index].Type == apiv1.PodScheduled {
372+
pod.Status.Conditions[index] = gatedPodCondition
373+
return pod
374+
}
375+
}
376+
pod.Status.Conditions = append(pod.Status.Conditions, gatedPodCondition)
377+
return pod
378+
}
379+
362380
// WithAllocatable adds specified milliCpu and memory to Allocatable of the node in-place.
363381
func WithAllocatable(node *apiv1.Node, millicpuAllocatable, memAllocatable int64) *apiv1.Node {
364382
node.Status.Allocatable[apiv1.ResourceCPU] = *resource.NewMilliQuantity(millicpuAllocatable, resource.DecimalSI)

0 commit comments

Comments
 (0)