
Commit fba6365

Merge pull request kubernetes#130498 from swatisehgal/distribute-across-numa-e2e-tests
node: cpumgr: e2e: Tests for `distribute-cpus-across-numa` policy option
2 parents dc6f70c + 327ebcf commit fba6365

File tree

1 file changed: +237 -7 lines changed


test/e2e_node/cpu_manager_test.go

Lines changed: 237 additions & 7 deletions
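For context on the `distribute-cpus-across-numa` and `full-pcpus-only` options exercised by the new tests below: both are surfaced to cluster admins through the kubelet configuration. The following is a minimal, illustrative sketch (not part of this commit), assuming the v1beta1 kubelet configuration types from k8s.io/kubelet/config/v1beta1:

package main

import (
	"fmt"

	kubeletconfig "k8s.io/kubelet/config/v1beta1"
)

func main() {
	// Illustrative only: a static CPU Manager policy with the two policy options
	// the new e2e tests toggle. The option names correspond to the constants used
	// in the diff (cpumanager.FullPCPUsOnlyOption, cpumanager.DistributeCPUsAcrossNUMAOption).
	cfg := kubeletconfig.KubeletConfiguration{
		CPUManagerPolicy: "static",
		CPUManagerPolicyOptions: map[string]string{
			"full-pcpus-only":             "true",
			"distribute-cpus-across-numa": "true",
		},
		ReservedSystemCPUs: "0", // keep at least one CPU out of the exclusive pool, as the tests do
	}
	fmt.Printf("%+v\n", cfg)
}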
@@ -23,6 +23,7 @@ import (
 	"io/fs"
 	"os"
 	"os/exec"
+	"path/filepath"
 	"regexp"
 	"strconv"
 	"strings"
@@ -40,12 +41,19 @@ import (

 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
+	"github.com/onsi/gomega/gcustom"
+	gomegatypes "github.com/onsi/gomega/types"
 	"k8s.io/kubernetes/test/e2e/feature"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 )

+const (
+	minSMTLevel    = 2
+	minCPUCapacity = 2
+)
+
 // Helper for makeCPUManagerPod().
 type ctnAttribute struct {
 	ctnName string
@@ -877,9 +885,9 @@ func runCPUManagerTests(f *framework.Framework) {
 	ginkgo.It("should assign CPUs as expected based on the Pod spec", func(ctx context.Context) {
 		cpuCap, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)

-		// Skip CPU Manager tests altogether if the CPU capacity < 2.
-		if cpuCap < 2 {
-			e2eskipper.Skipf("Skipping CPU Manager tests since the CPU capacity < 2")
+		// Skip CPU Manager tests altogether if the CPU capacity < minCPUCapacity.
+		if cpuCap < minCPUCapacity {
+			e2eskipper.Skipf("Skipping CPU Manager tests since the CPU capacity < %d", minCPUCapacity)
 		}

 		// Enable CPU Manager in the kubelet.
@@ -972,13 +980,14 @@ func runCPUManagerTests(f *framework.Framework) {
 		smtLevel := getSMTLevel()

 		// strict SMT alignment is trivially verified and granted on non-SMT systems
-		if smtLevel < 2 {
+		if smtLevel < minSMTLevel {
 			e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt)
 		}

-		// our tests want to allocate a full core, so we need at last 2*2=4 virtual cpus
-		if cpuAlloc < int64(smtLevel*2) {
-			e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < 4", fullCPUsOnlyOpt)
+		// our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus
+		minCPUCount := int64(smtLevel * minCPUCapacity)
+		if cpuAlloc < minCPUCount {
+			e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < %d", fullCPUsOnlyOpt, minCPUCount)
 		}

 		framework.Logf("SMT level %d", smtLevel)
@@ -1153,6 +1162,155 @@ func runCPUManagerTests(f *framework.Framework) {
 		waitForContainerRemoval(ctx, pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
 	})

+	ginkgo.It("should assign packed CPUs with distribute-cpus-across-numa disabled and pcpu-only policy options enabled", func(ctx context.Context) {
+		fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
+		_, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)
+		smtLevel := getSMTLevel()
+
+		// strict SMT alignment is trivially verified and granted on non-SMT systems
+		if smtLevel < minSMTLevel {
+			e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt)
+		}
+
+		// our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus
+		minCPUCount := int64(smtLevel * minCPUCapacity)
+		if cpuAlloc < minCPUCount {
+			e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < %d", fullCPUsOnlyOpt, minCPUCount)
+		}
+
+		framework.Logf("SMT level %d", smtLevel)
+
+		cpuPolicyOptions := map[string]string{
+			cpumanager.FullPCPUsOnlyOption:            "true",
+			cpumanager.DistributeCPUsAcrossNUMAOption: "false",
+		}
+		newCfg := configureCPUManagerInKubelet(oldCfg,
+			&cpuManagerKubeletArguments{
+				policyName:              string(cpumanager.PolicyStatic),
+				reservedSystemCPUs:      cpuset.New(0),
+				enableCPUManagerOptions: true,
+				options:                 cpuPolicyOptions,
+			},
+		)
+		updateKubeletConfig(ctx, f, newCfg, true)
+
+		ctnAttrs := []ctnAttribute{
+			{
+				ctnName:    "test-gu-container-distribute-cpus-across-numa-disabled",
+				cpuRequest: "2000m",
+				cpuLimit:   "2000m",
+			},
+		}
+		pod := makeCPUManagerPod("test-pod-distribute-cpus-across-numa-disabled", ctnAttrs)
+		pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
+
+		for _, cnt := range pod.Spec.Containers {
+			ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name))
+
+			logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
+			framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name)
+
+			framework.Logf("got pod logs: %v", logs)
+			cpus, err := cpuset.Parse(strings.TrimSpace(logs))
+			framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
+
+			validateSMTAlignment(cpus, smtLevel, pod, &cnt)
+			gomega.Expect(cpus).To(BePackedCPUs())
+		}
+		deletePodSyncByName(ctx, f, pod.Name)
+		// we need to wait for all containers to really be gone so the cpumanager reconcile loop will not rewrite the cpu_manager_state.
+		// this is in turn needed because we will have an unavoidable (in the current framework) race with the
+		// reconcile loop which will make our attempt to delete the state file and to restore the old config go haywire
+		waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)
+	})
+
+	ginkgo.It("should assign CPUs distributed across NUMA with distribute-cpus-across-numa and pcpu-only policy options enabled", func(ctx context.Context) {
+		var cpusNumPerNUMA, coresNumPerNUMA, numaNodeNum, threadsPerCore int
+
+		fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
+		_, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)
+		smtLevel := getSMTLevel()
+		framework.Logf("SMT level %d", smtLevel)
+
+		// strict SMT alignment is trivially verified and granted on non-SMT systems
+		if smtLevel < minSMTLevel {
+			e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt)
+		}
+
+		// our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus
+		minCPUCount := int64(smtLevel * minCPUCapacity)
+		if cpuAlloc < minCPUCount {
+			e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < %d", fullCPUsOnlyOpt, minCPUCount)
+		}
+
+		// this test is intended to be run on a multi-node NUMA system and
+		// a system with at least 4 cores per socket; hostCheck skips the test
+		// if the above requirements are not satisfied
+		numaNodeNum, coresNumPerNUMA, threadsPerCore = hostCheck()
+		cpusNumPerNUMA = coresNumPerNUMA * threadsPerCore
+
+		framework.Logf("numaNodes on the system %d", numaNodeNum)
+		framework.Logf("Cores per NUMA on the system %d", coresNumPerNUMA)
+		framework.Logf("Threads per Core on the system %d", threadsPerCore)
+		framework.Logf("CPUs per NUMA on the system %d", cpusNumPerNUMA)
+
+		cpuPolicyOptions := map[string]string{
+			cpumanager.FullPCPUsOnlyOption:            "true",
+			cpumanager.DistributeCPUsAcrossNUMAOption: "true",
+		}
+		newCfg := configureCPUManagerInKubelet(oldCfg,
+			&cpuManagerKubeletArguments{
+				policyName:              string(cpumanager.PolicyStatic),
+				reservedSystemCPUs:      cpuset.New(0),
+				enableCPUManagerOptions: true,
+				options:                 cpuPolicyOptions,
+			},
+		)
+		updateKubeletConfig(ctx, f, newCfg, true)
+		// 'distribute-cpus-across-numa' policy option ensures that CPU allocations are evenly distributed
+		// across NUMA nodes in cases where more than one NUMA node is required to satisfy the allocation.
+		// So, we want to ensure that the CPU Request exceeds the number of CPUs that can fit within a single
+		// NUMA node. We have to pick cpuRequest such that:
+		// 1. CPURequest > cpusNumPerNUMA
+		// 2. It does not occupy all the CPUs on the node and leaves room for the reserved CPU
+		// 3. CPURequest is a multiple of the number of NUMA nodes to allow equal CPU distribution across NUMA nodes
+		//
+		// In summary: cpusNumPerNUMA < CPURequest < ((cpusNumPerNUMA * numaNodeNum) - reservedCPUsCount)
+		// Considering all these constraints we select: CPURequest = (cpusNumPerNUMA-smtLevel)*numaNodeNum
+
+		cpuReq := (cpusNumPerNUMA - smtLevel) * numaNodeNum
+		ctnAttrs := []ctnAttribute{
+			{
+				ctnName:    "test-gu-container-distribute-cpus-across-numa",
+				cpuRequest: fmt.Sprintf("%d", cpuReq),
+				cpuLimit:   fmt.Sprintf("%d", cpuReq),
+			},
+		}
+		pod := makeCPUManagerPod("test-pod-distribute-cpus-across-numa", ctnAttrs)
+		pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
+
+		for _, cnt := range pod.Spec.Containers {
+			ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name))
+
+			logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
+			framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name)
+
+			framework.Logf("got pod logs: %v", logs)
+			cpus, err := cpuset.Parse(strings.TrimSpace(logs))
+			framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
+
+			validateSMTAlignment(cpus, smtLevel, pod, &cnt)
+			// We expect a perfectly even split, i.e. an equal distribution across NUMA nodes, since the
+			// CPU Request (cpusNumPerNUMA-smtLevel)*numaNodeNum is a multiple of the number of NUMA nodes.
+			expectedSpread := cpus.Size() / numaNodeNum
+			gomega.Expect(cpus).To(BeDistributedCPUs(expectedSpread))
+		}
+		deletePodSyncByName(ctx, f, pod.Name)
+		// we need to wait for all containers to really be gone so the cpumanager reconcile loop will not rewrite the cpu_manager_state.
+		// this is in turn needed because we will have an unavoidable (in the current framework) race with the
+		// reconcile loop which will make our attempt to delete the state file and to restore the old config go haywire
+		waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)
+	})
+
 	ginkgo.AfterEach(func(ctx context.Context) {
 		updateKubeletConfig(ctx, f, oldCfg, true)
 	})
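To make the CPURequest constraints spelled out in the comment inside the hunk above concrete, here is a small worked example under an assumed topology (2 NUMA nodes, 8 cores per node, SMT level 2, 1 reserved CPU; hypothetical numbers, not taken from the commit). It simply replays the arithmetic the test performs:

package main

import "fmt"

func main() {
	// Hypothetical topology (not from the commit).
	numaNodeNum := 2
	coresNumPerNUMA := 8
	smtLevel := 2 // threads per core
	reservedCPUsCount := 1

	cpusNumPerNUMA := coresNumPerNUMA * smtLevel        // 16 CPUs per NUMA node
	cpuReq := (cpusNumPerNUMA - smtLevel) * numaNodeNum // (16-2)*2 = 28

	// Constraint 1: the request must not fit in a single NUMA node.
	fmt.Println("spills over one node:", cpuReq > cpusNumPerNUMA) // 28 > 16 -> true
	// Constraint 2: it must leave room for the reserved CPU(s).
	fmt.Println("leaves reserved room:", cpuReq < cpusNumPerNUMA*numaNodeNum-reservedCPUsCount) // 28 < 31 -> true
	// Constraint 3: it is a multiple of numaNodeNum, so an equal per-node spread is expected.
	fmt.Println("expected per-node spread:", cpuReq/numaNodeNum) // 14
}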
@@ -1260,6 +1418,78 @@ func isSMTAlignmentError(pod *v1.Pod) bool {
 	return re.MatchString(pod.Status.Reason)
 }

+// getNumaNodeCPUs retrieves CPUs for each NUMA node.
+func getNumaNodeCPUs() (map[int]cpuset.CPUSet, error) {
+	numaNodes := make(map[int]cpuset.CPUSet)
+	nodePaths, err := filepath.Glob("/sys/devices/system/node/node*/cpulist")
+	if err != nil {
+		return nil, err
+	}
+
+	for _, nodePath := range nodePaths {
+		data, err := os.ReadFile(nodePath)
+		framework.ExpectNoError(err, "Error obtaining CPU information from the node")
+		cpuSet := strings.TrimSpace(string(data))
+		cpus, err := cpuset.Parse(cpuSet)
+		framework.ExpectNoError(err, "Error parsing CPUset")
+
+		// Extract node ID from path (e.g., "node0" -> 0)
+		base := filepath.Base(filepath.Dir(nodePath))
+		nodeID, err := strconv.Atoi(strings.TrimPrefix(base, "node"))
+		if err != nil {
+			continue
+		}
+		numaNodes[nodeID] = cpus
+	}
+
+	return numaNodes, nil
+}
+
+// computeNUMADistribution calculates CPU distribution per NUMA node.
+func computeNUMADistribution(allocatedCPUs cpuset.CPUSet) map[int]int {
+	numaCPUs, err := getNumaNodeCPUs()
+	framework.ExpectNoError(err, "Error retrieving NUMA nodes")
+	framework.Logf("NUMA Node CPUs allocation: %v", numaCPUs)
+
+	distribution := make(map[int]int)
+	for node, cpus := range numaCPUs {
+		distribution[node] = cpus.Intersection(allocatedCPUs).Size()
+	}
+
+	framework.Logf("allocated CPUs %s distribution: %v", allocatedCPUs.String(), distribution)
+	return distribution
+}
+
+// Custom matcher for checking packed CPUs.
+func BePackedCPUs() gomegatypes.GomegaMatcher {
+	return gcustom.MakeMatcher(func(allocatedCPUs cpuset.CPUSet) (bool, error) {
+		distribution := computeNUMADistribution(allocatedCPUs)
+		for _, count := range distribution {
+			// This assumption holds true if there are enough CPUs on a single NUMA node.
+			// We are intentionally limiting the CPU request to 2 to minimize the number
+			// of CPUs required to fulfill this case and therefore maximize the chances
+			// of correctly validating this case.
+			if count == allocatedCPUs.Size() {
+				return true, nil
+			}
+		}
+		return false, nil
+	}).WithMessage("expected CPUs to be packed")
+}
+
+// Custom matcher for checking distributed CPUs.
+func BeDistributedCPUs(expectedSpread int) gomegatypes.GomegaMatcher {
+	return gcustom.MakeMatcher(func(allocatedCPUs cpuset.CPUSet) (bool, error) {
+		distribution := computeNUMADistribution(allocatedCPUs)
+		for _, count := range distribution {
+			if count != expectedSpread {
+				return false, nil
+			}
+		}
+		return true, nil
+	}).WithTemplate("expected CPUs to be evenly distributed across NUMA nodes\nExpected: {{.Data}}\nGot:\n{{.FormattedActual}}\nDistribution: {{.Data}}\n").WithTemplateData(expectedSpread)
+}
+
 // Serial because the test updates kubelet configuration.
 var _ = SIGDescribe("CPU Manager", framework.WithSerial(), feature.CPUManager, func() {
 	f := framework.NewDefaultFramework("cpu-manager-test")
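As a rough illustration of what the BePackedCPUs and BeDistributedCPUs matchers added above check, the following standalone sketch replays the intersection-based counting against a fixed, hypothetical two-node layout instead of reading /sys (it assumes the k8s.io/utils/cpuset package used by the test; names like distribution and the CPU ranges are illustrative only):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// distribution mirrors the per-node intersection counting done by computeNUMADistribution,
// but against a caller-supplied NUMA layout instead of /sys.
func distribution(numa map[int]cpuset.CPUSet, allocated cpuset.CPUSet) map[int]int {
	out := make(map[int]int)
	for node, cpus := range numa {
		out[node] = cpus.Intersection(allocated).Size()
	}
	return out
}

func main() {
	// Hypothetical layout: node0 has CPUs 0-7, node1 has CPUs 8-15.
	numa := map[int]cpuset.CPUSet{
		0: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
		1: cpuset.New(8, 9, 10, 11, 12, 13, 14, 15),
	}

	packed := cpuset.New(2, 3)         // both CPUs on node0 -> BePackedCPUs would succeed
	spread := cpuset.New(2, 3, 10, 11) // two CPUs per node  -> BeDistributedCPUs(2) would succeed

	fmt.Println(distribution(numa, packed)) // map[0:2 1:0]
	fmt.Println(distribution(numa, spread)) // map[0:2 1:2]
}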
