Skip to content

Commit 6c85fd4

Browse files
committed
KEP-4176: Add static policy option to distribute cpus across cores
1 parent 535e833 commit 6c85fd4

File tree

5 files changed

+329
-30
lines changed

5 files changed

+329
-30
lines changed

pkg/kubelet/cm/cpumanager/cpu_assignment.go

Lines changed: 96 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,58 @@ func (s *socketsFirst) sortAvailableCores() []int {
196196
return result
197197
}
198198

199+
type availableCPUSorter interface {
200+
sort() []int
201+
}
202+
203+
type sortCPUsPacked struct{ acc *cpuAccumulator }
204+
type sortCPUsSpread struct{ acc *cpuAccumulator }
205+
206+
var _ availableCPUSorter = (*sortCPUsPacked)(nil)
207+
var _ availableCPUSorter = (*sortCPUsSpread)(nil)
208+
209+
func (s sortCPUsPacked) sort() []int {
210+
return s.acc.sortAvailableCPUsPacked()
211+
}
212+
213+
func (s sortCPUsSpread) sort() []int {
214+
return s.acc.sortAvailableCPUsSpread()
215+
}
216+
217+
// CPUSortingStrategy describes the CPU sorting solution within the socket scope.
218+
// Using topoDualSocketHT (12 CPUs, 2 sockets, 6 cores) as an example:
219+
//
220+
// CPUDetails: map[int]topology.CPUInfo{
221+
// 0: {CoreID: 0, SocketID: 0, NUMANodeID: 0},
222+
// 1: {CoreID: 1, SocketID: 1, NUMANodeID: 1},
223+
// 2: {CoreID: 2, SocketID: 0, NUMANodeID: 0},
224+
// 3: {CoreID: 3, SocketID: 1, NUMANodeID: 1},
225+
// 4: {CoreID: 4, SocketID: 0, NUMANodeID: 0},
226+
// 5: {CoreID: 5, SocketID: 1, NUMANodeID: 1},
227+
// 6: {CoreID: 0, SocketID: 0, NUMANodeID: 0},
228+
// 7: {CoreID: 1, SocketID: 1, NUMANodeID: 1},
229+
// 8: {CoreID: 2, SocketID: 0, NUMANodeID: 0},
230+
// 9: {CoreID: 3, SocketID: 1, NUMANodeID: 1},
231+
// 10: {CoreID: 4, SocketID: 0, NUMANodeID: 0},
232+
// 11: {CoreID: 5, SocketID: 1, NUMANodeID: 1},
233+
// },
234+
//
235+
// - CPUSortingOptionPacked sorts CPUs in a packed manner, where CPUs are grouped by core
236+
// before moving to the next core, resulting in packed cores, like:
237+
// 0, 2, 4, 6, 8, 10, 1, 3, 5, 7, 9, 11
238+
// - CPUSortingOptionSpread sorts CPUs in a spread manner, where CPUs are spread across cores
239+
// before moving to the next CPU, resulting in spread-out cores, like:
240+
// 0, 6, 2, 8, 4, 10, 1, 7, 3, 9, 5, 11
241+
//
242+
// By default, CPUSortingOptionPacked will be used, and CPUSortingOptionSpread will only be activated
243+
// when the user specifies the `DistributeCPUsAcrossCoresOption` static policy option.
244+
type CPUSortingStrategy string
245+
246+
const (
247+
CPUSortingStrategyPacked CPUSortingStrategy = "packed"
248+
CPUSortingStrategySpread CPUSortingStrategy = "spread"
249+
)
250+
199251
type cpuAccumulator struct {
200252
// `topo` describes the layout of CPUs (i.e. hyper-threads if hyperthreading is on) between
201253
// cores (i.e. physical CPUs if hyper-threading is on), NUMA nodes, and sockets on the K8s
@@ -223,9 +275,15 @@ type cpuAccumulator struct {
223275
result cpuset.CPUSet
224276

225277
numaOrSocketsFirst numaOrSocketsFirstFuncs
278+
279+
// availableCPUSorter is used to control the cpu sorting result.
280+
// The sequence of returned CPU IDs depends on the policy.
281+
// By default, cpus is sorted by sortAvailableCPUsPacked()
282+
// If packed is false, cpu is sorted by sortAvailableCPUsSpread()
283+
availableCPUSorter availableCPUSorter
226284
}
227285

228-
func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
286+
func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) *cpuAccumulator {
229287
acc := &cpuAccumulator{
230288
topo: topo,
231289
details: topo.CPUDetails.KeepOnly(availableCPUs),
@@ -239,6 +297,12 @@ func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet,
239297
acc.numaOrSocketsFirst = &socketsFirst{acc}
240298
}
241299

300+
if cpuSortingStrategy == CPUSortingStrategyPacked {
301+
acc.availableCPUSorter = &sortCPUsPacked{acc}
302+
} else {
303+
acc.availableCPUSorter = &sortCPUsSpread{acc}
304+
}
305+
242306
return acc
243307
}
244308

@@ -293,9 +357,9 @@ func (a *cpuAccumulator) freeCores() []int {
293357
return free
294358
}
295359

296-
// Returns free CPU IDs as a slice sorted by sortAvailableCPUs().
360+
// Returns free CPU IDs as a slice sorted by sortAvailableCPUsPacked().
297361
func (a *cpuAccumulator) freeCPUs() []int {
298-
return a.sortAvailableCPUs()
362+
return a.availableCPUSorter.sort()
299363
}
300364

301365
// Sorts the provided list of NUMA nodes/sockets/cores/cpus referenced in 'ids'
@@ -404,7 +468,7 @@ func (a *cpuAccumulator) sortAvailableCores() []int {
404468
// same way as described in the previous paragraph, except that the priority of NUMA nodes and
405469
// sockets is inverted (e.g. first sort the CPUs by number of free CPUs in their NUMA nodes, then,
406470
// for each NUMA node, sort the CPUs by number of free CPUs in their sockets, etc...).
407-
func (a *cpuAccumulator) sortAvailableCPUs() []int {
471+
func (a *cpuAccumulator) sortAvailableCPUsPacked() []int {
408472
var result []int
409473
for _, core := range a.sortAvailableCores() {
410474
cpus := a.details.CPUsInCores(core).UnsortedList()
@@ -414,6 +478,19 @@ func (a *cpuAccumulator) sortAvailableCPUs() []int {
414478
return result
415479
}
416480

481+
// Sort all available CPUs:
482+
// - First by core using sortAvailableSockets().
483+
// - Then within each socket, sort cpus directly using the sort() algorithm defined above.
484+
func (a *cpuAccumulator) sortAvailableCPUsSpread() []int {
485+
var result []int
486+
for _, socket := range a.sortAvailableSockets() {
487+
cpus := a.details.CPUsInSockets(socket).UnsortedList()
488+
sort.Ints(cpus)
489+
result = append(result, cpus...)
490+
}
491+
return result
492+
}
493+
417494
func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
418495
a.result = a.result.Union(cpus)
419496
a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
@@ -454,7 +531,7 @@ func (a *cpuAccumulator) takeFullCores() {
454531
}
455532

456533
func (a *cpuAccumulator) takeRemainingCPUs() {
457-
for _, cpu := range a.sortAvailableCPUs() {
534+
for _, cpu := range a.availableCPUSorter.sort() {
458535
klog.V(4).InfoS("takeRemainingCPUs: claiming CPU", "cpu", cpu)
459536
a.take(cpuset.New(cpu))
460537
if a.isSatisfied() {
@@ -581,8 +658,8 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
581658
// the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
582659
// order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
583660
// the least amount of free CPUs to the one with the highest amount of free CPUs.
584-
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
585-
acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
661+
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
662+
acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
586663
if acc.isSatisfied() {
587664
return acc.result, nil
588665
}
@@ -606,9 +683,12 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
606683

607684
// 2. Acquire whole cores, if available and the container requires at least
608685
// a core's-worth of CPUs.
609-
acc.takeFullCores()
610-
if acc.isSatisfied() {
611-
return acc.result, nil
686+
// If `CPUSortingStrategySpread` is specified, skip taking the whole core.
687+
if cpuSortingStrategy != CPUSortingStrategySpread {
688+
acc.takeFullCores()
689+
if acc.isSatisfied() {
690+
return acc.result, nil
691+
}
612692
}
613693

614694
// 3. Acquire single threads, preferring to fill partially-allocated cores
@@ -685,16 +765,16 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
685765
// of size 'cpuGroupSize' according to the algorithm described above. This is
686766
// important, for example, to ensure that all CPUs (i.e. all hyperthreads) from
687767
// a single core are allocated together.
688-
func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) {
768+
func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
689769
// If the number of CPUs requested cannot be handed out in chunks of
690770
// 'cpuGroupSize', then we just call out the packing algorithm since we
691771
// can't distribute CPUs in this chunk size.
692772
if (numCPUs % cpuGroupSize) != 0 {
693-
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
773+
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
694774
}
695775

696776
// Otherwise build an accumulator to start allocating CPUs from.
697-
acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
777+
acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
698778
if acc.isSatisfied() {
699779
return acc.result, nil
700780
}
@@ -873,7 +953,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
873953
// size 'cpuGroupSize' from 'bestCombo'.
874954
distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
875955
for _, numa := range bestCombo {
876-
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution)
956+
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy)
877957
acc.take(cpus)
878958
}
879959

@@ -888,7 +968,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
888968
if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
889969
continue
890970
}
891-
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize)
971+
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy)
892972
acc.take(cpus)
893973
remainder -= cpuGroupSize
894974
}
@@ -912,5 +992,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
912992

913993
// If we never found a combination of NUMA nodes that we could properly
914994
// distribute CPUs across, fall back to the packing algorithm.
915-
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
995+
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
916996
}

0 commit comments

Comments
 (0)