diff --git a/pkg/kubelet/cm/cgroup_manager_linux.go b/pkg/kubelet/cm/cgroup_manager_linux.go
index 11267c96db198..0a668a76895d6 100644
--- a/pkg/kubelet/cm/cgroup_manager_linux.go
+++ b/pkg/kubelet/cm/cgroup_manager_linux.go
@@ -383,6 +383,9 @@ func (m *cgroupManagerImpl) toResources(resourceConfig *ResourceConfig) *libcont
 	if resourceConfig.PidsLimit != nil {
 		resources.PidsLimit = *resourceConfig.PidsLimit
 	}
+	if !resourceConfig.CPUSet.IsEmpty() {
+		resources.CpusetCpus = resourceConfig.CPUSet.String()
+	}
 
 	m.maybeSetHugetlb(resourceConfig, resources)
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index d2ca5f5ade202..69b3690695e40 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -94,6 +94,10 @@ type Manager interface {
 	// GetCPUAffinity returns cpuset which includes cpus from shared pools
 	// as well as exclusively allocated cpus
 	GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
+
+	// GetAllCPUs returns all the CPUs known by cpumanager, as reported by the
+	// hardware discovery. Maps to the CPU capacity.
+	GetAllCPUs() cpuset.CPUSet
 }
 
 type manager struct {
@@ -137,7 +141,11 @@ type manager struct {
 	// stateFileDirectory holds the directory where the state file for checkpoints is held.
 	stateFileDirectory string
 
-	// allocatableCPUs is the set of online CPUs as reported by the system
+	// allCPUs is the set of online CPUs as reported by the system
+	allCPUs cpuset.CPUSet
+
+	// allocatableCPUs is the set of online CPUs as reported by the system,
+	// and available for allocation, minus the reserved set
 	allocatableCPUs cpuset.CPUSet
 
 	// pendingAdmissionPod contain the pod during the admission phase
@@ -157,6 +165,11 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
 	var policy Policy
 	var err error
 
+	topo, err = topology.Discover(machineInfo)
+	if err != nil {
+		return nil, err
+	}
+
 	switch policyName(cpuPolicyName) {
 
 	case PolicyNone:
@@ -166,10 +179,6 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
 		}
 
 	case PolicyStatic:
-		topo, err = topology.Discover(machineInfo)
-		if err != nil {
-			return nil, err
-		}
 		klog.InfoS("Detected CPU topology", "topology", topo)
 
 		reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
@@ -206,6 +215,7 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
 		topology:                   topo,
 		nodeAllocatableReservation: nodeAllocatableReservation,
 		stateFileDirectory:         stateFileDirectory,
+		allCPUs:                    topo.CPUDetails.CPUs(),
 	}
 	manager.sourcesReady = &sourcesReadyStub{}
 	return manager, nil
@@ -340,6 +350,10 @@ func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
 	return m.allocatableCPUs.Clone()
 }
 
+func (m *manager) GetAllCPUs() cpuset.CPUSet {
+	return m.allCPUs.Clone()
+}
+
 type reconciledContainer struct {
 	podName       string
 	containerName string
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index 250f1eb014a36..e60db6a00489c 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -645,15 +645,8 @@ func TestCPUManagerGenerate(t *testing.T) {
 			if rawMgr.policy.Name() != testCase.expectedPolicy {
 				t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), testCase.expectedPolicy)
 			}
-			if rawMgr.policy.Name() == string(PolicyNone) {
-				if rawMgr.topology != nil {
-					t.Errorf("Expected topology to be nil for 'none' policy. Have: %q", rawMgr.topology)
-				}
-			}
-			if rawMgr.policy.Name() != string(PolicyNone) {
-				if rawMgr.topology == nil {
-					t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
-				}
+			if rawMgr.topology == nil {
+				t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
 			}
 		}
 	})
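Note on the hunks above: ResourceConfig gains a CPUSet that toResources() copies into libcontainer's CpusetCpus (the string written to the cgroup's cpuset.cpus file), topology discovery moves out of the Static-only branch so every policy knows the machine's CPUs, and the new Manager.GetAllCPUs exposes that full set. A minimal standalone sketch of the cpuset rendering, assuming the k8s.io/utils/cpuset package (the cpuset package vendored in this tree is believed to expose the same API):

    package main

    import (
    	"fmt"

    	"k8s.io/utils/cpuset"
    )

    func main() {
    	// Hypothetical machine: CPUs 4-5 are offline, all others online.
    	all := cpuset.New(0, 1, 2, 3, 6, 7)
    	if !all.IsEmpty() {
    		// Mirrors the assignment toResources() now makes into
    		// libcontainer's resources.CpusetCpus.
    		fmt.Println(all.String()) // "0-3,6-7", the Linux cpuset list format
    	}
    }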
Have: %q", rawMgr.topology) - } - } - if rawMgr.policy.Name() != string(PolicyNone) { - if rawMgr.topology == nil { - t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology) - } + if rawMgr.topology == nil { + t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology) } } }) diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 933697051355e..9a0329db443bb 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -85,6 +85,11 @@ func (m *fakeManager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet return cpuset.CPUSet{} } +func (m *fakeManager) GetAllCPUs() cpuset.CPUSet { + klog.InfoS("GetAllCPUs") + return cpuset.CPUSet{} +} + // NewFakeManager creates empty/fake cpu manager func NewFakeManager() Manager { return &fakeManager{ diff --git a/pkg/kubelet/cm/node_container_manager_linux.go b/pkg/kubelet/cm/node_container_manager_linux.go index 74221c67047b4..d90279a01124d 100644 --- a/pkg/kubelet/cm/node_container_manager_linux.go +++ b/pkg/kubelet/cm/node_container_manager_linux.go @@ -52,7 +52,7 @@ func (cm *containerManagerImpl) createNodeAllocatableCgroups() error { cgroupConfig := &CgroupConfig{ Name: cm.cgroupRoot, // The default limits for cpu shares can be very low which can lead to CPU starvation for pods. - ResourceParameters: getCgroupConfig(nodeAllocatable), + ResourceParameters: cm.getCgroupConfig(nodeAllocatable), } if cm.cgroupManager.Exists(cgroupConfig.Name) { return nil @@ -80,7 +80,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error { cgroupConfig := &CgroupConfig{ Name: cm.cgroupRoot, - ResourceParameters: getCgroupConfig(nodeAllocatable), + ResourceParameters: cm.getCgroupConfig(nodeAllocatable), } // Using ObjectReference for events as the node maybe not cached; refer to #42701 for detail. @@ -114,7 +114,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error { // Now apply kube reserved and system reserved limits if required. 
diff --git a/pkg/kubelet/cm/node_container_manager_linux.go b/pkg/kubelet/cm/node_container_manager_linux.go
index 74221c67047b4..d90279a01124d 100644
--- a/pkg/kubelet/cm/node_container_manager_linux.go
+++ b/pkg/kubelet/cm/node_container_manager_linux.go
@@ -52,7 +52,7 @@ func (cm *containerManagerImpl) createNodeAllocatableCgroups() error {
 	cgroupConfig := &CgroupConfig{
 		Name: cm.cgroupRoot,
 		// The default limits for cpu shares can be very low which can lead to CPU starvation for pods.
-		ResourceParameters: getCgroupConfig(nodeAllocatable),
+		ResourceParameters: cm.getCgroupConfig(nodeAllocatable),
 	}
 	if cm.cgroupManager.Exists(cgroupConfig.Name) {
 		return nil
@@ -80,7 +80,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 
 	cgroupConfig := &CgroupConfig{
 		Name:               cm.cgroupRoot,
-		ResourceParameters: getCgroupConfig(nodeAllocatable),
+		ResourceParameters: cm.getCgroupConfig(nodeAllocatable),
 	}
 
 	// Using ObjectReference for events as the node maybe not cached; refer to #42701 for detail.
@@ -114,7 +114,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 	// Now apply kube reserved and system reserved limits if required.
 	if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedEnforcementKey) {
 		klog.V(2).InfoS("Enforcing system reserved on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
-		if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved); err != nil {
+		if err := cm.enforceExistingCgroup(nc.SystemReservedCgroupName, nc.SystemReserved); err != nil {
 			message := fmt.Sprintf("Failed to enforce System Reserved Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
 			cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
 			return fmt.Errorf(message)
@@ -123,7 +123,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 	}
 	if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedEnforcementKey) {
 		klog.V(2).InfoS("Enforcing kube reserved on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
-		if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved); err != nil {
+		if err := cm.enforceExistingCgroup(nc.KubeReservedCgroupName, nc.KubeReserved); err != nil {
 			message := fmt.Sprintf("Failed to enforce Kube Reserved Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
 			cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
 			return fmt.Errorf(message)
@@ -134,8 +134,9 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 }
 
 // enforceExistingCgroup updates the limits `rl` on existing cgroup `cName` using `cgroupManager` interface.
-func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.ResourceList) error {
-	rp := getCgroupConfig(rl)
+func (cm *containerManagerImpl) enforceExistingCgroup(cNameStr string, rl v1.ResourceList) error {
+	cName := cm.cgroupManager.CgroupName(cNameStr)
+	rp := cm.getCgroupConfig(rl)
 	if rp == nil {
 		return fmt.Errorf("%q cgroup is not configured properly", cName)
 	}
@@ -156,17 +157,17 @@ func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.
 		ResourceParameters: rp,
 	}
 	klog.V(4).InfoS("Enforcing limits on cgroup", "cgroupName", cName, "cpuShares", cgroupConfig.ResourceParameters.CPUShares, "memory", cgroupConfig.ResourceParameters.Memory, "pidsLimit", cgroupConfig.ResourceParameters.PidsLimit)
-	if err := cgroupManager.Validate(cgroupConfig.Name); err != nil {
+	if err := cm.cgroupManager.Validate(cgroupConfig.Name); err != nil {
 		return err
 	}
-	if err := cgroupManager.Update(cgroupConfig); err != nil {
+	if err := cm.cgroupManager.Update(cgroupConfig); err != nil {
 		return err
 	}
 	return nil
 }
 
 // getCgroupConfig returns a ResourceConfig object that can be used to create or update cgroups via CgroupManager interface.
-func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
+func (cm *containerManagerImpl) getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
 	// TODO(vishh): Set CPU Quota if necessary.
 	if rl == nil {
 		return nil
@@ -188,6 +189,18 @@ func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
 	}
 	rc.HugePageLimit = HugePageLimits(rl)
 
+	// In the case of a None policy, cgroupv2 and systemd cgroup manager, we must make sure systemd is aware of the cpuset cgroup.
+	// By default, systemd will not create it, as we've not chosen to delegate it, and we haven't included it in the Apply() request.
+	// However, this causes a bug where kubelet restarts unnecessarily (cpuset cgroup is created in the cgroupfs, but systemd
+	// doesn't know about it and deletes it, and then kubelet doesn't continue because the cgroup isn't configured as expected).
+	// An alternative is to delegate the `cpuset` cgroup to the kubelet, but that would require some plumbing in libcontainer,
+	// and this is sufficient.
+	// Only do so on None policy, as Static policy will do its own updating of the cpuset.
+	// Please see the comment on policy none's GetAllocatableCPUs
+	if cm.cpuManager.GetAllocatableCPUs().IsEmpty() {
+		rc.CPUSet = cm.cpuManager.GetAllCPUs()
+	}
+
 	return &rc
 }
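The behavioral core of the fix sits in the last hunk above: under the None policy the cpumanager reports an empty allocatable set, so emptiness of GetAllocatableCPUs() doubles as the policy check, and the node-allocatable cgroup is handed the full CPU capacity so that systemd materializes (and keeps) the cpuset controller. A simplified restatement with a hypothetical helper (the real code reaches the cpumanager through cm.cpuManager; this is a sketch, not the kubelet wiring):

    // cpuSetForNodeCgroup restates the guard in getCgroupConfig() above.
    func cpuSetForNodeCgroup(cpuMgr cpumanager.Manager) cpuset.CPUSet {
    	// Under the None policy GetAllocatableCPUs() returns the empty set,
    	// so emptiness doubles as the "no policy manages cpusets" signal.
    	if cpuMgr.GetAllocatableCPUs().IsEmpty() {
    		// Hand the node cgroup the full capacity so systemd creates the
    		// cpuset controller instead of deleting the one made via cgroupfs.
    		return cpuMgr.GetAllCPUs()
    	}
    	// Static policy updates cpusets itself; an empty set tells
    	// toResources() to leave cpuset.cpus untouched.
    	return cpuset.CPUSet{}
    }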
diff --git a/pkg/kubelet/cm/types.go b/pkg/kubelet/cm/types.go
index 01011900a6051..fe09e897d2b04 100644
--- a/pkg/kubelet/cm/types.go
+++ b/pkg/kubelet/cm/types.go
@@ -19,12 +19,15 @@ package cm
 import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 )
 
 // ResourceConfig holds information about all the supported cgroup resource parameters.
 type ResourceConfig struct {
 	// Memory limit (in bytes).
 	Memory *int64
+	// CPU set (the set of CPUs the cgroup has access to).
+	CPUSet cpuset.CPUSet
 	// CPU shares (relative weight vs. other containers).
 	CPUShares *uint64
 	// CPU hardcap limit (in usecs). Allowed cpu time in a given period.
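One design detail worth noting: unlike the surrounding pointer fields, CPUSet is a value type, and the zero value of cpuset.CPUSet is the empty set (assumed semantics, matching k8s.io/utils/cpuset). Existing ResourceConfig literals that never mention CPUSet therefore keep their behavior:

    var rc ResourceConfig            // CPUSet never set, stays the zero value
    fmt.Println(rc.CPUSet.IsEmpty()) // true: toResources() will skip cpuset.cpus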
diff --git a/test/e2e_node/node_container_manager_test.go b/test/e2e_node/node_container_manager_test.go
index ae57fa75e3025..9cad91d8b6363 100644
--- a/test/e2e_node/node_container_manager_test.go
+++ b/test/e2e_node/node_container_manager_test.go
@@ -71,6 +71,63 @@ var _ = SIGDescribe("Node Container Manager [Serial]", func() {
 			framework.ExpectNoError(runTest(ctx, f))
 		})
 	})
+	ginkgo.Context("Validate CGroup management", func() {
+		// Regression test for https://issues.k8s.io/125923
+		// In this issue there's a race involved with systemd which seems to manifest most likely, or perhaps only
+		// (the data gathered so far is inconclusive), on the very first boot of the machine, so restarting the kubelet
+		// does not seem sufficient. OTOH, the exact reproducer seems to require a dedicated lane with only this test, or
+		// rebooting the machine before running this test. Both are practically unrealistic in CI.
+		// The closest approximation is this test in its current form, using a kubelet restart. This at least
+		// acts as a non-regression test, so it still brings value.
+		ginkgo.It("should correctly start with cpumanager none policy in use with systemd", func(ctx context.Context) {
+			if !IsCgroup2UnifiedMode() {
+				ginkgo.Skip("this test requires cgroups v2")
+			}
+
+			var err error
+			var oldCfg *kubeletconfig.KubeletConfiguration
+			// Get the current kubelet configuration
+			oldCfg, err = getCurrentKubeletConfig(ctx)
+			framework.ExpectNoError(err)
+
+			ginkgo.DeferCleanup(func(ctx context.Context) {
+				if oldCfg != nil {
+					// Restore the original Kubelet configuration.
+					framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(oldCfg))
+
+					ginkgo.By("Restarting the kubelet")
+					restartKubelet(true)
+
+					// wait until the kubelet health check succeeds
+					gomega.Eventually(ctx, func(ctx context.Context) bool {
+						return kubeletHealthCheck(kubeletHealthCheckURL)
+					}).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrue())
+					ginkgo.By("Started the kubelet")
+				}
+			})
+
+			newCfg := oldCfg.DeepCopy()
+			// Change the existing kubelet configuration
+			newCfg.CPUManagerPolicy = "none"
+			newCfg.CgroupDriver = "systemd"
+
+			// Update the Kubelet configuration.
+			framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(newCfg))
+
+			ginkgo.By("Restarting the kubelet")
+			restartKubelet(true)
+
+			// wait until the kubelet health check succeeds
+			gomega.Eventually(ctx, func(ctx context.Context) bool {
+				return kubeletHealthCheck(kubeletHealthCheckURL)
+			}).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrue())
+			ginkgo.By("Started the kubelet")
+
+			gomega.Consistently(ctx, func(ctx context.Context) bool {
+				return getNodeReadyStatus(ctx, f) && kubeletHealthCheck(kubeletHealthCheckURL)
+			}).WithTimeout(2 * time.Minute).WithPolling(2 * time.Second).Should(gomega.BeTrue())
+		})
+	})
 })
 
 func expectFileValToEqual(filePath string, expectedValue, delta int64) error {
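For manual verification outside the e2e lane, a sketch like the following can confirm the fix on a cgroup v2, systemd node. The kubepods.slice path is an assumption; the actual cgroup name depends on the cgroup driver and --cgroup-root:

    package main

    import (
    	"fmt"
    	"os"
    )

    func main() {
    	// Before the fix, systemd could delete this cgroup shortly after the
    	// kubelet created it in cgroupfs, so this read would fail.
    	data, err := os.ReadFile("/sys/fs/cgroup/kubepods.slice/cpuset.cpus.effective")
    	if err != nil {
    		fmt.Println("cpuset cgroup missing:", err) // the pre-fix failure mode
    		return
    	}
    	fmt.Printf("kubepods cpuset: %s", data) // e.g. "0-7" on an 8-CPU machine
    }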