Skip to content

Commit 0746f16

Browse files
authored
Merge pull request kubernetes#91229 from ahg-g/ahg-affinity3
Eliminate locking in (anti)affinity calculations
2 parents 739a61a + 4ff554b commit 0746f16

File tree

3 files changed

+43
-59
lines changed

3 files changed

+43
-59
lines changed

pkg/scheduler/framework/plugins/interpodaffinity/filtering.go

Lines changed: 35 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package interpodaffinity
1919
import (
2020
"context"
2121
"fmt"
22-
"sync"
22+
"sync/atomic"
2323

2424
v1 "k8s.io/api/core/v1"
2525
"k8s.io/klog/v2"
@@ -128,7 +128,7 @@ func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, t
128128
}
129129
}
130130

131-
// updateAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
131+
// updateWithAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
132132
// for each anti-affinity term matched the target pod.
133133
func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []framework.AffinityTerm, value int64) {
134134
// Check anti-affinity terms.
@@ -160,101 +160,82 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo
160160
return true
161161
}
162162

163-
// getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
164-
// (1) Whether it has PodAntiAffinity
165-
// (2) Whether ANY AffinityTerm matches the incoming pod
166-
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *framework.PodInfo, node *v1.Node) topologyToMatchedTermCount {
167-
topologyMap := make(topologyToMatchedTermCount)
168-
for _, term := range existingPod.RequiredAntiAffinityTerms {
169-
if schedutil.PodMatchesTermsNamespaceAndSelector(newPod, term.Namespaces, term.Selector) {
170-
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
171-
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
172-
topologyMap[pair]++
173-
}
174-
}
175-
}
176-
return topologyMap
177-
}
178-
179163
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
180164
// (1) Whether it has PodAntiAffinity
181165
// (2) Whether any AffinityTerm matches the incoming pod
182166
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount {
183-
var lock sync.Mutex
184-
topologyMap := make(topologyToMatchedTermCount)
185-
186-
appendResult := func(toAppend topologyToMatchedTermCount) {
187-
lock.Lock()
188-
defer lock.Unlock()
189-
topologyMap.append(toAppend)
190-
}
191-
167+
topoMaps := make([]topologyToMatchedTermCount, len(allNodes))
168+
index := int32(-1)
192169
processNode := func(i int) {
193170
nodeInfo := allNodes[i]
194171
node := nodeInfo.Node()
195172
if node == nil {
196173
klog.Error("node not found")
197174
return
198175
}
176+
topoMap := make(topologyToMatchedTermCount)
199177
for _, existingPod := range nodeInfo.PodsWithAffinity {
200-
existingPodTopologyMaps := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node)
201-
if len(existingPodTopologyMaps) != 0 {
202-
appendResult(existingPodTopologyMaps)
203-
}
178+
topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1)
179+
}
180+
if len(topoMap) != 0 {
181+
topoMaps[atomic.AddInt32(&index, 1)] = topoMap
204182
}
205183
}
206184
parallelize.Until(context.Background(), len(allNodes), processNode)
207185

208-
return topologyMap
186+
result := make(topologyToMatchedTermCount)
187+
for i := 0; i <= int(index); i++ {
188+
result.append(topoMaps[i])
189+
}
190+
191+
return result
209192
}
210193

211194
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
212195
// It returns a topologyToMatchedTermCount that are checked later by the affinity
213196
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
214197
// need to check all the pods in the cluster.
215198
func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
216-
topologyPairsAffinityPodsMap := make(topologyToMatchedTermCount)
217-
topologyToMatchedExistingAntiAffinityTerms := make(topologyToMatchedTermCount)
199+
affinityCounts := make(topologyToMatchedTermCount)
200+
antiAffinityCounts := make(topologyToMatchedTermCount)
218201
if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
219-
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms
220-
}
221-
222-
var lock sync.Mutex
223-
appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap topologyToMatchedTermCount) {
224-
lock.Lock()
225-
defer lock.Unlock()
226-
if len(nodeTopologyPairsAffinityPodsMap) > 0 {
227-
topologyPairsAffinityPodsMap.append(nodeTopologyPairsAffinityPodsMap)
228-
}
229-
if len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {
230-
topologyToMatchedExistingAntiAffinityTerms.append(nodeTopologyPairsAntiAffinityPodsMap)
231-
}
202+
return affinityCounts, antiAffinityCounts
232203
}
233204

205+
affinityCountsList := make([]topologyToMatchedTermCount, len(allNodes))
206+
antiAffinityCountsList := make([]topologyToMatchedTermCount, len(allNodes))
207+
index := int32(-1)
234208
processNode := func(i int) {
235209
nodeInfo := allNodes[i]
236210
node := nodeInfo.Node()
237211
if node == nil {
238212
klog.Error("node not found")
239213
return
240214
}
241-
nodeTopologyPairsAffinityPodsMap := make(topologyToMatchedTermCount)
242-
nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount)
215+
affinity := make(topologyToMatchedTermCount)
216+
antiAffinity := make(topologyToMatchedTermCount)
243217
for _, existingPod := range nodeInfo.Pods {
244218
// Check affinity terms.
245-
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1)
219+
affinity.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1)
246220

247221
// Check anti-affinity terms.
248-
nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1)
222+
antiAffinity.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1)
249223
}
250224

251-
if len(nodeTopologyPairsAffinityPodsMap) > 0 || len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {
252-
appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap)
225+
if len(affinity) > 0 || len(antiAffinity) > 0 {
226+
k := atomic.AddInt32(&index, 1)
227+
affinityCountsList[k] = affinity
228+
antiAffinityCountsList[k] = antiAffinity
253229
}
254230
}
255231
parallelize.Until(context.Background(), len(allNodes), processNode)
256232

257-
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms
233+
for i := 0; i <= int(index); i++ {
234+
affinityCounts.append(affinityCountsList[i])
235+
antiAffinityCounts.append(antiAffinityCountsList[i])
236+
}
237+
238+
return affinityCounts, antiAffinityCounts
258239
}
259240

260241
// PreFilter invoked at the prefilter extension point.

pkg/scheduler/framework/plugins/interpodaffinity/plugin.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package interpodaffinity
1818

1919
import (
2020
"fmt"
21-
"sync"
2221

2322
"k8s.io/apimachinery/pkg/runtime"
2423
"k8s.io/apimachinery/pkg/util/validation/field"
@@ -45,7 +44,6 @@ var _ framework.ScorePlugin = &InterPodAffinity{}
4544
type InterPodAffinity struct {
4645
args config.InterPodAffinityArgs
4746
sharedLister framework.SharedLister
48-
sync.Mutex
4947
}
5048

5149
// Name returns name of the plugin. It is used in logs, etc.

pkg/scheduler/framework/plugins/interpodaffinity/scoring.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package interpodaffinity
1919
import (
2020
"context"
2121
"fmt"
22+
"sync/atomic"
2223

2324
v1 "k8s.io/api/core/v1"
2425
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@@ -164,6 +165,8 @@ func (pl *InterPodAffinity) PreScore(
164165
podInfo: framework.NewPodInfo(pod),
165166
}
166167

168+
topoScores := make([]scoreMap, len(allNodes))
169+
index := int32(-1)
167170
processNode := func(i int) {
168171
nodeInfo := allNodes[i]
169172
if nodeInfo.Node() == nil {
@@ -182,13 +185,15 @@ func (pl *InterPodAffinity) PreScore(
182185
pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore)
183186
}
184187
if len(topoScore) > 0 {
185-
pl.Lock()
186-
state.topologyScore.append(topoScore)
187-
pl.Unlock()
188+
topoScores[atomic.AddInt32(&index, 1)] = topoScore
188189
}
189190
}
190191
parallelize.Until(context.Background(), len(allNodes), processNode)
191192

193+
for i := 0; i <= int(index); i++ {
194+
state.topologyScore.append(topoScores[i])
195+
}
196+
192197
cycleState.Write(preScoreStateKey, state)
193198
return nil
194199
}

0 commit comments

Comments
 (0)