
Commit 7d6f8d8

Merge pull request kubernetes#80570 from klueska/upstream-add-topology-manager-to-devicemanager
Add support for Topology Manager to Device Manager

2 parents: 3ebe6a6 + eb0216e

File tree: 15 files changed (+1017, -193 lines)

pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go (82 additions, 75 deletions)

Generated file; diff not rendered by default.

pkg/kubelet/apis/deviceplugin/v1beta1/api.proto (1 addition, 1 deletion)

@@ -74,7 +74,7 @@ message ListAndWatchResponse {
 }
 
 message TopologyInfo {
-	NUMANode node = 1;
+	repeated NUMANode nodes = 1;
 }
 
 message NUMANode {
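
For context, this change lets a device plugin advertise one or more NUMA nodes per device instead of exactly one. Below is a minimal sketch of how a plugin might populate the generated Go types; the NUMANode ID field and the Device.Topology field are assumptions about the generated v1beta1 API that are not visible in this hunk.

package main

import (
	"fmt"

	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)

// buildDevice sketches a device that is reachable from two NUMA nodes,
// now that TopologyInfo carries a repeated list of nodes.
// NOTE: NUMANode.ID and Device.Topology are assumed field names, not
// confirmed by the hunk above.
func buildDevice() *pluginapi.Device {
	return &pluginapi.Device{
		ID:     "dev-0",
		Health: pluginapi.Healthy,
		Topology: &pluginapi.TopologyInfo{
			Nodes: []*pluginapi.NUMANode{{ID: 0}, {ID: 1}},
		},
	}
}

func main() {
	fmt.Printf("%+v\n", buildDevice())
}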

pkg/kubelet/cm/container_manager_linux.go (2 additions, 1 deletion)

@@ -304,7 +304,8 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 
 	klog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
 	if devicePluginEnabled {
-		cm.deviceManager, err = devicemanager.NewManagerImpl()
+		cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager)
+		cm.topologyManager.AddHintProvider(cm.deviceManager)
 	} else {
 		cm.deviceManager, err = devicemanager.NewManagerStub()
 	}
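
The hunk above wires the two components together in both directions: the device manager is constructed with NUMA node info and a reference to the topology manager, and the topology manager registers the device manager as one of its hint providers. The following self-contained sketch illustrates that registration pattern; the types and methods are simplified stand-ins for illustration, not the kubelet's actual interfaces.

package main

import "fmt"

// HintProvider is a stand-in for a component the topology manager can
// consult when computing NUMA affinity.
type HintProvider interface {
	// Name identifies the provider in log output.
	Name() string
}

// TopologyManager is a stand-in that only tracks registered providers.
type TopologyManager struct {
	providers []HintProvider
}

// AddHintProvider mirrors the call added in the hunk: every registered
// provider will later be asked for topology hints during pod admission.
func (t *TopologyManager) AddHintProvider(p HintProvider) {
	t.providers = append(t.providers, p)
}

// DeviceManager is a stand-in that keeps a back-reference to the
// topology manager it was constructed with.
type DeviceManager struct {
	topology *TopologyManager
}

func NewDeviceManager(t *TopologyManager) *DeviceManager {
	return &DeviceManager{topology: t}
}

func (d *DeviceManager) Name() string { return "devicemanager" }

func main() {
	tm := &TopologyManager{}
	dm := NewDeviceManager(tm)
	tm.AddHintProvider(dm) // same two-way wiring as in NewContainerManager
	fmt.Println("registered providers:", len(tm.providers))
}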

pkg/kubelet/cm/cpumanager/cpu_assignment.go (16 additions, 16 deletions)

@@ -50,12 +50,12 @@ func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
 
 // Returns true if the supplied socket is fully available in `topoDetails`.
 func (a *cpuAccumulator) isSocketFree(socketID int) bool {
-	return a.details.CPUsInSocket(socketID).Size() == a.topo.CPUsPerSocket()
+	return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
 }
 
 // Returns true if the supplied core is fully available in `topoDetails`.
 func (a *cpuAccumulator) isCoreFree(coreID int) bool {
-	return a.details.CPUsInCore(coreID).Size() == a.topo.CPUsPerCore()
+	return a.details.CPUsInCores(coreID).Size() == a.topo.CPUsPerCore()
 }
 
 // Returns free socket IDs as a slice sorted by:
@@ -72,14 +72,14 @@ func (a *cpuAccumulator) freeCores() []int {
 	socketIDs := a.details.Sockets().ToSliceNoSort()
 	sort.Slice(socketIDs,
 		func(i, j int) bool {
-			iCores := a.details.CoresInSocket(socketIDs[i]).Filter(a.isCoreFree)
-			jCores := a.details.CoresInSocket(socketIDs[j]).Filter(a.isCoreFree)
+			iCores := a.details.CoresInSockets(socketIDs[i]).Filter(a.isCoreFree)
+			jCores := a.details.CoresInSockets(socketIDs[j]).Filter(a.isCoreFree)
 			return iCores.Size() < jCores.Size() || socketIDs[i] < socketIDs[j]
 		})
 
 	coreIDs := []int{}
 	for _, s := range socketIDs {
-		coreIDs = append(coreIDs, a.details.CoresInSocket(s).Filter(a.isCoreFree).ToSlice()...)
+		coreIDs = append(coreIDs, a.details.CoresInSockets(s).Filter(a.isCoreFree).ToSlice()...)
 	}
 	return coreIDs
 }
@@ -100,25 +100,25 @@ func (a *cpuAccumulator) freeCPUs() []int {
 			iCore := cores[i]
 			jCore := cores[j]
 
-			iCPUs := a.topo.CPUDetails.CPUsInCore(iCore).ToSlice()
-			jCPUs := a.topo.CPUDetails.CPUsInCore(jCore).ToSlice()
+			iCPUs := a.topo.CPUDetails.CPUsInCores(iCore).ToSlice()
+			jCPUs := a.topo.CPUDetails.CPUsInCores(jCore).ToSlice()
 
 			iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID
 			jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID
 
 			// Compute the number of CPUs in the result reside on the same socket
 			// as each core.
-			iSocketColoScore := a.topo.CPUDetails.CPUsInSocket(iSocket).Intersection(a.result).Size()
-			jSocketColoScore := a.topo.CPUDetails.CPUsInSocket(jSocket).Intersection(a.result).Size()
+			iSocketColoScore := a.topo.CPUDetails.CPUsInSockets(iSocket).Intersection(a.result).Size()
+			jSocketColoScore := a.topo.CPUDetails.CPUsInSockets(jSocket).Intersection(a.result).Size()
 
 			// Compute the number of available CPUs available on the same socket
 			// as each core.
-			iSocketFreeScore := a.details.CPUsInSocket(iSocket).Size()
-			jSocketFreeScore := a.details.CPUsInSocket(jSocket).Size()
+			iSocketFreeScore := a.details.CPUsInSockets(iSocket).Size()
+			jSocketFreeScore := a.details.CPUsInSockets(jSocket).Size()
 
 			// Compute the number of available CPUs on each core.
-			iCoreFreeScore := a.details.CPUsInCore(iCore).Size()
-			jCoreFreeScore := a.details.CPUsInCore(jCore).Size()
+			iCoreFreeScore := a.details.CPUsInCores(iCore).Size()
+			jCoreFreeScore := a.details.CPUsInCores(jCore).Size()
 
 			return iSocketColoScore > jSocketColoScore ||
 				iSocketFreeScore < jSocketFreeScore ||
@@ -129,7 +129,7 @@ func (a *cpuAccumulator) freeCPUs() []int {
 
 	// For each core, append sorted CPU IDs to result.
 	for _, core := range cores {
-		result = append(result, a.details.CPUsInCore(core).ToSlice()...)
+		result = append(result, a.details.CPUsInCores(core).ToSlice()...)
 	}
 	return result
 }
@@ -161,7 +161,7 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num
 	if acc.needs(acc.topo.CPUsPerSocket()) {
 		for _, s := range acc.freeSockets() {
 			klog.V(4).Infof("[cpumanager] takeByTopology: claiming socket [%d]", s)
-			acc.take(acc.details.CPUsInSocket(s))
+			acc.take(acc.details.CPUsInSockets(s))
 			if acc.isSatisfied() {
 				return acc.result, nil
 			}
@@ -176,7 +176,7 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num
 	if acc.needs(acc.topo.CPUsPerCore()) {
 		for _, c := range acc.freeCores() {
 			klog.V(4).Infof("[cpumanager] takeByTopology: claiming core [%d]", c)
-			acc.take(acc.details.CPUsInCore(c))
+			acc.take(acc.details.CPUsInCores(c))
 			if acc.isSatisfied() {
 				return acc.result, nil
 			}
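
Every change in this file is the same mechanical rename: the singular accessors (CPUsInSocket, CPUsInCore, CoresInSocket) become plural forms (CPUsInSockets, CPUsInCores, CoresInSockets), which reads as if the accessors now take a variadic list of IDs so a single call can cover several sockets, cores, or NUMA nodes at once. A simplified sketch of that accessor shape, using a stand-in type rather than the real cpumanager topology package:

package main

import (
	"fmt"
	"sort"
)

// cpuInfo is a simplified stand-in for the per-CPU record kept by the
// cpumanager topology package.
type cpuInfo struct {
	SocketID int
	CoreID   int
}

// cpuDetails maps CPU ID -> placement, loosely mirroring topology.CPUDetails.
type cpuDetails map[int]cpuInfo

// CPUsInSockets sketches the plural, variadic accessor shape suggested by
// the rename: it returns the CPUs belonging to any of the given sockets,
// so one call can cover a multi-socket affinity mask.
func (d cpuDetails) CPUsInSockets(ids ...int) []int {
	want := map[int]bool{}
	for _, id := range ids {
		want[id] = true
	}
	var cpus []int
	for cpu, info := range d {
		if want[info.SocketID] {
			cpus = append(cpus, cpu)
		}
	}
	sort.Ints(cpus)
	return cpus
}

func main() {
	d := cpuDetails{
		0: {SocketID: 0, CoreID: 0}, 1: {SocketID: 0, CoreID: 1},
		2: {SocketID: 1, CoreID: 2}, 3: {SocketID: 1, CoreID: 3},
	}
	// The single-ID call behaves like the old singular accessor...
	fmt.Println(d.CPUsInSockets(0)) // [0 1]
	// ...and the variadic form unions across sockets in one call.
	fmt.Println(d.CPUsInSockets(0, 1)) // [0 1 2 3]
}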

pkg/kubelet/cm/cpumanager/policy_static.go (1 addition, 1 deletion)

@@ -258,7 +258,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity soc
 	if numaAffinity != nil {
 		alignedCPUs := cpuset.NewCPUSet()
 		for _, numaNodeID := range numaAffinity.GetSockets() {
-			alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANode(numaNodeID)))
+			alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
 		}
 
 		numAlignedToAlloc := alignedCPUs.Size()
