Skip to content

Commit a339a36

Browse files
authored
Merge pull request kubernetes#127506 from ffromani/cpu-pool-size-metrics
node: metrics: add metrics about cpu pool sizes
2 parents f087575 + 14ec0ed commit a339a36

File tree

11 files changed

+179
-232
lines changed

11 files changed

+179
-232
lines changed

pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,6 @@ type manager struct {
146146
// allocatableCPUs is the set of online CPUs as reported by the system,
147147
// and available for allocation, minus the reserved set
148148
allocatableCPUs cpuset.CPUSet
149-
150-
// pendingAdmissionPod contain the pod during the admission phase
151-
pendingAdmissionPod *v1.Pod
152149
}
153150

154151
var _ Manager = &manager{}
@@ -254,10 +251,6 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
254251
}
255252

256253
func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
257-
// The pod is during the admission phase. We need to save the pod to avoid it
258-
// being cleaned before the admission ended
259-
m.setPodPendingAdmission(p)
260-
261254
// Garbage collect any stranded resources before allocating CPUs.
262255
m.removeStaleState()
263256

@@ -326,19 +319,13 @@ func (m *manager) State() state.Reader {
326319
}
327320

328321
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
329-
// The pod is during the admission phase. We need to save the pod to avoid it
330-
// being cleaned before the admission ended
331-
m.setPodPendingAdmission(pod)
332322
// Garbage collect any stranded resources before providing TopologyHints
333323
m.removeStaleState()
334324
// Delegate to active policy
335325
return m.policy.GetTopologyHints(m.state, pod, container)
336326
}
337327

338328
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
339-
// The pod is during the admission phase. We need to save the pod to avoid it
340-
// being cleaned before the admission ended
341-
m.setPodPendingAdmission(pod)
342329
// Garbage collect any stranded resources before providing TopologyHints
343330
m.removeStaleState()
344331
// Delegate to active policy
@@ -375,14 +362,11 @@ func (m *manager) removeStaleState() {
375362
defer m.Unlock()
376363

377364
// Get the list of active pods.
378-
activeAndAdmittedPods := m.activePods()
379-
if m.pendingAdmissionPod != nil {
380-
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
381-
}
365+
activePods := m.activePods()
382366

383367
// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
384368
activeContainers := make(map[string]map[string]struct{})
385-
for _, pod := range activeAndAdmittedPods {
369+
for _, pod := range activePods {
386370
activeContainers[string(pod.UID)] = make(map[string]struct{})
387371
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
388372
activeContainers[string(pod.UID)][container.Name] = struct{}{}
@@ -554,10 +538,3 @@ func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet {
554538
func (m *manager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet {
555539
return m.state.GetCPUSetOrDefault(podUID, containerName)
556540
}
557-
558-
func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
559-
m.Lock()
560-
defer m.Unlock()
561-
562-
m.pendingAdmissionPod = pod
563-
}

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func TestCPUManagerAdd(t *testing.T) {
325325

326326
pod := makePod("fakePod", "fakeContainer", "2", "2")
327327
container := &pod.Spec.Containers[0]
328-
mgr.activePods = func() []*v1.Pod { return nil }
328+
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
329329

330330
err := mgr.Allocate(pod, container)
331331
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
@@ -1321,7 +1321,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
13211321

13221322
pod := makePod("fakePod", "fakeContainer", "2", "2")
13231323
container := &pod.Spec.Containers[0]
1324-
mgr.activePods = func() []*v1.Pod { return nil }
1324+
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
13251325

13261326
err := mgr.Allocate(pod, container)
13271327
if !reflect.DeepEqual(err, testCase.expAllocateErr) {

pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ func (p *staticPolicy) Start(s state.State) error {
194194
klog.ErrorS(err, "Static policy invalid state, please drain node and remove policy state file")
195195
return err
196196
}
197+
p.initializeMetrics(s)
197198
return nil
198199
}
199200

@@ -370,8 +371,10 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
370371
klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
371372
return err
372373
}
374+
373375
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
374376
p.updateCPUsToReuse(pod, container, cpuset)
377+
p.updateMetricsOnAllocate(cpuset)
375378

376379
return nil
377380
}
@@ -397,6 +400,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
397400
// Mutate the shared pool, adding released cpus.
398401
toRelease = toRelease.Difference(cpusInUse)
399402
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
403+
p.updateMetricsOnRelease(toRelease)
400404
}
401405
return nil
402406
}
@@ -720,3 +724,30 @@ func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableC
720724

721725
return alignedCPUs
722726
}
727+
728+
func (p *staticPolicy) initializeMetrics(s state.State) {
729+
metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000))
730+
metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s)))
731+
}
732+
733+
func (p *staticPolicy) updateMetricsOnAllocate(cset cpuset.CPUSet) {
734+
ncpus := cset.Size()
735+
metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus))
736+
metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000))
737+
}
738+
739+
func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) {
740+
ncpus := cset.Size()
741+
metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(-ncpus))
742+
metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(ncpus * 1000))
743+
}
744+
745+
func countExclusiveCPUs(s state.State) int {
746+
exclusiveCPUs := 0
747+
for _, cpuAssign := range s.GetCPUAssignments() {
748+
for _, cset := range cpuAssign {
749+
exclusiveCPUs += cset.Size()
750+
}
751+
}
752+
return exclusiveCPUs
753+
}

pkg/kubelet/cm/cpumanager/topology_hints_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,6 @@ func TestGetTopologyHints(t *testing.T) {
245245
if len(tc.expectedHints) == 0 && len(hints) == 0 {
246246
continue
247247
}
248-
249-
if m.pendingAdmissionPod == nil {
250-
t.Errorf("The pendingAdmissionPod should point to the current pod after the call to GetTopologyHints()")
251-
}
252-
253248
sort.SliceStable(hints, func(i, j int) bool {
254249
return hints[i].LessThan(hints[j])
255250
})
@@ -298,7 +293,6 @@ func TestGetPodTopologyHints(t *testing.T) {
298293
if len(tc.expectedHints) == 0 && len(podHints) == 0 {
299294
continue
300295
}
301-
302296
sort.SliceStable(podHints, func(i, j int) bool {
303297
return podHints[i].LessThan(podHints[j])
304298
})

pkg/kubelet/cm/devicemanager/manager.go

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,6 @@ type ManagerImpl struct {
101101
// init containers.
102102
devicesToReuse PodReusableDevices
103103

104-
// pendingAdmissionPod contain the pod during the admission phase
105-
pendingAdmissionPod *v1.Pod
106-
107104
// containerMap provides a mapping from (pod, container) -> containerID
108105
// for all containers in a pod. Used to detect pods running across a restart
109106
containerMap containermap.ContainerMap
@@ -364,10 +361,6 @@ func (m *ManagerImpl) Stop() error {
364361
// Allocate is the call that you can use to allocate a set of devices
365362
// from the registered device plugins.
366363
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
367-
// The pod is during the admission phase. We need to save the pod to avoid it
368-
// being cleaned before the admission ended
369-
m.setPodPendingAdmission(pod)
370-
371364
if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
372365
m.devicesToReuse[string(pod.UID)] = make(map[string]sets.Set[string])
373366
}
@@ -548,20 +541,14 @@ func (m *ManagerImpl) getCheckpoint() (checkpoint.DeviceManagerCheckpoint, error
548541

549542
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
550543
func (m *ManagerImpl) UpdateAllocatedDevices() {
544+
activePods := m.activePods()
551545
if !m.sourcesReady.AllReady() {
552546
return
553547
}
554-
555548
m.mutex.Lock()
556549
defer m.mutex.Unlock()
557-
558-
activeAndAdmittedPods := m.activePods()
559-
if m.pendingAdmissionPod != nil {
560-
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
561-
}
562-
563550
podsToBeRemoved := m.podDevices.pods()
564-
for _, pod := range activeAndAdmittedPods {
551+
for _, pod := range activePods {
565552
podsToBeRemoved.Delete(string(pod.UID))
566553
}
567554
if len(podsToBeRemoved) <= 0 {
@@ -1171,13 +1158,6 @@ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
11711158
return len(checkpoints) == 0
11721159
}
11731160

1174-
func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
1175-
m.mutex.Lock()
1176-
defer m.mutex.Unlock()
1177-
1178-
m.pendingAdmissionPod = pod
1179-
}
1180-
11811161
func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
11821162
cntID, err := m.containerMap.GetContainerID(podUID, cntName)
11831163
if err != nil {

pkg/kubelet/cm/devicemanager/topology_hints.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,6 @@ import (
3131
// ensures the Device Manager is consulted when Topology Aware Hints for each
3232
// container are created.
3333
func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
34-
// The pod is during the admission phase. We need to save the pod to avoid it
35-
// being cleaned before the admission ended
36-
m.setPodPendingAdmission(pod)
37-
3834
// Garbage collect any stranded device resources before providing TopologyHints
3935
m.UpdateAllocatedDevices()
4036

@@ -87,10 +83,6 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
8783
// GetPodTopologyHints implements the topologymanager.HintProvider Interface which
8884
// ensures the Device Manager is consulted when Topology Aware Hints for Pod are created.
8985
func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
90-
// The pod is during the admission phase. We need to save the pod to avoid it
91-
// being cleaned before the admission ended
92-
m.setPodPendingAdmission(pod)
93-
9486
// Garbage collect any stranded device resources before providing TopologyHints
9587
m.UpdateAllocatedDevices()
9688

pkg/kubelet/cm/memorymanager/memory_manager.go

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,6 @@ type manager struct {
126126

127127
// allocatableMemory holds the allocatable memory for each NUMA node
128128
allocatableMemory []state.Block
129-
130-
// pendingAdmissionPod contain the pod during the admission phase
131-
pendingAdmissionPod *v1.Pod
132129
}
133130

134131
var _ Manager = &manager{}
@@ -242,10 +239,6 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.
242239

243240
// Allocate is called to pre-allocate memory resources during Pod admission.
244241
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
245-
// The pod is during the admission phase. We need to save the pod to avoid it
246-
// being cleaned before the admission ended
247-
m.setPodPendingAdmission(pod)
248-
249242
// Garbage collect any stranded resources before allocation
250243
m.removeStaleState()
251244

@@ -284,10 +277,6 @@ func (m *manager) State() state.Reader {
284277

285278
// GetPodTopologyHints returns the topology hints for the topology manager
286279
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
287-
// The pod is during the admission phase. We need to save the pod to avoid it
288-
// being cleaned before the admission ended
289-
m.setPodPendingAdmission(pod)
290-
291280
// Garbage collect any stranded resources before providing TopologyHints
292281
m.removeStaleState()
293282
// Delegate to active policy
@@ -296,10 +285,6 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.
296285

297286
// GetTopologyHints returns the topology hints for the topology manager
298287
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
299-
// The pod is during the admission phase. We need to save the pod to avoid it
300-
// being cleaned before the admission ended
301-
m.setPodPendingAdmission(pod)
302-
303288
// Garbage collect any stranded resources before providing TopologyHints
304289
m.removeStaleState()
305290
// Delegate to active policy
@@ -322,15 +307,12 @@ func (m *manager) removeStaleState() {
322307
m.Lock()
323308
defer m.Unlock()
324309

325-
// Get the list of admitted and active pods.
326-
activeAndAdmittedPods := m.activePods()
327-
if m.pendingAdmissionPod != nil {
328-
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
329-
}
310+
// Get the list of active pods.
311+
activePods := m.activePods()
330312

331313
// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
332314
activeContainers := make(map[string]map[string]struct{})
333-
for _, pod := range activeAndAdmittedPods {
315+
for _, pod := range activePods {
334316
activeContainers[string(pod.UID)] = make(map[string]struct{})
335317
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
336318
activeContainers[string(pod.UID)][container.Name] = struct{}{}
@@ -464,10 +446,3 @@ func (m *manager) GetAllocatableMemory() []state.Block {
464446
func (m *manager) GetMemory(podUID, containerName string) []state.Block {
465447
return m.state.GetMemoryBlocks(podUID, containerName)
466448
}
467-
468-
func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
469-
m.Lock()
470-
defer m.Unlock()
471-
472-
m.pendingAdmissionPod = pod
473-
}

0 commit comments

Comments
 (0)