Skip to content

Commit ac32644

Browse files
authored
Merge pull request kubernetes#87759 from klueska/upstream-move-cpu-allocation-to-pod-admit
Guarantee aligned resources across containers
2 parents d7e7136 + 2327934 commit ac32644

19 files changed

+279
-154
lines changed

pkg/kubelet/cm/container_manager.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
internalapi "k8s.io/cri-api/pkg/apis"
2626
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
2727
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
28-
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
2928
"k8s.io/kubernetes/pkg/kubelet/config"
3029
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
3130
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@@ -111,8 +110,8 @@ type ContainerManager interface {
111110
// due to node recreation.
112111
ShouldResetExtendedResourceCapacity() bool
113112

114-
// GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission
115-
GetTopologyPodAdmitHandler() topologymanager.Manager
113+
// GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
114+
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
116115

117116
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
118117
UpdateAllocatedDevices()

pkg/kubelet/cm/container_manager_linux.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -672,11 +672,51 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
672672
}
673673

674674
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
675-
return cm.deviceManager.Allocate(node, attrs)
675+
return cm.deviceManager.UpdatePluginResources(node, attrs)
676676
}
677677

678-
func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
679-
return cm.topologyManager
678+
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
679+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
680+
return cm.topologyManager
681+
}
682+
// TODO: we need to think about a better way to do this. This will work for
683+
// now so long as we have only the cpuManager and deviceManager relying on
684+
// allocations here. However, going forward it is not generalized enough to
685+
// work as we add more and more hint providers that the TopologyManager
686+
// needs to call Allocate() on (that may not be directly intstantiated
687+
// inside this component).
688+
return &resourceAllocator{cm.cpuManager, cm.deviceManager}
689+
}
690+
691+
type resourceAllocator struct {
692+
cpuManager cpumanager.Manager
693+
deviceManager devicemanager.Manager
694+
}
695+
696+
func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
697+
pod := attrs.Pod
698+
699+
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
700+
err := m.deviceManager.Allocate(pod, &container)
701+
if err != nil {
702+
return lifecycle.PodAdmitResult{
703+
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
704+
Reason: "UnexpectedAdmissionError",
705+
Admit: false,
706+
}
707+
}
708+
709+
err = m.cpuManager.Allocate(pod, &container)
710+
if err != nil {
711+
return lifecycle.PodAdmitResult{
712+
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
713+
Reason: "UnexpectedAdmissionError",
714+
Admit: false,
715+
}
716+
}
717+
}
718+
719+
return lifecycle.PodAdmitResult{Admit: true}
680720
}
681721

682722
func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {

pkg/kubelet/cm/container_manager_stub.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
117117
return cm.shouldResetExtendedResourceCapacity
118118
}
119119

120-
func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Manager {
121-
return nil
120+
func (cm *containerManagerStub) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
121+
return topologymanager.NewFakeManager()
122122
}
123123

124124
func (cm *containerManagerStub) UpdateAllocatedDevices() {

pkg/kubelet/cm/container_manager_windows.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
177177
return false
178178
}
179179

180-
func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
180+
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
181181
return nil
182182
}
183183

pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ type Manager interface {
5555
// Start is called during Kubelet initialization.
5656
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
5757

58+
// Called to trigger the allocation of CPUs to a container. This must be
59+
// called at some point prior to the AddContainer() call for a container,
60+
// e.g. at pod admission time.
61+
Allocate(pod *v1.Pod, container *v1.Container) error
62+
5863
// AddContainer is called between container create and container start
5964
// so that initial CPU affinity settings can be written through to the
6065
// container runtime before the first process begins to execute.
@@ -206,46 +211,41 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
206211
return nil
207212
}
208213

209-
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
214+
func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
210215
m.Lock()
211-
// Proactively remove CPUs from init containers that have already run.
212-
// They are guaranteed to have run to completion before any other
213-
// container is run.
214-
for _, initContainer := range p.Spec.InitContainers {
215-
if c.Name != initContainer.Name {
216-
err := m.policyRemoveContainerByRef(string(p.UID), initContainer.Name)
217-
if err != nil {
218-
klog.Warningf("[cpumanager] unable to remove init container (pod: %s, container: %s, error: %v)", string(p.UID), initContainer.Name, err)
219-
}
220-
}
221-
}
216+
defer m.Unlock()
222217

223218
// Call down into the policy to assign this container CPUs if required.
224-
err := m.policyAddContainer(p, c, containerID)
219+
err := m.policy.Allocate(m.state, p, c)
225220
if err != nil {
226-
klog.Errorf("[cpumanager] AddContainer error: %v", err)
227-
m.Unlock()
221+
klog.Errorf("[cpumanager] Allocate error: %v", err)
228222
return err
229223
}
230224

231-
// Get the CPUs just assigned to the container (or fall back to the default
232-
// CPUSet if none were assigned).
225+
return nil
226+
}
227+
228+
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
229+
m.Lock()
230+
// Get the CPUs assigned to the container during Allocate()
231+
// (or fall back to the default CPUSet if none were assigned).
233232
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
234233
m.Unlock()
235234

236235
if !cpus.IsEmpty() {
237-
err = m.updateContainerCPUSet(containerID, cpus)
236+
err := m.updateContainerCPUSet(containerID, cpus)
238237
if err != nil {
239238
klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
240239
m.Lock()
241-
err := m.policyRemoveContainerByID(containerID)
240+
err := m.policyRemoveContainerByRef(string(p.UID), c.Name)
242241
if err != nil {
243242
klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
244243
}
245244
m.Unlock()
246245
}
247246
return err
248247
}
248+
249249
klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty")
250250
return nil
251251
}
@@ -263,14 +263,6 @@ func (m *manager) RemoveContainer(containerID string) error {
263263
return nil
264264
}
265265

266-
func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
267-
err := m.policy.AddContainer(m.state, p, c)
268-
if err == nil {
269-
m.containerMap.Add(string(p.UID), c.Name, containerID)
270-
}
271-
return err
272-
}
273-
274266
func (m *manager) policyRemoveContainerByID(containerID string) error {
275267
podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
276268
if err != nil {

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

Lines changed: 64 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (p *mockPolicy) Start(s state.State) error {
104104
return p.err
105105
}
106106

107-
func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
107+
func (p *mockPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
108108
return p.err
109109
}
110110

@@ -223,34 +223,38 @@ func TestCPUManagerAdd(t *testing.T) {
223223
cpuset.NewCPUSet(),
224224
topologymanager.NewFakeManager())
225225
testCases := []struct {
226-
description string
227-
updateErr error
228-
policy Policy
229-
expCPUSet cpuset.CPUSet
230-
expErr error
226+
description string
227+
updateErr error
228+
policy Policy
229+
expCPUSet cpuset.CPUSet
230+
expAllocateErr error
231+
expAddContainerErr error
231232
}{
232233
{
233-
description: "cpu manager add - no error",
234-
updateErr: nil,
235-
policy: testPolicy,
236-
expCPUSet: cpuset.NewCPUSet(3, 4),
237-
expErr: nil,
234+
description: "cpu manager add - no error",
235+
updateErr: nil,
236+
policy: testPolicy,
237+
expCPUSet: cpuset.NewCPUSet(3, 4),
238+
expAllocateErr: nil,
239+
expAddContainerErr: nil,
238240
},
239241
{
240242
description: "cpu manager add - policy add container error",
241243
updateErr: nil,
242244
policy: &mockPolicy{
243245
err: fmt.Errorf("fake reg error"),
244246
},
245-
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
246-
expErr: fmt.Errorf("fake reg error"),
247+
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
248+
expAllocateErr: fmt.Errorf("fake reg error"),
249+
expAddContainerErr: nil,
247250
},
248251
{
249-
description: "cpu manager add - container update error",
250-
updateErr: fmt.Errorf("fake update error"),
251-
policy: testPolicy,
252-
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
253-
expErr: fmt.Errorf("fake update error"),
252+
description: "cpu manager add - container update error",
253+
updateErr: fmt.Errorf("fake update error"),
254+
policy: testPolicy,
255+
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
256+
expAllocateErr: nil,
257+
expAddContainerErr: fmt.Errorf("fake update error"),
254258
},
255259
}
256260

@@ -271,10 +275,16 @@ func TestCPUManagerAdd(t *testing.T) {
271275

272276
pod := makePod("fakePod", "fakeContainer", "2", "2")
273277
container := &pod.Spec.Containers[0]
274-
err := mgr.AddContainer(pod, container, "fakeID")
275-
if !reflect.DeepEqual(err, testCase.expErr) {
278+
err := mgr.Allocate(pod, container)
279+
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
280+
t.Errorf("CPU Manager Allocate() error (%v). expected error: %v but got: %v",
281+
testCase.description, testCase.expAllocateErr, err)
282+
}
283+
284+
err = mgr.AddContainer(pod, container, "fakeID")
285+
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
276286
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
277-
testCase.description, testCase.expErr, err)
287+
testCase.description, testCase.expAddContainerErr, err)
278288
}
279289
if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
280290
t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",
@@ -494,7 +504,12 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
494504
testCase.expCSets...)
495505

496506
for i := range containers {
497-
err := mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
507+
err := mgr.Allocate(testCase.pod, &containers[i])
508+
if err != nil {
509+
t.Errorf("StaticPolicy Allocate() error (%v). unexpected error for container id: %v: %v",
510+
testCase.description, containerIDs[i], err)
511+
}
512+
err = mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
498513
if err != nil {
499514
t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
500515
testCase.description, containerIDs[i], err)
@@ -970,25 +985,28 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
970985
cpuset.NewCPUSet(0),
971986
topologymanager.NewFakeManager())
972987
testCases := []struct {
973-
description string
974-
updateErr error
975-
policy Policy
976-
expCPUSet cpuset.CPUSet
977-
expErr error
988+
description string
989+
updateErr error
990+
policy Policy
991+
expCPUSet cpuset.CPUSet
992+
expAllocateErr error
993+
expAddContainerErr error
978994
}{
979995
{
980-
description: "cpu manager add - no error",
981-
updateErr: nil,
982-
policy: testPolicy,
983-
expCPUSet: cpuset.NewCPUSet(0, 3),
984-
expErr: nil,
996+
description: "cpu manager add - no error",
997+
updateErr: nil,
998+
policy: testPolicy,
999+
expCPUSet: cpuset.NewCPUSet(0, 3),
1000+
expAllocateErr: nil,
1001+
expAddContainerErr: nil,
9851002
},
9861003
{
987-
description: "cpu manager add - container update error",
988-
updateErr: fmt.Errorf("fake update error"),
989-
policy: testPolicy,
990-
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
991-
expErr: fmt.Errorf("fake update error"),
1004+
description: "cpu manager add - container update error",
1005+
updateErr: fmt.Errorf("fake update error"),
1006+
policy: testPolicy,
1007+
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
1008+
expAllocateErr: nil,
1009+
expAddContainerErr: fmt.Errorf("fake update error"),
9921010
},
9931011
}
9941012

@@ -1009,10 +1027,16 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
10091027

10101028
pod := makePod("fakePod", "fakeContainer", "2", "2")
10111029
container := &pod.Spec.Containers[0]
1012-
err := mgr.AddContainer(pod, container, "fakeID")
1013-
if !reflect.DeepEqual(err, testCase.expErr) {
1030+
err := mgr.Allocate(pod, container)
1031+
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
1032+
t.Errorf("CPU Manager Allocate() error (%v). expected error: %v but got: %v",
1033+
testCase.description, testCase.expAllocateErr, err)
1034+
}
1035+
1036+
err = mgr.AddContainer(pod, container, "fakeID")
1037+
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
10141038
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
1015-
testCase.description, testCase.expErr, err)
1039+
testCase.description, testCase.expAddContainerErr, err)
10161040
}
10171041
if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
10181042
t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",

pkg/kubelet/cm/cpumanager/fake_cpu_manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ func (m *fakeManager) Policy() Policy {
4040
return NewNonePolicy()
4141
}
4242

43+
func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
44+
klog.Infof("[fake cpumanager] Allocate (pod: %s, container: %s", pod.Name, container.Name)
45+
return nil
46+
}
47+
4348
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
4449
klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
4550
return nil

pkg/kubelet/cm/cpumanager/policy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626
type Policy interface {
2727
Name() string
2828
Start(s state.State) error
29-
// AddContainer call is idempotent
30-
AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error
29+
// Allocate call is idempotent
30+
Allocate(s state.State, pod *v1.Pod, container *v1.Container) error
3131
// RemoveContainer call is idempotent
3232
RemoveContainer(s state.State, podUID string, containerName string) error
3333
// GetTopologyHints implements the topologymanager.HintProvider Interface

pkg/kubelet/cm/cpumanager/policy_none.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (p *nonePolicy) Start(s state.State) error {
4444
return nil
4545
}
4646

47-
func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
47+
func (p *nonePolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
4848
return nil
4949
}
5050

pkg/kubelet/cm/cpumanager/policy_none_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestNonePolicyName(t *testing.T) {
3333
}
3434
}
3535

36-
func TestNonePolicyAdd(t *testing.T) {
36+
func TestNonePolicyAllocate(t *testing.T) {
3737
policy := &nonePolicy{}
3838

3939
st := &mockState{
@@ -44,9 +44,9 @@ func TestNonePolicyAdd(t *testing.T) {
4444
testPod := makePod("fakePod", "fakeContainer", "1000m", "1000m")
4545

4646
container := &testPod.Spec.Containers[0]
47-
err := policy.AddContainer(st, testPod, container)
47+
err := policy.Allocate(st, testPod, container)
4848
if err != nil {
49-
t.Errorf("NonePolicy AddContainer() error. expected no error but got: %v", err)
49+
t.Errorf("NonePolicy Allocate() error. expected no error but got: %v", err)
5050
}
5151
}
5252

0 commit comments

Comments
 (0)