Skip to content

Commit fe2fdcd

Browse files
authored
Merge pull request kubernetes#89162 from alculquicondor/affinity-less-lock
Reduce locking when calculating affinity scores
2 parents e74ad38 + d0dc178 commit fe2fdcd

File tree

1 file changed

+35
-18
lines changed
  • pkg/scheduler/framework/plugins/interpodaffinity

1 file changed

+35
-18
lines changed

pkg/scheduler/framework/plugins/interpodaffinity/scoring.go

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ import (
3232
// preScoreStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring.
3333
const preScoreStateKey = "PreScore" + Name
3434

35+
type scoreMap map[string]map[string]int64
36+
3537
// preScoreState computed at PreScore and used at Score.
3638
type preScoreState struct {
37-
topologyScore map[string]map[string]int64
39+
topologyScore scoreMap
3840
affinityTerms []*weightedAffinityTerm
3941
antiAffinityTerms []*weightedAffinityTerm
4042
}
@@ -76,8 +78,7 @@ func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm)
7678
return terms, nil
7779
}
7880

79-
func (pl *InterPodAffinity) processTerm(
80-
state *preScoreState,
81+
func (m scoreMap) processTerm(
8182
term *weightedAffinityTerm,
8283
podToCheck *v1.Pod,
8384
fixedNode *v1.Node,
@@ -90,24 +91,34 @@ func (pl *InterPodAffinity) processTerm(
9091
match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector)
9192
tpValue, tpValueExist := fixedNode.Labels[term.topologyKey]
9293
if match && tpValueExist {
93-
pl.Lock()
94-
if state.topologyScore[term.topologyKey] == nil {
95-
state.topologyScore[term.topologyKey] = make(map[string]int64)
94+
if m[term.topologyKey] == nil {
95+
m[term.topologyKey] = make(map[string]int64)
9696
}
97-
state.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
98-
pl.Unlock()
97+
m[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
9998
}
10099
return
101100
}
102101

103-
func (pl *InterPodAffinity) processTerms(state *preScoreState, terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
102+
func (m scoreMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
104103
for _, term := range terms {
105-
pl.processTerm(state, term, podToCheck, fixedNode, multiplier)
104+
m.processTerm(term, podToCheck, fixedNode, multiplier)
105+
}
106+
}
107+
108+
func (m scoreMap) append(other scoreMap) {
109+
for topology, oScores := range other {
110+
scores := m[topology]
111+
if scores == nil {
112+
m[topology] = oScores
113+
continue
114+
}
115+
for k, v := range oScores {
116+
scores[k] += v
117+
}
106118
}
107-
return nil
108119
}
109120

110-
func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *nodeinfo.NodeInfo, incomingPod *v1.Pod) error {
121+
func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *nodeinfo.NodeInfo, incomingPod *v1.Pod, topoScore scoreMap) error {
111122
existingPodAffinity := existingPod.Spec.Affinity
112123
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
113124
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
@@ -116,12 +127,12 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod
116127
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
117128
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
118129
// value as that of <existingPods>`s node by the term`s weight.
119-
pl.processTerms(state, state.affinityTerms, existingPod, existingPodNode, 1)
130+
topoScore.processTerms(state.affinityTerms, existingPod, existingPodNode, 1)
120131

121132
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
122133
// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
123134
// value as that of <existingPod>`s node by the term`s weight.
124-
pl.processTerms(state, state.antiAffinityTerms, existingPod, existingPodNode, -1)
135+
topoScore.processTerms(state.antiAffinityTerms, existingPod, existingPodNode, -1)
125136

126137
if existingHasAffinityConstraints {
127138
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
@@ -139,7 +150,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod
139150
if err != nil {
140151
return err
141152
}
142-
pl.processTerm(state, processedTerm, incomingPod, existingPodNode, 1)
153+
topoScore.processTerm(processedTerm, incomingPod, existingPodNode, 1)
143154
}
144155
}
145156
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
@@ -151,7 +162,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod
151162
return nil
152163
}
153164

154-
pl.processTerms(state, terms, incomingPod, existingPodNode, 1)
165+
topoScore.processTerms(terms, incomingPod, existingPodNode, 1)
155166
}
156167
if existingHasAntiAffinityConstraints {
157168
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
@@ -161,7 +172,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod
161172
if err != nil {
162173
return err
163174
}
164-
pl.processTerms(state, terms, incomingPod, existingPodNode, -1)
175+
topoScore.processTerms(terms, incomingPod, existingPodNode, -1)
165176
}
166177
return nil
167178
}
@@ -235,12 +246,18 @@ func (pl *InterPodAffinity) PreScore(
235246
podsToProcess = nodeInfo.Pods()
236247
}
237248

249+
topoScore := make(scoreMap)
238250
for _, existingPod := range podsToProcess {
239-
if err := pl.processExistingPod(state, existingPod, nodeInfo, pod); err != nil {
251+
if err := pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore); err != nil {
240252
errCh.SendErrorWithCancel(err, cancel)
241253
return
242254
}
243255
}
256+
if len(topoScore) > 0 {
257+
pl.Lock()
258+
state.topologyScore.append(topoScore)
259+
pl.Unlock()
260+
}
244261
}
245262
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
246263
if err := errCh.ReceiveError(); err != nil {

0 commit comments

Comments
 (0)