Skip to content

Commit 5e0211c

Browse files
committed
Added pre-processed required affinity terms to scheduler's PodInfo type.
1 parent b8be11e commit 5e0211c

File tree

8 files changed

+146
-220
lines changed

8 files changed

+146
-220
lines changed

pkg/scheduler/framework/plugins/interpodaffinity/BUILD

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ go_library(
1616
"//pkg/scheduler/util:go_default_library",
1717
"//staging/src/k8s.io/api/core/v1:go_default_library",
1818
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
19-
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
2019
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
21-
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
2220
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
2321
"//vendor/k8s.io/klog:go_default_library",
2422
],

pkg/scheduler/framework/plugins/interpodaffinity/filtering.go

Lines changed: 67 additions & 178 deletions
Large diffs are not rendered by default.

pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2035,7 +2035,6 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
20352035
pod *v1.Pod
20362036
wantAffinityPodsMap topologyToMatchedTermCount
20372037
wantAntiAffinityPodsMap topologyToMatchedTermCount
2038-
wantErr bool
20392038
}{
20402039
{
20412040
name: "nil test",
@@ -2195,11 +2194,7 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
21952194
t.Run(tt.name, func(t *testing.T) {
21962195
s := cache.NewSnapshot(tt.existingPods, tt.nodes)
21972196
l, _ := s.NodeInfos().List()
2198-
gotAffinityPodsMap, gotAntiAffinityPodsMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, l)
2199-
if (err != nil) != tt.wantErr {
2200-
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() error = %v, wantErr %v", err, tt.wantErr)
2201-
return
2202-
}
2197+
gotAffinityPodsMap, gotAntiAffinityPodsMap := getTPMapMatchingIncomingAffinityAntiAffinity(framework.NewPodInfo(tt.pod), l)
22032198
if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) {
22042199
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap)
22052200
}

pkg/scheduler/framework/plugins/interpodaffinity/scoring.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (s *preScoreState) Clone() framework.StateData {
4848

4949
// A "processed" representation of v1.WeightedAffinityTerm.
5050
type weightedAffinityTerm struct {
51-
affinityTerm
51+
framework.AffinityTerm
5252
weight int32
5353
}
5454

@@ -58,7 +58,7 @@ func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32
5858
if err != nil {
5959
return nil, err
6060
}
61-
return &weightedAffinityTerm{affinityTerm: affinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey}, weight: weight}, nil
61+
return &weightedAffinityTerm{AffinityTerm: framework.AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}, weight: weight}, nil
6262
}
6363

6464
func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) {
@@ -87,13 +87,13 @@ func (m scoreMap) processTerm(
8787
return
8888
}
8989

90-
match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector)
91-
tpValue, tpValueExist := fixedNode.Labels[term.topologyKey]
90+
match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.Namespaces, term.Selector)
91+
tpValue, tpValueExist := fixedNode.Labels[term.TopologyKey]
9292
if match && tpValueExist {
93-
if m[term.topologyKey] == nil {
94-
m[term.topologyKey] = make(map[string]int64)
93+
if m[term.TopologyKey] == nil {
94+
m[term.TopologyKey] = make(map[string]int64)
9595
}
96-
m[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
96+
m[term.TopologyKey][tpValue] += int64(term.weight * int32(multiplier))
9797
}
9898
return
9999
}

pkg/scheduler/framework/v1alpha1/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ go_library(
2525
"//pkg/scheduler/util:go_default_library",
2626
"//staging/src/k8s.io/api/core/v1:go_default_library",
2727
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
28+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
29+
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
2830
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2931
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
3032
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",

pkg/scheduler/framework/v1alpha1/types.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ import (
2525

2626
"k8s.io/api/core/v1"
2727
"k8s.io/apimachinery/pkg/api/resource"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/labels"
30+
"k8s.io/apimachinery/pkg/util/sets"
2831
utilfeature "k8s.io/apiserver/pkg/util/feature"
2932
"k8s.io/klog"
3033
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -65,13 +68,54 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
6568
// accelerate processing. This information is typically immutable (e.g., pre-processed
6669
// inter-pod affinity selectors).
6770
type PodInfo struct {
68-
Pod *v1.Pod
71+
Pod *v1.Pod
72+
RequiredAffinityTerms []AffinityTerm
73+
RequiredAntiAffinityTerms []AffinityTerm
74+
}
75+
76+
// AffinityTerm is a processed version of v1.PodAffinityTerm.
77+
type AffinityTerm struct {
78+
Namespaces sets.String
79+
Selector labels.Selector
80+
TopologyKey string
81+
}
82+
83+
func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) *AffinityTerm {
84+
namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
85+
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
86+
if err != nil {
87+
klog.Errorf("Cannot process label selector: %v", err)
88+
return nil
89+
}
90+
return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}
91+
}
92+
93+
// getAffinityTerms receives a Pod and affinity terms and returns the namespaces and
94+
// selectors of the terms.
95+
func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) []AffinityTerm {
96+
if v1Terms == nil {
97+
return nil
98+
}
99+
100+
var terms []AffinityTerm
101+
for _, term := range v1Terms {
102+
t := newAffinityTerm(pod, &term)
103+
if t == nil {
104+
// We get here if the label selector failed to process, this is not supposed
105+
// to happen because the pod should have been validated by the api server.
106+
return nil
107+
}
108+
terms = append(terms, *t)
109+
}
110+
return terms
69111
}
70112

71113
// NewPodInfo return a new PodInfo
72114
func NewPodInfo(pod *v1.Pod) *PodInfo {
73115
return &PodInfo{
74-
Pod: pod,
116+
Pod: pod,
117+
RequiredAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)),
118+
RequiredAntiAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)),
75119
}
76120
}
77121

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -532,21 +532,19 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
532532
var podsToMove []*framework.QueuedPodInfo
533533
for _, pInfo := range p.unschedulableQ.podInfoMap {
534534
up := pInfo.Pod
535-
affinity := up.Spec.Affinity
536-
if affinity != nil && affinity.PodAffinity != nil {
537-
terms := util.GetPodAffinityTerms(affinity.PodAffinity)
538-
for _, term := range terms {
539-
namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
540-
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
541-
if err != nil {
542-
klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
543-
}
544-
if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
545-
podsToMove = append(podsToMove, pInfo)
546-
break
547-
}
535+
terms := util.GetPodAffinityTerms(up.Spec.Affinity)
536+
for _, term := range terms {
537+
namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
538+
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
539+
if err != nil {
540+
klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
541+
}
542+
if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
543+
podsToMove = append(podsToMove, pInfo)
544+
break
548545
}
549546
}
547+
550548
}
551549
return podsToMove
552550
}

pkg/scheduler/util/utils.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,28 +83,28 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
8383
}
8484

8585
// GetPodAffinityTerms gets pod affinity terms by a pod affinity object.
86-
func GetPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) {
87-
if podAffinity != nil {
88-
if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
89-
terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
86+
func GetPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
87+
if affinity != nil && affinity.PodAffinity != nil {
88+
if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
89+
terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
9090
}
9191
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
92-
//if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
93-
// terms = append(terms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
92+
//if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
93+
// terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
9494
//}
9595
}
9696
return terms
9797
}
9898

9999
// GetPodAntiAffinityTerms gets pod affinity terms by a pod anti-affinity.
100-
func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) {
101-
if podAntiAffinity != nil {
102-
if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
103-
terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
100+
func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
101+
if affinity != nil && affinity.PodAntiAffinity != nil {
102+
if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
103+
terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
104104
}
105105
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
106-
//if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
107-
// terms = append(terms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
106+
//if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
107+
// terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
108108
//}
109109
}
110110
return terms

0 commit comments

Comments
 (0)