Commit f2dd248

Merge pull request kubernetes#73920 from nolancon/topology-manager-cpu-manager
Changes to make CPU Manager a Hint Provider for Topology Manager
2 parents 117e831 + b3f4bed commit f2dd248

File tree: 11 files changed, +513 -16 lines

pkg/kubelet/cm/container_manager_linux.go

Lines changed: 2 additions & 0 deletions
@@ -312,11 +312,13 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 			machineInfo,
 			cm.GetNodeAllocatableReservation(),
 			nodeConfig.KubeletRootDir,
+			cm.topologyManager,
 		)
 		if err != nil {
 			klog.Errorf("failed to initialize cpu manager: %v", err)
 			return nil, err
 		}
+		cm.topologyManager.AddHintProvider(cm.cpuManager)
 	}

 	return cm, nil
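
The two added lines thread the kubelet's Topology Manager into the CPU manager: the manager is constructed with a reference to it (so the static policy can later ask for a container's affinity) and is then registered back as a hint provider. For context, a minimal self-contained sketch of the contracts this wiring relies on, inferred only from the signatures visible in this diff (the real definitions live in pkg/kubelet/cm/topologymanager and may differ in detail):

// Sketch only: reconstructed from the method signatures in this PR,
// not the verbatim topologymanager source.
package topologymanager

import v1 "k8s.io/api/core/v1"

// SocketMask is a bit-mask of socket IDs; only the method the static
// policy uses below is shown.
type SocketMask interface {
	GetSockets() []int
}

// TopologyHint carries one candidate socket affinity for a container.
type TopologyHint struct {
	SocketAffinity SocketMask
}

// HintProvider is what AddHintProvider(cm.cpuManager) expects: any
// component able to propose socket affinities for a (pod, container).
type HintProvider interface {
	GetTopologyHints(pod v1.Pod, container v1.Container) []TopologyHint
}

// Store is the read side handed to the CPU manager as `affinity`: the
// static policy queries the merged affinity at allocation time.
type Store interface {
	GetAffinity(podUID string, containerName string) TopologyHint
}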

pkg/kubelet/cm/cpumanager/BUILD

Lines changed: 6 additions & 0 deletions
@@ -10,6 +10,7 @@ go_library(
         "policy.go",
         "policy_none.go",
         "policy_static.go",
+        "topology_hints.go",
     ],
     importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager",
     visibility = ["//visibility:public"],
@@ -18,6 +19,8 @@ go_library(
         "//pkg/kubelet/cm/cpumanager/state:go_default_library",
         "//pkg/kubelet/cm/cpumanager/topology:go_default_library",
         "//pkg/kubelet/cm/cpuset:go_default_library",
+        "//pkg/kubelet/cm/topologymanager:go_default_library",
+        "//pkg/kubelet/cm/topologymanager/socketmask:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/status:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -37,12 +40,15 @@ go_test(
         "policy_none_test.go",
         "policy_static_test.go",
         "policy_test.go",
+        "topology_hints_test.go",
     ],
     embed = [":go_default_library"],
     deps = [
         "//pkg/kubelet/cm/cpumanager/state:go_default_library",
         "//pkg/kubelet/cm/cpumanager/topology:go_default_library",
         "//pkg/kubelet/cm/cpuset:go_default_library",
+        "//pkg/kubelet/cm/topologymanager:go_default_library",
+        "//pkg/kubelet/cm/topologymanager/socketmask:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 7 additions & 2 deletions
@@ -31,6 +31,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 )
@@ -64,6 +65,10 @@ type Manager interface {

 	// State returns a read-only interface to the internal CPU manager state.
 	State() state.Reader
+
+	// GetTopologyHints implements the Topology Manager Interface and is
+	// consulted to make Topology aware resource alignments
+	GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint
 }

 type manager struct {
@@ -97,7 +102,7 @@ type manager struct {
 var _ Manager = &manager{}

 // NewManager creates new cpu manager based on provided policy
-func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string) (Manager, error) {
+func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
 	var policy Policy

 	switch policyName(cpuPolicyName) {
@@ -129,7 +134,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
 		// exclusively allocated.
 		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
 		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
-		policy = NewStaticPolicy(topo, numReservedCPUs)
+		policy = NewStaticPolicy(topo, numReservedCPUs, affinity)

 	default:
 		return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
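
The signature change is the only visible behavioral difference here: the new trailing `affinity topologymanager.Store` argument is carried through to NewStaticPolicy. A hedged sketch of a call site under assumed names (machineInfo, reservation, and stateDir are hypothetical placeholders; the kubelet itself passes cm.topologyManager, while the updated tests below pass a fake):

// Hypothetical call site for the six-argument NewManager; assumes
// machineInfo, reservation, and stateDir are already in scope.
mgr, err := cpumanager.NewManager(
	"static",                         // cpuPolicyName
	10*time.Second,                   // reconcilePeriod
	machineInfo,                      // *cadvisorapi.MachineInfo
	reservation,                      // nodeAllocatableReservation
	stateDir,                         // stateFileDirectory
	topologymanager.NewFakeManager(), // affinity topologymanager.Store
)
if err != nil {
	klog.Errorf("failed to initialize cpu manager: %v", err)
}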

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

Lines changed: 3 additions & 2 deletions
@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 )

 type mockState struct {
@@ -195,7 +196,7 @@ func TestCPUManagerAdd(t *testing.T) {
 			2: {CoreID: 2, SocketID: 0},
 			3: {CoreID: 3, SocketID: 0},
 		},
-	}, 0)
+	}, 0, topologymanager.NewFakeManager())
 	testCases := []struct {
 		description string
 		updateErr   error
@@ -342,7 +343,7 @@ func TestCPUManagerGenerate(t *testing.T) {
 		}
 		defer os.RemoveAll(sDir)

-		mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir)
+		mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
 		if testCase.expectedError != nil {
 			if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
 				t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())

pkg/kubelet/cm/cpumanager/fake_cpu_manager.go

Lines changed: 6 additions & 0 deletions
@@ -20,6 +20,7 @@ import (
 	"k8s.io/api/core/v1"
 	"k8s.io/klog"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 )

@@ -46,6 +47,11 @@ func (m *fakeManager) RemoveContainer(containerID string) error {
 	return nil
 }

+func (m *fakeManager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint {
+	klog.Infof("[fake cpumanager] Get Topology Hints")
+	return []topologymanager.TopologyHint{}
+}
+
 func (m *fakeManager) State() state.Reader {
 	return m.state
 }

pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 42 additions & 7 deletions
@@ -25,6 +25,8 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask"
 )

 // PolicyStatic is the name of the static policy
@@ -77,6 +79,8 @@ type staticPolicy struct {
 	// (pod, container) -> containerID
 	// for all containers a pod
 	containerMap containerMap
+	// topology manager reference to get container Topology affinity
+	affinity topologymanager.Store
 }

 // Ensure staticPolicy implements Policy interface
@@ -85,7 +89,7 @@ var _ Policy = &staticPolicy{}
 // NewStaticPolicy returns a CPU manager policy that does not change CPU
 // assignments for exclusively pinned guaranteed containers after the main
 // container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy {
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, affinity topologymanager.Store) Policy {
 	allCPUs := topology.CPUDetails.CPUs()
 	// takeByTopology allocates CPUs associated with low-numbered cores from
 	// allCPUs.
@@ -104,6 +108,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy
 		topology:     topology,
 		reserved:     reserved,
 		containerMap: newContainerMap(),
+		affinity:     affinity,
 	}
 }

@@ -186,7 +191,7 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
 		}
 	}()

-	if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 {
+	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
 		klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
 		// container belongs in an exclusively allocated pool

@@ -211,7 +216,12 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
 			}
 		}

-		cpuset, err := p.allocateCPUs(s, numCPUs)
+		// Call Topology Manager to get the aligned socket affinity across all hint providers.
+		hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
+		klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
+
+		// Allocate CPUs according to the socket affinity contained in the hint.
+		cpuset, err := p.allocateCPUs(s, numCPUs, hint.SocketAffinity)
 		if err != nil {
 			klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)
 			return err
@@ -240,20 +250,45 @@ func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr
 	return nil
 }

-func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, error) {
-	klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d)", numCPUs)
-	result, err := takeByTopology(p.topology, p.assignableCPUs(s), numCPUs)
+func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, socketmask socketmask.SocketMask) (cpuset.CPUSet, error) {
+	klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, socketmask)
+
+	// If there are aligned CPUs in the socketmask, attempt to take those first.
+	result := cpuset.NewCPUSet()
+	if socketmask != nil {
+		alignedCPUs := cpuset.NewCPUSet()
+		for _, socketID := range socketmask.GetSockets() {
+			alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInSocket(socketID)))
+		}
+
+		numAlignedToAlloc := alignedCPUs.Size()
+		if numCPUs < numAlignedToAlloc {
+			numAlignedToAlloc = numCPUs
+		}
+
+		alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc)
+		if err != nil {
+			return cpuset.NewCPUSet(), err
+		}
+
+		result = result.Union(alignedCPUs)
+	}
+
+	// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
+	remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
 	if err != nil {
 		return cpuset.NewCPUSet(), err
 	}
+	result = result.Union(remainingCPUs)
+
 	// Remove allocated CPUs from the shared CPUSet.
 	s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))

 	klog.Infof("[cpumanager] allocateCPUs: returning \"%v\"", result)
 	return result, nil
 }

-func guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
+func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
 	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
 		return 0
 	}
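
The rewritten allocateCPUs is the heart of the change, proceeding in two phases: first it takes as many assignable CPUs as it can (capped at numCPUs) from the sockets named in the mask, then it tops up the request from whatever remains anywhere on the machine. A self-contained toy version of that two-phase shape, using plain integer slices and hypothetical names (the real code works on cpuset.CPUSet and delegates to takeByTopology, which also packs hyperthread siblings together):

package main

import "fmt"

// allocate mimics the aligned-first/top-up-second structure of the new
// allocateCPUs. cpusBySocket maps socket ID to free CPU IDs; mask lists
// the preferred sockets.
func allocate(cpusBySocket map[int][]int, mask []int, numCPUs int) []int {
	var result []int

	// Phase 1: take aligned CPUs from the masked sockets first.
	for _, socketID := range mask {
		for _, cpu := range cpusBySocket[socketID] {
			if len(result) == numCPUs {
				return result
			}
			result = append(result, cpu)
		}
	}

	// Phase 2: satisfy any remainder from the other sockets.
	for socketID, cpus := range cpusBySocket {
		if contains(mask, socketID) {
			continue
		}
		for _, cpu := range cpus {
			if len(result) == numCPUs {
				return result
			}
			result = append(result, cpu)
		}
	}
	return result
}

func contains(xs []int, x int) bool {
	for _, v := range xs {
		if v == x {
			return true
		}
	}
	return false
}

func main() {
	free := map[int][]int{0: {0, 2, 4, 6, 8, 10}, 1: {1, 3, 5, 7, 9, 11}}
	// 8 CPUs preferring socket 0: all six socket-0 CPUs, then two spill over.
	fmt.Println(allocate(free, []int{0}, 8)) // [0 2 4 6 8 10 1 3]
}

Because the toy ignores core siblings when choosing order, its output ordering differs from the real allocator, but the structure matches: alignment is best-effort, and the request is still fully satisfied whenever enough CPUs exist anywhere.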

pkg/kubelet/cm/cpumanager/policy_static_test.go

Lines changed: 97 additions & 5 deletions
@@ -25,6 +25,8 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask"
 )

 type staticPolicyTest struct {
@@ -58,7 +60,7 @@ type staticPolicyMultiContainerTest struct {
 }

 func TestStaticPolicyName(t *testing.T) {
-	policy := NewStaticPolicy(topoSingleSocketHT, 1)
+	policy := NewStaticPolicy(topoSingleSocketHT, 1, topologymanager.NewFakeManager())

 	policyName := policy.Name()
 	if policyName != "static" {
@@ -135,7 +137,7 @@ func TestStaticPolicyStart(t *testing.T) {
 				t.Error("expected panic doesn't occurred")
 			}
 		}()
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs).(*staticPolicy)
+		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()).(*staticPolicy)
 		st := &mockState{
 			assignments:   testCase.stAssignments,
 			defaultCPUSet: testCase.stDefaultCPUSet,
@@ -419,7 +421,7 @@ func TestStaticPolicyAdd(t *testing.T) {
 	}

 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
+		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())

 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -632,7 +634,7 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) {
 	}

 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
+		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())

 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -719,7 +721,7 @@ func TestStaticPolicyRemove(t *testing.T) {
 	}

 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
+		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())

 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -739,3 +741,93 @@ func TestStaticPolicyRemove(t *testing.T) {
 		}
 	}
 }
+
+func TestTopologyAwareAllocateCPUs(t *testing.T) {
+	testCases := []struct {
+		description     string
+		topo            *topology.CPUTopology
+		stAssignments   state.ContainerCPUAssignments
+		stDefaultCPUSet cpuset.CPUSet
+		numRequested    int
+		socketMask      socketmask.SocketMask
+		expCSet         cpuset.CPUSet
+	}{
+		{
+			description:     "Request 2 CPUs, No SocketMask",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    2,
+			socketMask:      nil,
+			expCSet:         cpuset.NewCPUSet(0, 6),
+		},
+		{
+			description:     "Request 2 CPUs, SocketMask on Socket 0",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    2,
+			socketMask: func() socketmask.SocketMask {
+				mask, _ := socketmask.NewSocketMask(0)
+				return mask
+			}(),
+			expCSet: cpuset.NewCPUSet(0, 6),
+		},
+		{
+			description:     "Request 2 CPUs, SocketMask on Socket 1",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    2,
+			socketMask: func() socketmask.SocketMask {
+				mask, _ := socketmask.NewSocketMask(1)
+				return mask
+			}(),
+			expCSet: cpuset.NewCPUSet(1, 7),
+		},
+		{
+			description:     "Request 8 CPUs, SocketMask on Socket 0",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    8,
+			socketMask: func() socketmask.SocketMask {
+				mask, _ := socketmask.NewSocketMask(0)
+				return mask
+			}(),
+			expCSet: cpuset.NewCPUSet(0, 6, 2, 8, 4, 10, 1, 7),
+		},
+		{
+			description:     "Request 8 CPUs, SocketMask on Socket 1",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    8,
+			socketMask: func() socketmask.SocketMask {
+				mask, _ := socketmask.NewSocketMask(1)
+				return mask
+			}(),
+			expCSet: cpuset.NewCPUSet(1, 7, 3, 9, 5, 11, 0, 6),
+		},
+	}
+	for _, tc := range testCases {
+		policy := NewStaticPolicy(tc.topo, 0, topologymanager.NewFakeManager()).(*staticPolicy)
+		st := &mockState{
+			assignments:   tc.stAssignments,
+			defaultCPUSet: tc.stDefaultCPUSet,
+		}
+		policy.Start(st)
+
+		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
+		if err != nil {
+			t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
+				tc.description, tc.expCSet, err)
+			continue
+		}
+
+		if !reflect.DeepEqual(tc.expCSet, cset) {
+			t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v but got %v",
+				tc.description, tc.expCSet, cset)
+		}
+	}
+}
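
The expected sets follow from the topoDualSocketHT fixture these tests share (defined with the other policy test fixtures; the layout below is reconstructed from the expectations above, so treat it as an assumption): even-numbered CPUs sit on socket 0, odd-numbered CPUs on socket 1, and IDs six apart are hyperthread siblings on the same physical core.

// Assumed layout of topoDualSocketHT: 12 CPUs, 2 sockets, 6 cores.
//   Socket 0: cores (0,6), (2,8), (4,10)
//   Socket 1: cores (1,7), (3,9), (5,11)

A 2-CPU request pinned to socket 0 therefore takes the first whole core there, {0, 6}, while an 8-CPU request on socket 0 drains all six socket-0 CPUs and spills onto the first full core of socket 1, picking up {1, 7}.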
