Commit 08e5781

Merge pull request kubernetes#84525 from klueska/upstream-fix-hint-generation-after-kubelet-restart
Fix bug in TopologyManager hint generation after kubelet restart
2 parents 9d708b0 + 4d4d4bd

File tree

7 files changed (+348, -34 lines changed)
pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 58 additions & 15 deletions

@@ -229,6 +229,8 @@ func (m *manager) State() state.Reader {
 }
 
 func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
+	// Garbage collect any stranded resources before providing TopologyHints
+	m.removeStaleState()
 	// Delegate to active policy
 	return m.policy.GetTopologyHints(m.state, pod, container)
 }
@@ -239,15 +241,68 @@ type reconciledContainer struct {
 	containerID string
 }
 
-func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
+func (m *manager) removeStaleState() {
+	// Only once all sources are ready do we attempt to remove any stale state.
+	// This ensures that the call to `m.activePods()` below will succeed with
+	// the actual active pods list.
 	if !m.sourcesReady.AllReady() {
 		return
 	}
+
+	// We grab the lock to ensure that no new containers will grab CPUs while
+	// executing the code below. Without this lock, it's possible that we end up
+	// removing state that is newly added by an asynchronous call to
+	// AddContainer() during the execution of this code.
+	m.Lock()
+	defer m.Unlock()
+
+	// We remove stale state very conservatively, only removing *any* state
+	// once we know for sure that we won't be accidentally removing state that
+	// is still valid. Since this function is called periodically, we will just
+	// try again next time this function is called.
+	activePods := m.activePods()
+	if len(activePods) == 0 {
+		// If there are no active pods, skip the removal of stale state.
+		return
+	}
+
+	// Build a list of containerIDs for all containers in all active Pods.
+	activeContainers := make(map[string]struct{})
+	for _, pod := range activePods {
+		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
+		if !ok {
+			// If even one pod does not have its status set, skip state removal.
+			return
+		}
+		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
+			containerID, err := findContainerIDByName(&pstatus, container.Name)
+			if err != nil {
+				// If even one container does not have its containerID set, skip state removal.
+				return
+			}
+			activeContainers[containerID] = struct{}{}
+		}
+	}
+
+	// Loop through the CPUManager state. Remove any state for containers not
+	// in the `activeContainers` list built above. The short circuits in place
+	// above ensure that no erroneous state will ever be removed.
+	for containerID := range m.state.GetCPUAssignments() {
+		if _, ok := activeContainers[containerID]; !ok {
+			klog.Errorf("[cpumanager] removeStaleState: removing container: %s", containerID)
+			err := m.policy.RemoveContainer(m.state, containerID)
+			if err != nil {
+				klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v", containerID, err)
+			}
+		}
+	}
+}
+
+func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
 	success = []reconciledContainer{}
 	failure = []reconciledContainer{}
 
-	activeContainers := make(map[string]*v1.Pod)
-
+	m.removeStaleState()
 	for _, pod := range m.activePods() {
 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)
@@ -286,8 +341,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 				}
 			}
 
-			activeContainers[containerID] = pod
-
 			cset := m.state.GetCPUSetOrDefault(containerID)
 			if cset.IsEmpty() {
 				// NOTE: This should not happen outside of tests.
@@ -306,16 +359,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 			success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
 		}
 	}
-
-	for containerID := range m.state.GetCPUAssignments() {
-		if pod, ok := activeContainers[containerID]; !ok {
-			err := m.RemoveContainer(containerID)
-			if err != nil {
-				klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
-				failure = append(failure, reconciledContainer{pod.Name, "", containerID})
-			}
-		}
-	}
 	return success, failure
 }
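removeStaleState above is deliberately conservative: it bails out whenever its view of the cluster is incomplete (pod sources not ready, no active pods, a missing pod status or container ID) and simply retries on the next periodic call, so valid state is never removed by mistake. A minimal sketch of that pattern, using a hypothetical store type rather than the kubelet's manager and state interfaces:

// stale_state_sketch.go: conservative garbage-collection pattern similar to
// removeStaleState. The store type is hypothetical; the point is the early
// returns on incomplete information and the lock held around the sweep.
package main

import (
	"fmt"
	"sync"
)

type store struct {
	sync.Mutex
	assignments map[string][]int // containerID -> assigned CPUs
}

// removeStale deletes assignments for containers not present in active, but
// only when the caller could resolve the full set of active container IDs.
func (s *store) removeStale(active map[string]struct{}, resolved bool) {
	if !resolved || len(active) == 0 {
		// Incomplete view of the active containers: do nothing, try again later.
		return
	}
	s.Lock()
	defer s.Unlock()
	for id := range s.assignments {
		if _, live := active[id]; !live {
			delete(s.assignments, id)
		}
	}
}

func main() {
	s := &store{assignments: map[string][]int{"stale": {0, 1}, "live": {2, 3}}}
	s.removeStale(map[string]struct{}{"live": {}}, true)
	fmt.Println(s.assignments) // map[live:[2 3]]
}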

pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 17 additions & 0 deletions

@@ -320,6 +320,23 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod v1.Pod, container v1.
 		return nil
 	}
 
+	// Short circuit to regenerate the same hints if there are already
+	// guaranteed CPUs allocated to the Container. This might happen after a
+	// kubelet restart, for example.
+	containerID, _ := findContainerIDByName(&pod.Status, container.Name)
+	if allocated, exists := s.GetCPUSet(containerID); exists {
+		if allocated.Size() != requested {
+			klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", string(pod.UID), container.Name, requested, allocated.Size())
+			return map[string][]topologymanager.TopologyHint{
+				string(v1.ResourceCPU): {},
+			}
+		}
+		klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", string(pod.UID), container.Name)
+		return map[string][]topologymanager.TopologyHint{
+			string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, requested),
+		}
+	}
+
 	// Get a list of available CPUs.
 	available := p.assignableCPUs(s)
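Worth noting are the three different return shapes used above. Assuming the usual TopologyManager conventions (which this diff relies on but does not define), a nil hint list means the resource has no topology preference, an empty list means no aligned placement is possible, and a populated list gives concrete hints to merge with those of other providers. A small illustrative sketch:

// hint_shapes_sketch.go: how the three hint-list shapes returned by the
// static policy are commonly interpreted. The semantics in the strings below
// are an assumption based on TopologyManager conventions, not something this
// diff itself defines.
package main

import "fmt"

type hint struct{ preferred bool }

func describe(hints []hint) string {
	switch {
	case hints == nil:
		return "nil: no topology preference for this resource"
	case len(hints) == 0:
		return "empty: no viable aligned placement (e.g. allocation/request mismatch)"
	default:
		return fmt.Sprintf("%d concrete hint(s) for the TopologyManager to merge", len(hints))
	}
}

func main() {
	fmt.Println(describe(nil))
	fmt.Println(describe([]hint{}))
	fmt.Println(describe([]hint{{preferred: true}}))
}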

pkg/kubelet/cm/cpumanager/policy_static_test.go

Lines changed: 14 additions & 0 deletions

@@ -213,6 +213,20 @@ func TestStaticPolicyAdd(t *testing.T) {
 			expCPUAlloc: true,
 			expCSet:     cpuset.NewCPUSet(1, 5),
 		},
+		{
+			description:     "GuPodMultipleCores, SingleSocketHT, ExpectSameAllocation",
+			topo:            topoSingleSocketHT,
+			numReservedCPUs: 1,
+			containerID:     "fakeID3",
+			stAssignments: state.ContainerCPUAssignments{
+				"fakeID3": cpuset.NewCPUSet(2, 3, 6, 7),
+			},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
+			pod:             makePod("4000m", "4000m"),
+			expErr:          nil,
+			expCPUAlloc:     true,
+			expCSet:         cpuset.NewCPUSet(2, 3, 6, 7),
+		},
 		{
 			description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocket",
 			topo:        topoDualSocketHT,

pkg/kubelet/cm/cpumanager/topology_hints_test.go

Lines changed: 87 additions & 1 deletion

@@ -23,6 +23,7 @@ import (
 
 	cadvisorapi "github.com/google/cadvisor/info/v1"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -72,6 +73,7 @@ func TestGetTopologyHints(t *testing.T) {
 		name          string
 		pod           v1.Pod
 		container     v1.Container
+		assignments   state.ContainerCPUAssignments
 		defaultCPUSet cpuset.CPUSet
 		expectedHints []topologymanager.TopologyHint
 	}{
@@ -142,6 +144,86 @@ func TestGetTopologyHints(t *testing.T) {
 				},
 			},
 		},
+		{
+			name:          "Request more CPUs than available",
+			pod:           *testPod2,
+			container:     *testContainer2,
+			defaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
+			expectedHints: nil,
+		},
+		{
+			name:      "Regenerate Single-Node NUMA Hints if already allocated 1/2",
+			pod:       *testPod1,
+			container: *testContainer1,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(0, 6),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{
+				{
+					NUMANodeAffinity: firstSocketMask,
+					Preferred:        true,
+				},
+				{
+					NUMANodeAffinity: crossSocketMask,
+					Preferred:        false,
+				},
+			},
+		},
+		{
+			name:      "Regenerate Single-Node NUMA Hints if already allocated 1/2",
+			pod:       *testPod1,
+			container: *testContainer1,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(3, 9),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{
+				{
+					NUMANodeAffinity: secondSocketMask,
+					Preferred:        true,
+				},
+				{
+					NUMANodeAffinity: crossSocketMask,
+					Preferred:        false,
+				},
+			},
+		},
+		{
+			name:      "Regenerate Cross-NUMA Hints if already allocated",
+			pod:       *testPod4,
+			container: *testContainer4,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{
+				{
+					NUMANodeAffinity: crossSocketMask,
+					Preferred:        true,
+				},
+			},
+		},
+		{
+			name:      "Requested less than already allocated",
+			pod:       *testPod1,
+			container: *testContainer1,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(0, 6, 3, 9),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{},
+		},
+		{
+			name:      "Requested more than already allocated",
+			pod:       *testPod4,
+			container: *testContainer4,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(0, 6, 3, 9),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{},
+		},
 	}
 	for _, tc := range tcases {
 		topology, _ := topology.Discover(&machineInfo, numaNodeInfo)
@@ -151,9 +233,13 @@ func TestGetTopologyHints(t *testing.T) {
 				topology: topology,
 			},
 			state: &mockState{
+				assignments:   tc.assignments,
 				defaultCPUSet: tc.defaultCPUSet,
 			},
-			topology: topology,
+			topology:          topology,
+			activePods:        func() []*v1.Pod { return nil },
+			podStatusProvider: mockPodStatusProvider{},
+			sourcesReady:      &sourcesReadyStub{},
 		}
 
 		hints := m.GetTopologyHints(tc.pod, tc.container)[string(v1.ResourceCPU)]

pkg/kubelet/cm/devicemanager/BUILD

Lines changed: 1 addition & 0 deletions

@@ -60,6 +60,7 @@ go_test(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

pkg/kubelet/cm/devicemanager/topology_hints.go

Lines changed: 24 additions & 3 deletions

@@ -28,26 +28,49 @@ import (
 // ensures the Device Manager is consulted when Topology Aware Hints for each
 // container are created.
 func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
-	deviceHints := make(map[string][]topologymanager.TopologyHint)
+	// Garbage collect any stranded device resources before providing TopologyHints
+	m.updateAllocatedDevices(m.activePods())
 
+	// Loop through all device resources and generate TopologyHints for them.
+	deviceHints := make(map[string][]topologymanager.TopologyHint)
 	for resourceObj, requestedObj := range container.Resources.Limits {
 		resource := string(resourceObj)
 		requested := int(requestedObj.Value())
 
+		// Only consider resources associated with a device plugin.
 		if m.isDevicePluginResource(resource) {
+			// Only consider devices that actually contain topology information.
 			if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
 				klog.Infof("[devicemanager] Resource '%v' does not have a topology preference", resource)
 				deviceHints[resource] = nil
 				continue
 			}
 
+			// Short circuit to regenerate the same hints if there are already
+			// devices allocated to the Container. This might happen after a
+			// kubelet restart, for example.
+			allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
+			if allocated.Len() > 0 {
+				if allocated.Len() != requested {
+					klog.Errorf("[devicemanager] Resource '%v' already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", resource, string(pod.UID), container.Name, requested, allocated.Len())
+					deviceHints[resource] = []topologymanager.TopologyHint{}
+					continue
+				}
+				klog.Infof("[devicemanager] Regenerating TopologyHints for resource '%v' already allocated to (pod %v, container %v)", resource, string(pod.UID), container.Name)
+				deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, requested)
+				continue
+			}
+
+			// Get the list of available devices, for which TopologyHints should be generated.
 			available := m.getAvailableDevices(resource)
 			if available.Len() < requested {
 				klog.Errorf("[devicemanager] Unable to generate topology hints: requested number of devices unavailable for '%s': requested: %d, available: %d", resource, requested, available.Len())
 				deviceHints[resource] = []topologymanager.TopologyHint{}
 				continue
 			}
 
+			// Generate TopologyHints for this resource given the current
+			// request size and the list of available devices.
 			deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, requested)
 		}
 	}
@@ -66,8 +89,6 @@ func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
 }
 
 func (m *ManagerImpl) getAvailableDevices(resource string) sets.String {
-	// Gets Devices in use.
-	m.updateAllocatedDevices(m.activePods())
 	// Strip all devices in use from the list of healthy ones.
 	return m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
 }
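The device manager change mirrors the CPU manager one: updateAllocatedDevices now runs once at the top of GetTopologyHints instead of inside getAvailableDevices, and availability then reduces to the set difference of healthy and allocated devices. A tiny sketch of that difference step, using plain maps as stand-ins for sets.String:

// available_devices_sketch.go: the "healthy minus allocated" step performed
// by getAvailableDevices once stale allocations have been garbage collected
// up front. Plain maps stand in for k8s.io/apimachinery's sets.String.
package main

import "fmt"

// difference returns the members of healthy that are not in allocated.
func difference(healthy, allocated map[string]struct{}) map[string]struct{} {
	out := make(map[string]struct{})
	for dev := range healthy {
		if _, used := allocated[dev]; !used {
			out[dev] = struct{}{}
		}
	}
	return out
}

func main() {
	healthy := map[string]struct{}{"dev0": {}, "dev1": {}, "dev2": {}}
	allocated := map[string]struct{}{"dev1": {}}
	fmt.Println(len(difference(healthy, allocated)), "devices available") // 2 devices available
}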
