Skip to content

Commit 4627816

Browse files
Fix proactive scale up injecting fake pods for scheduling gated pods
1 parent 94637a2 commit 4627816

File tree

4 files changed

+99
-3
lines changed

4 files changed

+99
-3
lines changed

cluster-autoscaler/processors/podinjection/pod_group.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ limitations under the License.
1717
package podinjection
1818

1919
import (
20+
"fmt"
21+
2022
apiv1 "k8s.io/api/core/v1"
2123
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2224
"k8s.io/apimachinery/pkg/types"
25+
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
2326
)
2427

2528
type podGroup struct {
@@ -45,6 +48,33 @@ func groupPods(pods []*apiv1.Pod, controllers []controller) map[types.UID]podGro
4548
return podGroups
4649
}
4750

51+
func filterOutSchedulingGatedPods(groups map[types.UID]podGroup, allPods []*apiv1.Pod) (map[types.UID]podGroup, error) {
52+
if groups == nil {
53+
return nil, fmt.Errorf("Pod groups should not be nil")
54+
}
55+
if len(groups) > 0 {
56+
podsWithSchedulingGates := kube_util.SchedulingGatedPods(allPods)
57+
for _, pod := range podsWithSchedulingGates {
58+
if pod != nil {
59+
groups = removeSchedulingGatedPodFromGroups(groups, pod)
60+
}
61+
}
62+
}
63+
return groups, nil
64+
}
65+
66+
func removeSchedulingGatedPodFromGroups(groups map[types.UID]podGroup, pod *apiv1.Pod) map[types.UID]podGroup {
67+
for _, podOwnerRef := range pod.OwnerReferences {
68+
// SchedulingGated pods can't be unschedualable nor unprocessed nor scheduled so it is not expected
69+
// to have them as group sample nor in pod count, so decreasing desiredReplicas by one is enough
70+
if grp, found := groups[podOwnerRef.UID]; found {
71+
grp.desiredReplicas -= 1
72+
groups[podOwnerRef.UID] = grp
73+
}
74+
}
75+
return groups
76+
}
77+
4878
// updatePodGroups updates the pod group if ownerRef is the controller of the pod
4979
func updatePodGroups(pod *apiv1.Pod, ownerRef metav1.OwnerReference, podGroups map[types.UID]podGroup) map[types.UID]podGroup {
5080
if ownerRef.Controller == nil {

cluster-autoscaler/processors/podinjection/pod_injection_processor.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,17 @@ func (p *PodInjectionPodListProcessor) Process(ctx *context.AutoscalingContext,
6060
return unschedulablePods, fmt.Errorf("failed to list nodeInfos from cluster snapshot: %v", err)
6161
}
6262
scheduledPods := podsFromNodeInfos(nodeInfos)
63-
6463
groupedPods := groupPods(append(scheduledPods, unschedulablePods...), controllers)
65-
var podsToInject []*apiv1.Pod
6664

65+
allPods, err := ctx.AllPodLister().List()
66+
if err == nil {
67+
groupedPods, err = filterOutSchedulingGatedPods(groupedPods, allPods)
68+
}
69+
if err != nil {
70+
klog.Warningf("Pod injection processor failed to filter out scheduling gated pods with error: %v", err.Error())
71+
}
72+
73+
var podsToInject []*apiv1.Pod
6774
for _, groupedPod := range groupedPods {
6875
var fakePodCount = groupedPod.fakePodCount()
6976
fakePods := makeFakePods(groupedPod.ownerUid, groupedPod.sample, fakePodCount)

cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,22 @@ import (
3838
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
3939
)
4040

41+
func setStatusConditionGated(pod *apiv1.Pod) {
42+
pod.Status.Conditions = []apiv1.PodCondition{{
43+
Type: apiv1.PodScheduled,
44+
Status: apiv1.ConditionFalse,
45+
Reason: apiv1.PodReasonSchedulingGated,
46+
}}
47+
}
4148
func TestTargetCountInjectionPodListProcessor(t *testing.T) {
4249
node := BuildTestNode("node1", 100, 0)
4350

4451
replicaSet1 := createTestReplicaSet("rep-set-1", "default", 5)
4552
scheduledPodRep1Copy1 := buildTestPod("default", "-scheduled-pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID), WithNodeName(node.Name))
4653
podRep1Copy1 := buildTestPod("default", "pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
4754
podRep1Copy2 := buildTestPod("default", "pod-rep1-2", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
55+
podRep1Copy3Gated := buildTestPod("default", "pod-rep1-3", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
56+
setStatusConditionGated(podRep1Copy3Gated)
4857

4958
job1 := createTestJob("job-1", "default", 10, 10, 0)
5059
scheduledPodJob1Copy1 := buildTestPod("default", "scheduled-pod-job1-1", WithControllerOwnerRef(job1.Name, "Job", job1.UID), WithNodeName(node.Name))
@@ -55,6 +64,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
5564
scheduledParallelStatefulsetPod := buildTestPod("default", "parallel-scheduled-pod-statefulset-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID), WithNodeName(node.Name))
5665
parallelStatefulsetPodCopy1 := buildTestPod("default", "parallel-pod-statefulset1-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
5766
parallelStatefulsetPodCopy2 := buildTestPod("default", "parallel-pod-statefulset1-2", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
67+
parallelStatefulsetPodCopy3Gated := buildTestPod("default", "parallel-pod-statefulset1-3", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
68+
setStatusConditionGated(parallelStatefulsetPodCopy3Gated)
5869

5970
sequentialStatefulset := createTestStatefulset("sequential-statefulset-1", "default", appsv1.OrderedReadyPodManagement, 10)
6071
scheduledSequentialStatefulsetPod := buildTestPod("default", "sequential-scheduled-pod-statefulset-1", WithControllerOwnerRef(sequentialStatefulset.Name, "StatefulSet", sequentialStatefulset.UID), WithNodeName(node.Name))
@@ -72,6 +83,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
7283
name string
7384
scheduledPods []*apiv1.Pod
7485
unschedulablePods []*apiv1.Pod
86+
otherPods []*apiv1.Pod
7587
wantPods []*apiv1.Pod
7688
}{
7789
{
@@ -111,6 +123,20 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
111123
makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 7)...,
112124
),
113125
},
126+
{
127+
name: "Mix of controllers with scheduling gated pods",
128+
scheduledPods: []*apiv1.Pod{scheduledPodRep1Copy1, scheduledPodJob1Copy1, scheduledParallelStatefulsetPod},
129+
unschedulablePods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
130+
otherPods: []*apiv1.Pod{parallelStatefulsetPodCopy3Gated, podRep1Copy3Gated},
131+
wantPods: append(
132+
append(
133+
append(
134+
[]*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
135+
makeFakePods(replicaSet1.UID, scheduledPodRep1Copy1, 1)...),
136+
makeFakePods(job1.UID, scheduledPodJob1Copy1, 7)...),
137+
makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 6)...,
138+
),
139+
},
114140
}
115141

116142
for _, tc := range testCases {
@@ -119,9 +145,10 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
119145
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
120146
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
121147
assert.NoError(t, err)
148+
allPodsLister := fakeAllPodsLister{podsToList: append(append(tc.scheduledPods, tc.unschedulablePods...), tc.otherPods...)}
122149
ctx := context.AutoscalingContext{
123150
AutoscalingKubeClients: context.AutoscalingKubeClients{
124-
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
151+
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, &allPodsLister, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
125152
},
126153
ClusterSnapshot: clusterSnapshot,
127154
}
@@ -433,3 +460,11 @@ func buildTestPod(namespace, name string, opts ...podOption) *apiv1.Pod {
433460
}
434461

435462
type podOption func(*apiv1.Pod)
463+
464+
type fakeAllPodsLister struct {
465+
podsToList []*apiv1.Pod
466+
}
467+
468+
func (l *fakeAllPodsLister) List() ([]*apiv1.Pod, error) {
469+
return l.podsToList, nil
470+
}

cluster-autoscaler/utils/kubernetes/listers.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,30 @@ func SchedulerUnprocessedPods(allPods []*apiv1.Pod, bypassedSchedulers map[strin
204204
return unprocessedPods
205205
}
206206

207+
// SchedulingGatedPods is a helper method that returns all pods which has scheduling gate
208+
// SchedulingGated pods are not scheduled nor deleted by the implementation and are not unschedulable nor unprocessed by definition
209+
func SchedulingGatedPods(allPods []*apiv1.Pod) []*apiv1.Pod {
210+
var schedulingGatedPods []*apiv1.Pod
211+
for _, pod := range allPods {
212+
if pod != nil && isSchedulingGated(pod) {
213+
schedulingGatedPods = append(schedulingGatedPods, pod)
214+
}
215+
}
216+
return schedulingGatedPods
217+
}
218+
219+
// isSchedulingGated returns true in case PodScheduled is false with reason PodReasonSchedulingGated
220+
func isSchedulingGated(pod *apiv1.Pod) bool {
221+
if isScheduled(pod) || isDeleted(pod) {
222+
return false
223+
}
224+
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
225+
if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonSchedulingGated {
226+
return true
227+
}
228+
return false
229+
}
230+
207231
// UnschedulablePods is a helper method that returns all unschedulable pods from given pod list.
208232
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
209233
var unschedulablePods []*apiv1.Pod

0 commit comments

Comments
 (0)