Commit dda530c

Merge pull request kubernetes#89665 from alculquicondor/per_node_filter_spreading
Optimize topology spreading filter
2 parents b1d85a6 + 93fc02c commit dda530c

File tree

2 files changed: 166 additions, 146 deletions


pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

Lines changed: 45 additions & 40 deletions
@@ -20,12 +20,12 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"sync"
+	"sync/atomic"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/klog"
-	pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@@ -48,7 +48,7 @@ type preFilterState struct {
 	// it's not guaranteed to be the 2nd minimum match number.
 	TpKeyToCriticalPaths map[string]*criticalPaths
 	// TpPairToMatchNum is keyed with topologyPair, and valued with the number of matching pods.
-	TpPairToMatchNum map[topologyPair]int32
+	TpPairToMatchNum map[topologyPair]*int32
 }
 
 // Clone makes a copy of the given state.
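The value type switches from int32 to *int32, which is exactly what the TODO removed further down anticipated: the per-pair counters can then be bumped with sync/atomic while the map itself is never written concurrently. Below is a minimal standalone sketch of that pattern; the names (pair, counts, the "zone" keys) are illustrative and not part of this file.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pair stands in for the plugin's topologyPair in this sketch.
type pair struct{ key, value string }

func main() {
	// Pre-create every key once, single-threaded, so the map layout is fixed
	// before any goroutine touches it.
	counts := map[pair]*int32{
		{key: "zone", value: "a"}: new(int32),
		{key: "zone", value: "b"}: new(int32),
	}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p := pair{key: "zone", value: "a"}
			if i%2 == 0 {
				p.value = "b"
			}
			// Only the pointed-to counter is mutated; no mutex is needed.
			atomic.AddInt32(counts[p], 1)
		}(i)
	}
	wg.Wait()
	fmt.Println(*counts[pair{"zone", "a"}], *counts[pair{"zone", "b"}]) // 50 50
}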
@@ -61,14 +61,15 @@ func (s *preFilterState) Clone() framework.StateData {
 		// Constraints are shared because they don't change.
 		Constraints:          s.Constraints,
 		TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(s.TpKeyToCriticalPaths)),
-		TpPairToMatchNum:     make(map[topologyPair]int32, len(s.TpPairToMatchNum)),
+		TpPairToMatchNum:     make(map[topologyPair]*int32, len(s.TpPairToMatchNum)),
 	}
 	for tpKey, paths := range s.TpKeyToCriticalPaths {
 		copy.TpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
 	}
 	for tpPair, matchNum := range s.TpPairToMatchNum {
 		copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
-		copy.TpPairToMatchNum[copyPair] = matchNum
+		copyCount := *matchNum
+		copy.TpPairToMatchNum[copyPair] = &copyCount
 	}
 	return &copy
 }
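With pointer values in the map, Clone now copies the count behind each pointer (copyCount) rather than the pointer itself; otherwise the clone and the original state would share counters. A standalone illustration of the aliasing this avoids, with made-up keys:

package main

import "fmt"

func main() {
	orig := map[string]*int32{"zone/a": new(int32)}

	// Copying the pointer aliases the counter with the original map.
	aliased := map[string]*int32{"zone/a": orig["zone/a"]}

	// Copying the value behind the pointer gives the clone its own counter,
	// which is what Clone does above via copyCount.
	cloned := make(map[string]*int32, len(orig))
	for k, v := range orig {
		c := *v
		cloned[k] = &c
	}

	*aliased["zone/a"]++         // leaks back into orig
	*cloned["zone/a"] += 10      // does not
	fmt.Println(*orig["zone/a"]) // prints 1
}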
@@ -137,9 +138,9 @@ func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v
 
 		k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey]
 		pair := topologyPair{key: k, value: v}
-		s.TpPairToMatchNum[pair] = s.TpPairToMatchNum[pair] + delta
+		*s.TpPairToMatchNum[pair] += delta
 
-		s.TpKeyToCriticalPaths[k].update(v, s.TpPairToMatchNum[pair])
+		s.TpKeyToCriticalPaths[k].update(v, *s.TpPairToMatchNum[pair])
 	}
 }
 
@@ -219,52 +220,44 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
 		return &preFilterState{}, nil
 	}
 
-	var lock sync.Mutex
-
-	// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
-	// In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
 	s := preFilterState{
 		Constraints:          constraints,
 		TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
-		TpPairToMatchNum:     make(map[topologyPair]int32),
-	}
-	addTopologyPairMatchNum := func(pair topologyPair, num int32) {
-		lock.Lock()
-		s.TpPairToMatchNum[pair] += num
-		lock.Unlock()
+		TpPairToMatchNum:     make(map[topologyPair]*int32, sizeHeuristic(len(allNodes), constraints)),
 	}
-
-	processNode := func(i int) {
-		nodeInfo := allNodes[i]
-		node := nodeInfo.Node()
+	for _, n := range allNodes {
+		node := n.Node()
 		if node == nil {
 			klog.Error("node not found")
-			return
+			continue
 		}
 		// In accordance to design, if NodeAffinity or NodeSelector is defined,
 		// spreading is applied to nodes that pass those filters.
-		if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
-			return
+		if !helper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
+			continue
 		}
-
 		// Ensure current node's labels contains all topologyKeys in 'Constraints'.
 		if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
-			return
+			continue
+		}
+		for _, c := range constraints {
+			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
+			s.TpPairToMatchNum[pair] = new(int32)
 		}
+	}
+
+	processNode := func(i int) {
+		nodeInfo := allNodes[i]
+		node := nodeInfo.Node()
+
 		for _, constraint := range constraints {
-			matchTotal := int32(0)
-			// nodeInfo.Pods() can be empty; or all pods don't fit
-			for _, existingPod := range nodeInfo.Pods() {
-				// Bypass terminating Pod (see #87621).
-				if existingPod.DeletionTimestamp != nil || existingPod.Namespace != pod.Namespace {
-					continue
-				}
-				if constraint.Selector.Matches(labels.Set(existingPod.Labels)) {
-					matchTotal++
-				}
-			}
 			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
-			addTopologyPairMatchNum(pair, matchTotal)
+			tpCount := s.TpPairToMatchNum[pair]
+			if tpCount == nil {
+				continue
+			}
+			count := countPodsMatchSelector(nodeInfo.Pods(), constraint.Selector, pod.Namespace)
+			atomic.AddInt32(tpCount, int32(count))
 		}
 	}
 	parallelize.Until(context.Background(), len(allNodes), processNode)
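This is the core of the optimization: the mutex-guarded accumulation is replaced by two phases. A sequential pass registers a fresh *int32 for every topology pair on nodes that pass the node-affinity, selector, and label checks; the parallel processNode pass then only reads the map and increments the counters with atomic.AddInt32, so the map is never mutated concurrently and nodes skipped in the first pass are recognized by a nil entry. The per-node counting itself is presumably factored into the countPodsMatchSelector helper added elsewhere in this change (it mirrors the removed inline loop: same-namespace, non-terminating pods matching the selector). A self-contained sketch of the two-phase pattern, using made-up types rather than the scheduler's:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// node is a stand-in for the scheduler's NodeInfo in this sketch.
type node struct {
	zone string
	pods int
}

func main() {
	nodes := []node{{"a", 3}, {"b", 5}, {"a", 2}, {"c", 1}}

	// Phase 1: sequential writes register every key up front.
	counts := make(map[string]*int32, len(nodes))
	for _, n := range nodes {
		if counts[n.zone] == nil {
			counts[n.zone] = new(int32)
		}
	}

	// Phase 2: concurrent lookups plus atomic adds on the pre-created counters.
	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go func(n node) {
			defer wg.Done()
			atomic.AddInt32(counts[n.zone], int32(n.pods))
		}(n)
	}
	wg.Wait()

	for zone, c := range counts {
		fmt.Printf("zone %s: %d pods\n", zone, *c) // a: 5, b: 5, c: 1
	}
}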
@@ -275,7 +268,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
 		s.TpKeyToCriticalPaths[key] = newCriticalPaths()
 	}
 	for pair, num := range s.TpPairToMatchNum {
-		s.TpKeyToCriticalPaths[pair.key].update(pair.value, num)
+		s.TpKeyToCriticalPaths[pair.key].update(pair.value, *num)
 	}
 
 	return &s, nil
@@ -322,7 +315,10 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C
 		// judging criteria:
 		// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
 		minMatchNum := paths[0].MatchNum
-		matchNum := s.TpPairToMatchNum[pair]
+		matchNum := int32(0)
+		if tpCount := s.TpPairToMatchNum[pair]; tpCount != nil {
+			matchNum = *tpCount
+		}
 		skew := matchNum + selfMatchNum - minMatchNum
 		if skew > c.MaxSkew {
 			klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: MatchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.MaxSkew)
@@ -332,3 +328,12 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C
 
 	return nil
 }
+
+func sizeHeuristic(nodes int, constraints []topologySpreadConstraint) int {
+	for _, c := range constraints {
+		if c.TopologyKey == v1.LabelHostname {
+			return nodes
+		}
+	}
+	return 0
+}
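The new sizeHeuristic only pre-sizes TpPairToMatchNum when one of the constraints spreads by v1.LabelHostname ("kubernetes.io/hostname"), since in that case every eligible node contributes its own topology pair; for zone-like keys the number of pairs is typically far smaller than the node count, so the map is left to grow on demand. A standalone rendering of the same idea, with local stand-in types in place of the plugin's:

package main

import "fmt"

// labelHostname mirrors v1.LabelHostname; constraint is a local stand-in for
// the plugin's topologySpreadConstraint.
const labelHostname = "kubernetes.io/hostname"

type constraint struct{ topologyKey string }

// sizeHint returns a map capacity hint: one entry per node when spreading by
// hostname, otherwise 0 so the map grows as pairs are discovered.
func sizeHint(nodes int, constraints []constraint) int {
	for _, c := range constraints {
		if c.topologyKey == labelHostname {
			return nodes
		}
	}
	return 0
}

func main() {
	byZone := []constraint{{topologyKey: "topology.kubernetes.io/zone"}}
	byHost := []constraint{{topologyKey: labelHostname}}
	fmt.Println(sizeHint(5000, byZone)) // 0
	fmt.Println(sizeHint(5000, byHost)) // 5000
}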
