Skip to content

Commit 83d33a9

Browse files
authored
Merge pull request kubernetes#130133 from ffromani/split-uncore-metrics
node: cpumgr: metrics: add uncore cache alignment metrics
2 parents 6ef1a1f + 5c17e7b commit 83d33a9

File tree

8 files changed

+412
-51
lines changed

8 files changed

+412
-51
lines changed

pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
330330
}
331331
return
332332
}
333+
// TODO: move into updateMetricsOnAllocate
333334
if p.options.FullPhysicalCPUsOnly {
334335
// increment only if we know we allocate aligned resources
335336
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Inc()
@@ -369,8 +370,8 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
369370
}
370371
}
371372
}
372-
if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
373-
p.updateCPUsToReuse(pod, container, cpuset)
373+
if cset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
374+
p.updateCPUsToReuse(pod, container, cset)
374375
klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
375376
return nil
376377
}
@@ -380,17 +381,17 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
380381
klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
381382

382383
// Allocate CPUs according to the NUMA affinity contained in the hint.
383-
cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
384+
cpuAllocation, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
384385
if err != nil {
385386
klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
386387
return err
387388
}
388389

389-
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
390-
p.updateCPUsToReuse(pod, container, cpuset)
391-
p.updateMetricsOnAllocate(cpuset)
390+
s.SetCPUSet(string(pod.UID), container.Name, cpuAllocation.CPUs)
391+
p.updateCPUsToReuse(pod, container, cpuAllocation.CPUs)
392+
p.updateMetricsOnAllocate(cpuAllocation)
392393

393-
klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuset)
394+
klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuAllocation.CPUs.String())
394395
return nil
395396
}
396397

@@ -420,13 +421,13 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
420421
return nil
421422
}
422423

423-
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
424+
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (topology.Allocation, error) {
424425
klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)
425426

426427
allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs)
427428

428429
// If there are aligned CPUs in numaAffinity, attempt to take those first.
429-
result := cpuset.New()
430+
result := topology.EmptyAllocation()
430431
if numaAffinity != nil {
431432
alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs)
432433

@@ -435,25 +436,26 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
435436
numAlignedToAlloc = numCPUs
436437
}
437438

438-
alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
439+
allocatedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
439440
if err != nil {
440-
return cpuset.New(), err
441+
return topology.EmptyAllocation(), err
441442
}
442443

443-
result = result.Union(alignedCPUs)
444+
result.CPUs = result.CPUs.Union(allocatedCPUs)
444445
}
445446

446447
// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
447-
remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
448+
remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result.CPUs), numCPUs-result.CPUs.Size())
448449
if err != nil {
449-
return cpuset.New(), err
450+
return topology.EmptyAllocation(), err
450451
}
451-
result = result.Union(remainingCPUs)
452+
result.CPUs = result.CPUs.Union(remainingCPUs)
453+
result.Aligned = p.topology.CheckAlignment(result.CPUs)
452454

453455
// Remove allocated CPUs from the shared CPUSet.
454-
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))
456+
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result.CPUs))
455457

456-
klog.InfoS("AllocateCPUs", "result", result)
458+
klog.InfoS("AllocateCPUs", "result", result.String())
457459
return result, nil
458460
}
459461

@@ -755,12 +757,17 @@ func (p *staticPolicy) initializeMetrics(s state.State) {
755757
metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000))
756758
metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s)))
757759
metrics.ContainerAlignedComputeResourcesFailure.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists
760+
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists
761+
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Add(0) // ensure the value exists
758762
}
759763

760-
func (p *staticPolicy) updateMetricsOnAllocate(cset cpuset.CPUSet) {
761-
ncpus := cset.Size()
764+
func (p *staticPolicy) updateMetricsOnAllocate(cpuAlloc topology.Allocation) {
765+
ncpus := cpuAlloc.CPUs.Size()
762766
metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus))
763767
metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000))
768+
if cpuAlloc.Aligned.UncoreCache {
769+
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Inc()
770+
}
764771
}
765772

766773
func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) {

pkg/kubelet/cm/cpumanager/policy_static_test.go

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest {
7070
}
7171

7272
func TestStaticPolicyName(t *testing.T) {
73-
policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil)
73+
policy, err := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil)
74+
if err != nil {
75+
t.Fatalf("NewStaticPolicy() failed: %v", err)
76+
}
7477

7578
policyName := policy.Name()
7679
if policyName != "static" {
@@ -168,13 +171,16 @@ func TestStaticPolicyStart(t *testing.T) {
168171
for _, testCase := range testCases {
169172
t.Run(testCase.description, func(t *testing.T) {
170173
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)
171-
p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options)
174+
p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options)
175+
if err != nil {
176+
t.Fatalf("NewStaticPolicy() failed: %v", err)
177+
}
172178
policy := p.(*staticPolicy)
173179
st := &mockState{
174180
assignments: testCase.stAssignments,
175181
defaultCPUSet: testCase.stDefaultCPUSet,
176182
}
177-
err := policy.Start(st)
183+
err = policy.Start(st)
178184
if !reflect.DeepEqual(err, testCase.expErr) {
179185
t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
180186
testCase.description, testCase.expErr, err)
@@ -637,15 +643,18 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
637643
if testCase.reservedCPUs != nil {
638644
cpus = testCase.reservedCPUs.Clone()
639645
}
640-
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options)
646+
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options)
647+
if err != nil {
648+
t.Fatalf("NewStaticPolicy() failed: %v", err)
649+
}
641650

642651
st := &mockState{
643652
assignments: testCase.stAssignments,
644653
defaultCPUSet: testCase.stDefaultCPUSet,
645654
}
646655

647656
container := &testCase.pod.Spec.Containers[0]
648-
err := policy.Allocate(st, testCase.pod, container)
657+
err = policy.Allocate(st, testCase.pod, container)
649658
if !reflect.DeepEqual(err, testCase.expErr) {
650659
t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %q but got: %q",
651660
testCase.description, testCase.expErr, err)
@@ -658,13 +667,13 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
658667
testCase.description, container.Name, st.assignments)
659668
}
660669

661-
if !reflect.DeepEqual(cset, testCase.expCSet) {
662-
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
670+
if !cset.Equals(testCase.expCSet) {
671+
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s but got %s",
663672
testCase.description, testCase.expCSet, cset)
664673
}
665674

666675
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
667-
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
676+
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s to be disjoint from the shared cpuset %s",
668677
testCase.description, cset, st.defaultCPUSet)
669678
}
670679
}
@@ -708,7 +717,10 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
708717
}
709718

710719
for _, testCase := range testCases {
711-
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
720+
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
721+
if err != nil {
722+
t.Fatalf("NewStaticPolicy() failed: %v", err)
723+
}
712724

713725
st := &mockState{
714726
assignments: testCase.stAssignments,
@@ -720,16 +732,16 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
720732
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
721733
policy.Allocate(st, pod, &container)
722734
}
723-
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) {
724-
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
735+
if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) {
736+
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %s but got %s",
725737
testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
726738
}
727739

728740
// remove
729741
policy.RemoveContainer(st, string(pod.UID), testCase.containerName)
730742

731-
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterRemove) {
732-
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
743+
if !st.defaultCPUSet.Equals(testCase.expCSetAfterRemove) {
744+
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %s but got %s",
733745
testCase.description, testCase.expCSetAfterRemove, st.defaultCPUSet)
734746
}
735747
if _, found := st.assignments[string(pod.UID)][testCase.containerName]; found {
@@ -761,7 +773,10 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
761773
}
762774

763775
for _, testCase := range testCases {
764-
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
776+
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
777+
if err != nil {
778+
t.Fatalf("NewStaticPolicy() failed: %v", err)
779+
}
765780

766781
st := &mockState{
767782
assignments: testCase.stAssignments,
@@ -777,8 +792,8 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
777792
testCase.description, err)
778793
}
779794
}
780-
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) {
781-
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
795+
if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) {
796+
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %s but got %s",
782797
testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
783798
}
784799
}
@@ -843,7 +858,10 @@ func TestStaticPolicyRemove(t *testing.T) {
843858
}
844859

845860
for _, testCase := range testCases {
846-
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
861+
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
862+
if err != nil {
863+
t.Fatalf("NewStaticPolicy() failed: %v", err)
864+
}
847865

848866
st := &mockState{
849867
assignments: testCase.stAssignments,
@@ -852,8 +870,8 @@ func TestStaticPolicyRemove(t *testing.T) {
852870

853871
policy.RemoveContainer(st, testCase.podUID, testCase.containerName)
854872

855-
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSet) {
856-
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
873+
if !st.defaultCPUSet.Equals(testCase.expCSet) {
874+
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %s but got %s",
857875
testCase.description, testCase.expCSet, st.defaultCPUSet)
858876
}
859877

@@ -933,28 +951,31 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
933951
},
934952
}
935953
for _, tc := range testCases {
936-
p, _ := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil)
954+
p, err := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil)
955+
if err != nil {
956+
t.Fatalf("NewStaticPolicy() failed: %v", err)
957+
}
937958
policy := p.(*staticPolicy)
938959
st := &mockState{
939960
assignments: tc.stAssignments,
940961
defaultCPUSet: tc.stDefaultCPUSet,
941962
}
942-
err := policy.Start(st)
963+
err = policy.Start(st)
943964
if err != nil {
944965
t.Errorf("StaticPolicy Start() error (%v)", err)
945966
continue
946967
}
947968

948-
cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.New())
969+
cpuAlloc, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.New())
949970
if err != nil {
950971
t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
951972
tc.description, tc.expCSet, err)
952973
continue
953974
}
954975

955-
if !reflect.DeepEqual(tc.expCSet, cset) {
976+
if !tc.expCSet.Equals(cpuAlloc.CPUs) {
956977
t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v but got %v",
957-
tc.description, tc.expCSet, cset)
978+
tc.description, tc.expCSet, cpuAlloc.CPUs)
958979
}
959980
}
960981
}
@@ -1107,15 +1128,18 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
11071128
}
11081129

11091130
for _, testCase := range testCases {
1110-
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
1131+
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
1132+
if err != nil {
1133+
t.Fatalf("NewStaticPolicy() failed: %v", err)
1134+
}
11111135

11121136
st := &mockState{
11131137
assignments: testCase.stAssignments,
11141138
defaultCPUSet: testCase.stDefaultCPUSet,
11151139
}
11161140

11171141
container := &testCase.pod.Spec.Containers[0]
1118-
err := policy.Allocate(st, testCase.pod, container)
1142+
err = policy.Allocate(st, testCase.pod, container)
11191143
if !reflect.DeepEqual(err, testCase.expErr) {
11201144
t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v",
11211145
testCase.description, testCase.expErr, err)
@@ -1128,13 +1152,13 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
11281152
testCase.description, container.Name, st.assignments)
11291153
}
11301154

1131-
if !reflect.DeepEqual(cset, testCase.expCSet) {
1132-
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
1155+
if !cset.Equals(testCase.expCSet) {
1156+
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s but got %s",
11331157
testCase.description, testCase.expCSet, cset)
11341158
}
11351159

11361160
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
1137-
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
1161+
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s to be disjoint from the shared cpuset %s",
11381162
testCase.description, cset, st.defaultCPUSet)
11391163
}
11401164
}

0 commit comments

Comments
 (0)