Skip to content

Commit a8873e1

Browse files
committed
Track pods with required anti-affinity
This is a performance optimization that reduces the overhead of inter-pod affinity PreFilter calculations. It essentially eliminates that overhead when no pods in the cluster use required pod anti-affinity. This offered a 20% improvement on 5k-node clusters for the preferred anti-affinity benchmarks.
1 parent 3b5aedc commit a8873e1

File tree

6 files changed

+93
-25
lines changed

6 files changed

+93
-25
lines changed

pkg/scheduler/framework/plugins/interpodaffinity/filtering.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,25 +163,25 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo
163163
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
164164
// (1) Whether it has PodAntiAffinity
165165
// (2) Whether any AffinityTerm matches the incoming pod
166-
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount {
167-
topoMaps := make([]topologyToMatchedTermCount, len(allNodes))
166+
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
167+
topoMaps := make([]topologyToMatchedTermCount, len(nodes))
168168
index := int32(-1)
169169
processNode := func(i int) {
170-
nodeInfo := allNodes[i]
170+
nodeInfo := nodes[i]
171171
node := nodeInfo.Node()
172172
if node == nil {
173173
klog.Error("node not found")
174174
return
175175
}
176176
topoMap := make(topologyToMatchedTermCount)
177-
for _, existingPod := range nodeInfo.PodsWithAffinity {
177+
for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
178178
topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1)
179179
}
180180
if len(topoMap) != 0 {
181181
topoMaps[atomic.AddInt32(&index, 1)] = topoMap
182182
}
183183
}
184-
parallelize.Until(context.Background(), len(allNodes), processNode)
184+
parallelize.Until(context.Background(), len(nodes), processNode)
185185

186186
result := make(topologyToMatchedTermCount)
187187
for i := 0; i <= int(index); i++ {
@@ -241,12 +241,12 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, al
241241
// PreFilter invoked at the prefilter extension point.
242242
func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
243243
var allNodes []*framework.NodeInfo
244-
var havePodsWithAffinityNodes []*framework.NodeInfo
244+
var nodesWithRequiredAntiAffinityPods []*framework.NodeInfo
245245
var err error
246246
if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil {
247247
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos: %v", err))
248248
}
249-
if havePodsWithAffinityNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList(); err != nil {
249+
if nodesWithRequiredAntiAffinityPods, err = pl.sharedLister.NodeInfos().HavePodsWithRequiredAntiAffinityList(); err != nil {
250250
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos with pods with affinity: %v", err))
251251
}
252252

@@ -256,7 +256,7 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
256256
}
257257

258258
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
259-
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes)
259+
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, nodesWithRequiredAntiAffinityPods)
260260

261261
// incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
262262
// incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity

pkg/scheduler/framework/v1alpha1/fake/listers.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,12 @@ func (nodes NodeInfoLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, e
245245
return nodes, nil
246246
}
247247

248+
// HavePodsWithRequiredAntiAffinityList is supposed to list nodes with at least one pod with
249+
// required anti-affinity. For the fake lister we just return everything.
250+
func (nodes NodeInfoLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
251+
return nodes, nil
252+
}
253+
248254
// NewNodeInfoLister create a new fake NodeInfoLister from a slice of v1.Nodes.
249255
func NewNodeInfoLister(nodes []*v1.Node) framework.NodeInfoLister {
250256
nodeInfoList := make([]*framework.NodeInfo, len(nodes))

pkg/scheduler/framework/v1alpha1/listers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ type NodeInfoLister interface {
2222
List() ([]*NodeInfo, error)
2323
// Returns the list of NodeInfos of nodes with pods with affinity terms.
2424
HavePodsWithAffinityList() ([]*NodeInfo, error)
25+
// Returns the list of NodeInfos of nodes with pods with required anti-affinity terms.
26+
HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error)
2527
// Returns the NodeInfo of the given node name.
2628
Get(nodeName string) (*NodeInfo, error)
2729
}

pkg/scheduler/framework/v1alpha1/types.go

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ type NodeInfo struct {
196196
// The subset of pods with affinity.
197197
PodsWithAffinity []*PodInfo
198198

199+
// The subset of pods with required anti-affinity.
200+
PodsWithRequiredAntiAffinity []*PodInfo
201+
199202
// Ports allocated on the node.
200203
UsedPorts HostPortInfo
201204

@@ -457,6 +460,9 @@ func (n *NodeInfo) Clone() *NodeInfo {
457460
if len(n.PodsWithAffinity) > 0 {
458461
clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...)
459462
}
463+
if len(n.PodsWithRequiredAntiAffinity) > 0 {
464+
clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...)
465+
}
460466
return clone
461467
}
462468

@@ -486,44 +492,67 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) {
486492
n.NonZeroRequested.MilliCPU += non0CPU
487493
n.NonZeroRequested.Memory += non0Mem
488494
n.Pods = append(n.Pods, podInfo)
489-
affinity := pod.Spec.Affinity
490-
if affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) {
495+
if podWithAffinity(pod) {
491496
n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
492497
}
498+
if podWithRequiredAntiAffinity(pod) {
499+
n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
500+
}
493501

494502
// Consume ports when pods added.
495503
n.updateUsedPorts(podInfo.Pod, true)
496504

497505
n.Generation = nextGeneration()
498506
}
499507

500-
// RemovePod subtracts pod information from this NodeInfo.
501-
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
502-
k1, err := GetPodKey(pod)
503-
if err != nil {
504-
return err
505-
}
508+
func podWithAffinity(p *v1.Pod) bool {
509+
affinity := p.Spec.Affinity
510+
return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
511+
}
512+
513+
func podWithRequiredAntiAffinity(p *v1.Pod) bool {
514+
affinity := p.Spec.Affinity
515+
return affinity != nil && affinity.PodAntiAffinity != nil &&
516+
len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0
517+
}
506518

507-
for i := range n.PodsWithAffinity {
508-
k2, err := GetPodKey(n.PodsWithAffinity[i].Pod)
519+
func removeFromSlice(s []*PodInfo, k string) []*PodInfo {
520+
for i := range s {
521+
k2, err := GetPodKey(s[i].Pod)
509522
if err != nil {
510523
klog.Errorf("Cannot get pod key, err: %v", err)
511524
continue
512525
}
513-
if k1 == k2 {
526+
if k == k2 {
514527
// delete the element
515-
n.PodsWithAffinity[i] = n.PodsWithAffinity[len(n.PodsWithAffinity)-1]
516-
n.PodsWithAffinity = n.PodsWithAffinity[:len(n.PodsWithAffinity)-1]
528+
s[i] = s[len(s)-1]
529+
s = s[:len(s)-1]
517530
break
518531
}
519532
}
533+
return s
534+
}
535+
536+
// RemovePod subtracts pod information from this NodeInfo.
537+
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
538+
k, err := GetPodKey(pod)
539+
if err != nil {
540+
return err
541+
}
542+
if podWithAffinity(pod) {
543+
n.PodsWithAffinity = removeFromSlice(n.PodsWithAffinity, k)
544+
}
545+
if podWithRequiredAntiAffinity(pod) {
546+
n.PodsWithRequiredAntiAffinity = removeFromSlice(n.PodsWithRequiredAntiAffinity, k)
547+
}
548+
520549
for i := range n.Pods {
521550
k2, err := GetPodKey(n.Pods[i].Pod)
522551
if err != nil {
523552
klog.Errorf("Cannot get pod key, err: %v", err)
524553
continue
525554
}
526-
if k1 == k2 {
555+
if k == k2 {
527556
// delete the element
528557
n.Pods[i] = n.Pods[len(n.Pods)-1]
529558
n.Pods = n.Pods[:len(n.Pods)-1]
@@ -558,6 +587,9 @@ func (n *NodeInfo) resetSlicesIfEmpty() {
558587
if len(n.PodsWithAffinity) == 0 {
559588
n.PodsWithAffinity = nil
560589
}
590+
if len(n.PodsWithRequiredAntiAffinity) == 0 {
591+
n.PodsWithRequiredAntiAffinity = nil
592+
}
561593
if len(n.Pods) == 0 {
562594
n.Pods = nil
563595
}

pkg/scheduler/internal/cache/cache.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
213213
// status from having pods with affinity to NOT having pods with affinity or the other
214214
// way around.
215215
updateNodesHavePodsWithAffinity := false
216+
// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
217+
// status from having pods with required anti-affinity to NOT having pods with required
218+
// anti-affinity or the other way around.
219+
updateNodesHavePodsWithRequiredAntiAffinity := false
216220

217221
// Start from the head of the NodeInfo doubly linked list and update snapshot
218222
// of NodeInfos updated after the last snapshot.
@@ -239,6 +243,9 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
239243
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
240244
updateNodesHavePodsWithAffinity = true
241245
}
246+
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
247+
updateNodesHavePodsWithRequiredAntiAffinity = true
248+
}
242249
// We need to preserve the original pointer of the NodeInfo struct since it
243250
// is used in the NodeInfoList, which we may not update.
244251
*existing = *clone
@@ -254,7 +261,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
254261
updateAllLists = true
255262
}
256263

257-
if updateAllLists || updateNodesHavePodsWithAffinity {
264+
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
258265
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
259266
}
260267

@@ -276,6 +283,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
276283

277284
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
278285
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
286+
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
279287
if updateAll {
280288
// Take a snapshot of the nodes order in the tree
281289
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
@@ -287,6 +295,9 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
287295
if len(n.PodsWithAffinity) > 0 {
288296
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
289297
}
298+
if len(n.PodsWithRequiredAntiAffinity) > 0 {
299+
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, n)
300+
}
290301
} else {
291302
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
292303
}
@@ -296,6 +307,9 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
296307
if len(n.PodsWithAffinity) > 0 {
297308
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
298309
}
310+
if len(n.PodsWithRequiredAntiAffinity) > 0 {
311+
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, n)
312+
}
299313
}
300314
}
301315
}

pkg/scheduler/internal/cache/snapshot.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ type Snapshot struct {
3333
nodeInfoList []*framework.NodeInfo
3434
// havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms.
3535
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
36-
generation int64
36+
// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
37+
// required anti-affinity terms.
38+
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
39+
generation int64
3740
}
3841

3942
var _ framework.SharedLister = &Snapshot{}
@@ -50,17 +53,22 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
5053
nodeInfoMap := createNodeInfoMap(pods, nodes)
5154
nodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
5255
havePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
56+
havePodsWithRequiredAntiAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
5357
for _, v := range nodeInfoMap {
5458
nodeInfoList = append(nodeInfoList, v)
5559
if len(v.PodsWithAffinity) > 0 {
5660
havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v)
5761
}
62+
if len(v.PodsWithRequiredAntiAffinity) > 0 {
63+
havePodsWithRequiredAntiAffinityNodeInfoList = append(havePodsWithRequiredAntiAffinityNodeInfoList, v)
64+
}
5865
}
5966

6067
s := NewEmptySnapshot()
6168
s.nodeInfoMap = nodeInfoMap
6269
s.nodeInfoList = nodeInfoList
6370
s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
71+
s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList
6472

6573
return s
6674
}
@@ -137,11 +145,17 @@ func (s *Snapshot) List() ([]*framework.NodeInfo, error) {
137145
return s.nodeInfoList, nil
138146
}
139147

140-
// HavePodsWithAffinityList returns the list of nodes with at least one pods with inter-pod affinity
148+
// HavePodsWithAffinityList returns the list of nodes with at least one pod with inter-pod affinity
141149
func (s *Snapshot) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
142150
return s.havePodsWithAffinityNodeInfoList, nil
143151
}
144152

153+
// HavePodsWithRequiredAntiAffinityList returns the list of nodes with at least one pod with
154+
// required inter-pod anti-affinity
155+
func (s *Snapshot) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
156+
return s.havePodsWithRequiredAntiAffinityNodeInfoList, nil
157+
}
158+
145159
// Get returns the NodeInfo of the given node name.
146160
func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
147161
if v, ok := s.nodeInfoMap[nodeName]; ok && v.Node() != nil {

0 commit comments

Comments
 (0)