
Commit 762a85e

Merge pull request kubernetes#125923 from haircommander/cpuset-fix-restart
kubelet/cm: fix bug where kubelet restarts from missing cpuset cgroup
2 parents: c45f3ab + b94c538

8 files changed, +126 −28 lines changed


pkg/kubelet/cm/cgroup_manager_linux.go

Lines changed: 3 additions & 0 deletions
@@ -298,6 +298,9 @@ func (m *cgroupCommon) toResources(resourceConfig *ResourceConfig) *libcontainer
 	if resourceConfig.PidsLimit != nil {
 		resources.PidsLimit = *resourceConfig.PidsLimit
 	}
+	if !resourceConfig.CPUSet.IsEmpty() {
+		resources.CpusetCpus = resourceConfig.CPUSet.String()
+	}
 
 	m.maybeSetHugetlb(resourceConfig, resources)
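
For context, the new branch above turns ResourceConfig.CPUSet into the cpuset string that libcontainer applies to the cgroup. A minimal standalone sketch of that rendering, using only k8s.io/utils/cpuset (the resources struct below is a simplified stand-in, not the libcontainer type):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// resources is a simplified stand-in for the libcontainer resources struct.
type resources struct {
	CpusetCpus string
}

func main() {
	cpus := cpuset.New(0, 1, 2, 3, 6)
	r := resources{}
	if !cpus.IsEmpty() {
		// String() renders the set in canonical cgroup list form, e.g. "0-3,6".
		r.CpusetCpus = cpus.String()
	}
	fmt.Println(r.CpusetCpus)
}

The resulting string is the value that ends up in the cgroup's cpuset.cpus interface file.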

pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 19 additions & 5 deletions
@@ -93,6 +93,10 @@ type Manager interface {
 	// GetCPUAffinity returns cpuset which includes cpus from shared pools
 	// as well as exclusively allocated cpus
 	GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
+
+	// GetAllCPUs returns all the CPUs known by cpumanager, as reported by the
+	// hardware discovery. Maps to the CPU capacity.
+	GetAllCPUs() cpuset.CPUSet
 }
 
 type manager struct {
@@ -136,7 +140,11 @@ type manager struct {
 	// stateFileDirectory holds the directory where the state file for checkpoints is held.
 	stateFileDirectory string
 
-	// allocatableCPUs is the set of online CPUs as reported by the system
+	// allCPUs is the set of online CPUs as reported by the system
+	allCPUs cpuset.CPUSet
+
+	// allocatableCPUs is the set of online CPUs as reported by the system,
+	// and available for allocation, minus the reserved set
 	allocatableCPUs cpuset.CPUSet
 
 	// pendingAdmissionPod contain the pod during the admission phase
@@ -156,6 +164,11 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
 	var policy Policy
 	var err error
 
+	topo, err = topology.Discover(machineInfo)
+	if err != nil {
+		return nil, err
+	}
+
 	switch policyName(cpuPolicyName) {
 
 	case PolicyNone:
@@ -165,10 +178,6 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
 		}
 
 	case PolicyStatic:
-		topo, err = topology.Discover(machineInfo)
-		if err != nil {
-			return nil, err
-		}
 		klog.InfoS("Detected CPU topology", "topology", topo)
 
 		reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
@@ -205,6 +214,7 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
 		topology:                   topo,
 		nodeAllocatableReservation: nodeAllocatableReservation,
 		stateFileDirectory:         stateFileDirectory,
+		allCPUs:                    topo.CPUDetails.CPUs(),
 	}
 	manager.sourcesReady = &sourcesReadyStub{}
 	return manager, nil
@@ -339,6 +349,10 @@ func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
 	return m.allocatableCPUs.Clone()
 }
 
+func (m *manager) GetAllCPUs() cpuset.CPUSet {
+	return m.allCPUs.Clone()
+}
+
 type reconciledContainer struct {
 	podName       string
 	containerName string
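
A small usage sketch (not part of this commit) contrasting the two accessors: with the kubelet's none policy nothing is exclusively allocatable, so GetAllocatableCPUs comes back empty while GetAllCPUs still reports the full discovered capacity. The noneStyleManager type below is an illustrative stub, not the real manager:

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// noneStyleManager mimics what the real manager reports under the "none"
// policy: full capacity, but an empty allocatable set.
type noneStyleManager struct{ all cpuset.CPUSet }

func (m noneStyleManager) GetAllCPUs() cpuset.CPUSet         { return m.all.Clone() }
func (m noneStyleManager) GetAllocatableCPUs() cpuset.CPUSet { return cpuset.CPUSet{} }

func main() {
	m := noneStyleManager{all: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7)}
	fmt.Println("capacity:   ", m.GetAllCPUs().String())         // "0-7"
	fmt.Println("allocatable:", m.GetAllocatableCPUs().String()) // ""
}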

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

Lines changed: 2 additions & 9 deletions
@@ -693,15 +693,8 @@ func TestCPUManagerGenerate(t *testing.T) {
 			if rawMgr.policy.Name() != testCase.expectedPolicy {
 				t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), testCase.expectedPolicy)
 			}
-			if rawMgr.policy.Name() == string(PolicyNone) {
-				if rawMgr.topology != nil {
-					t.Errorf("Expected topology to be nil for 'none' policy. Have: %q", rawMgr.topology)
-				}
-			}
-			if rawMgr.policy.Name() != string(PolicyNone) {
-				if rawMgr.topology == nil {
-					t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
-				}
+			if rawMgr.topology == nil {
+				t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
 			}
 		}
 	})

pkg/kubelet/cm/cpumanager/fake_cpu_manager.go

Lines changed: 5 additions & 0 deletions
@@ -85,6 +85,11 @@ func (m *fakeManager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
 	return cpuset.CPUSet{}
 }
 
+func (m *fakeManager) GetAllCPUs() cpuset.CPUSet {
+	klog.InfoS("GetAllCPUs")
+	return cpuset.CPUSet{}
+}
+
 // NewFakeManager creates empty/fake cpu manager
 func NewFakeManager() Manager {
 	return &fakeManager{

pkg/kubelet/cm/node_container_manager_linux.go

Lines changed: 35 additions & 13 deletions
@@ -53,7 +53,7 @@ func (cm *containerManagerImpl) createNodeAllocatableCgroups() error {
 	cgroupConfig := &CgroupConfig{
 		Name: cm.cgroupRoot,
 		// The default limits for cpu shares can be very low which can lead to CPU starvation for pods.
-		ResourceParameters: getCgroupConfig(nodeAllocatable, false),
+		ResourceParameters: cm.getCgroupConfig(nodeAllocatable, false),
 	}
 	if cm.cgroupManager.Exists(cgroupConfig.Name) {
 		return nil
@@ -81,7 +81,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 
 	cgroupConfig := &CgroupConfig{
 		Name:               cm.cgroupRoot,
-		ResourceParameters: getCgroupConfig(nodeAllocatable, false),
+		ResourceParameters: cm.getCgroupConfig(nodeAllocatable, false),
 	}
 
 	// Using ObjectReference for events as the node maybe not cached; refer to #42701 for detail.
@@ -110,7 +110,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 	// Now apply kube reserved and system reserved limits if required.
 	if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedEnforcementKey) {
 		klog.V(2).InfoS("Enforcing system reserved on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
-		if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved, false); err != nil {
+		if err := cm.enforceExistingCgroup(nc.SystemReservedCgroupName, nc.SystemReserved, false); err != nil {
 			message := fmt.Sprintf("Failed to enforce System Reserved Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
 			cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
 			return errors.New(message)
@@ -119,7 +119,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 	}
 	if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedEnforcementKey) {
 		klog.V(2).InfoS("Enforcing kube reserved on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
-		if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved, false); err != nil {
+		if err := cm.enforceExistingCgroup(nc.KubeReservedCgroupName, nc.KubeReserved, false); err != nil {
 			message := fmt.Sprintf("Failed to enforce Kube Reserved Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
 			cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
 			return errors.New(message)
@@ -129,7 +129,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 
 	if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedCompressibleEnforcementKey) {
 		klog.V(2).InfoS("Enforcing system reserved compressible on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
-		if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved, true); err != nil {
+		if err := cm.enforceExistingCgroup(nc.SystemReservedCgroupName, nc.SystemReserved, true); err != nil {
 			message := fmt.Sprintf("Failed to enforce System Reserved Compressible Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
 			cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
 			return errors.New(message)
@@ -139,7 +139,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 
 	if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedCompressibleEnforcementKey) {
 		klog.V(2).InfoS("Enforcing kube reserved compressible on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
-		if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved, true); err != nil {
+		if err := cm.enforceExistingCgroup(nc.KubeReservedCgroupName, nc.KubeReserved, true); err != nil {
 			message := fmt.Sprintf("Failed to enforce Kube Reserved Compressible Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
 			cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
 			return errors.New(message)
@@ -150,9 +150,9 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 }
 
 // enforceExistingCgroup updates the limits `rl` on existing cgroup `cName` using `cgroupManager` interface.
-func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.ResourceList, compressibleResources bool) error {
-	rp := getCgroupConfig(rl, compressibleResources)
-
+func (cm *containerManagerImpl) enforceExistingCgroup(cNameStr string, rl v1.ResourceList, compressibleResources bool) error {
+	cName := cm.cgroupManager.CgroupName(cNameStr)
+	rp := cm.getCgroupConfig(rl, compressibleResources)
 	if rp == nil {
 		return fmt.Errorf("%q cgroup is not configured properly", cName)
 	}
@@ -173,17 +173,40 @@ func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.
 		ResourceParameters: rp,
 	}
 	klog.V(4).InfoS("Enforcing limits on cgroup", "cgroupName", cName, "cpuShares", cgroupConfig.ResourceParameters.CPUShares, "memory", cgroupConfig.ResourceParameters.Memory, "pidsLimit", cgroupConfig.ResourceParameters.PidsLimit)
-	if err := cgroupManager.Validate(cgroupConfig.Name); err != nil {
+	if err := cm.cgroupManager.Validate(cgroupConfig.Name); err != nil {
 		return err
 	}
-	if err := cgroupManager.Update(cgroupConfig); err != nil {
+	if err := cm.cgroupManager.Update(cgroupConfig); err != nil {
 		return err
 	}
 	return nil
 }
 
 // getCgroupConfig returns a ResourceConfig object that can be used to create or update cgroups via CgroupManager interface.
-func getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
+func (cm *containerManagerImpl) getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
+	rc := getCgroupConfigInternal(rl, compressibleResourcesOnly)
+	if rc == nil {
+		return nil
+	}
+
+	// In the case of a None policy, cgroupv2 and systemd cgroup manager, we must make sure systemd is aware of the cpuset cgroup.
+	// By default, systemd will not create it, as we've not chosen to delegate it, and we haven't included it in the Apply() request.
+	// However, this causes a bug where kubelet restarts unnecessarily (cpuset cgroup is created in the cgroupfs, but systemd
+	// doesn't know about it and deletes it, and then kubelet doesn't continue because the cgroup isn't configured as expected).
+	// An alternative is to delegate the `cpuset` cgroup to the kubelet, but that would require some plumbing in libcontainer,
+	// and this is sufficient.
+	// Only do so on None policy, as Static policy will do its own updating of the cpuset.
+	// Please see the comment on policy none's GetAllocatableCPUs
+	if cm.cpuManager.GetAllocatableCPUs().IsEmpty() {
+		rc.CPUSet = cm.cpuManager.GetAllCPUs()
+	}
+
+	return rc
+}
+
+// getCgroupConfigInternal are the pieces of getCgroupConfig that don't require the cm object.
+// This is added to unit test without needing to create a full containerManager
+func getCgroupConfigInternal(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
 	// TODO(vishh): Set CPU Quota if necessary.
 	if rl == nil {
 		return nil
@@ -216,7 +239,6 @@ func getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *Resour
 		}
 		rc.HugePageLimit = HugePageLimits(rl)
 	}
-
 	return &rc
 }
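
The heart of the fix is the small rule layered on top of the pure helper: when the CPU manager reports an empty allocatable set (the none-policy case), the node-level config carries the full CPU set so the systemd-managed cpuset cgroup gets created and kept. A reduced sketch of that rule with stand-in types (resourceConfig below is not the kubelet's ResourceConfig):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// resourceConfig is a stand-in for the kubelet's ResourceConfig.
type resourceConfig struct {
	CPUSet cpuset.CPUSet
}

// applyNonePolicyCpuset mirrors the decision above: only when nothing is
// exclusively allocatable (none policy) is the full set written, because the
// static policy manages the cpuset hierarchy itself.
func applyNonePolicyCpuset(rc *resourceConfig, allocatable, all cpuset.CPUSet) {
	if allocatable.IsEmpty() {
		rc.CPUSet = all
	}
}

func main() {
	rc := &resourceConfig{}
	applyNonePolicyCpuset(rc, cpuset.New(), cpuset.New(0, 1, 2, 3))
	fmt.Println(rc.CPUSet.String()) // "0-3" under the none policy
}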

pkg/kubelet/cm/node_container_manager_linux_test.go

Lines changed: 1 addition & 1 deletion
@@ -458,7 +458,7 @@ func TestGetCgroupConfig(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			actual := getCgroupConfig(tc.resourceList, tc.compressibleResources)
+			actual := getCgroupConfigInternal(tc.resourceList, tc.compressibleResources)
 			tc.checks(actual, t)
 		})
 	}
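
The switch to getCgroupConfigInternal is what keeps this test free of containerManagerImpl plumbing: the mapping stays a pure function that can be called directly. A generic table-driven sketch of the same pattern, with hypothetical names (buildConfig is not a kubelet function):

package example

import "testing"

// buildConfig is a hypothetical pure helper in the spirit of getCgroupConfigInternal.
func buildConfig(memoryBytes int64) *int64 {
	if memoryBytes <= 0 {
		return nil
	}
	return &memoryBytes
}

func TestBuildConfig(t *testing.T) {
	cases := []struct {
		name     string
		memory   int64
		wantsNil bool
	}{
		{name: "positive memory", memory: 1 << 30, wantsNil: false},
		{name: "zero memory", memory: 0, wantsNil: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := buildConfig(tc.memory)
			if (got == nil) != tc.wantsNil {
				t.Errorf("buildConfig(%d) = %v, wants nil: %v", tc.memory, got, tc.wantsNil)
			}
		})
	}
}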

pkg/kubelet/cm/types.go

Lines changed: 3 additions & 0 deletions
@@ -19,12 +19,15 @@ package cm
 import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/cpuset"
 )
 
 // ResourceConfig holds information about all the supported cgroup resource parameters.
 type ResourceConfig struct {
 	// Memory limit (in bytes).
 	Memory *int64
+	// CPU set (number of CPUs the cgroup has access to).
+	CPUSet cpuset.CPUSet
 	// CPU shares (relative weight vs. other containers).
 	CPUShares *uint64
 	// CPU hardcap limit (in usecs). Allowed cpu time in a given period.

test/e2e_node/node_container_manager_test.go

Lines changed: 58 additions & 0 deletions
@@ -76,6 +76,64 @@ var _ = SIGDescribe("Node Container Manager", framework.WithSerial(), func() {
 			framework.ExpectNoError(runTest(ctx, f))
 		})
 	})
+	f.Describe("Validate CGroup management", func() {
+		// Regression test for https://issues.k8s.io/125923
+		// In this issue there's a race involved with systemd which seems to manifest most likely, or perhaps only
+		// (data gathered so far seems inconclusive) on the very first boot of the machine, so restarting the kubelet
+		// does not seem sufficient. OTOH, the exact reproducer seems to require a dedicated lane with only this test,
+		// or rebooting the machine before running this test. Both are practically unrealistic in CI.
+		// The closest approximation is this test in its current form, using a kubelet restart. This at least
+		// acts as non-regression testing, so it still brings value.
+		ginkgo.It("should correctly start with cpumanager none policy in use with systemd", func(ctx context.Context) {
+			if !IsCgroup2UnifiedMode() {
+				ginkgo.Skip("this test requires cgroups v2")
+			}
+
+			var err error
+			var oldCfg *kubeletconfig.KubeletConfiguration
+			// Get current kubelet configuration
+			oldCfg, err = getCurrentKubeletConfig(ctx)
+			framework.ExpectNoError(err)
+
+			ginkgo.DeferCleanup(func(ctx context.Context) {
+				if oldCfg != nil {
+					// Update the Kubelet configuration.
+					framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(oldCfg))
+
+					ginkgo.By("Restarting the kubelet")
+					restartKubelet(true)
+
+					// wait until the kubelet health check succeeds
+					gomega.Eventually(ctx, func(ctx context.Context) bool {
+						return kubeletHealthCheck(kubeletHealthCheckURL)
+					}).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrueBecause("expected kubelet to be in healthy state"))
+					ginkgo.By("Started the kubelet")
+				}
+			})
+
+			newCfg := oldCfg.DeepCopy()
+			// Change existing kubelet configuration
+			newCfg.CPUManagerPolicy = "none"
+			newCfg.CgroupDriver = "systemd"
+			newCfg.FailCgroupV1 = true // extra safety. We want to avoid false negatives though, so we added the skip check earlier
+
+			// Update the Kubelet configuration.
+			framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(newCfg))
+
+			ginkgo.By("Restarting the kubelet")
+			restartKubelet(true)
+
+			// wait until the kubelet health check succeeds
+			gomega.Eventually(ctx, func(ctx context.Context) bool {
+				return kubeletHealthCheck(kubeletHealthCheckURL)
+			}).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrueBecause("expected kubelet to be in healthy state"))
+			ginkgo.By("Started the kubelet")
+
+			gomega.Consistently(ctx, func(ctx context.Context) bool {
+				return getNodeReadyStatus(ctx, f) && kubeletHealthCheck(kubeletHealthCheckURL)
+			}).WithTimeout(2 * time.Minute).WithPolling(2 * time.Second).Should(gomega.BeTrueBecause("node keeps reporting ready status"))
+		})
+	})
 })
 
 func expectFileValToEqual(filePath string, expectedValue, delta int64) error {
