NodeResourceTopology updates to match latest kubelet features #920

Open · wants to merge 7 commits into base: master

105 changes: 50 additions & 55 deletions pkg/noderesourcetopology/filter.go
@@ -35,52 +35,40 @@ import (
"sigs.k8s.io/scheduler-plugins/pkg/util"
)

// The maximum number of NUMA nodes that Topology Manager allows is 8
// https://kubernetes.io/docs/tasks/administer-cluster/topology-manager/#known-limitations
const highestNUMAID = 8

type PolicyHandler func(pod *v1.Pod, zoneMap topologyv1alpha2.ZoneList) *framework.Status

func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, nodeInfo *framework.NodeInfo) *framework.Status {
lh.V(5).Info("container level single NUMA node handler")

// prepare NUMANodes list from zoneMap
nodes := createNUMANodeList(lh, zones)
qos := v1qos.GetPodQOS(pod)

// Node() != nil already verified in Filter(), which is the only public entry point
logNumaNodes(lh, "container handler NUMA resources", nodeInfo.Node().Name, nodes)

func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, info *filterInfo) *framework.Status {
// the init containers are running SERIALLY and BEFORE the normal containers.
// https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#understanding-init-containers
// therefore, we don't need to accumulate their resources together
for _, initContainer := range pod.Spec.InitContainers {
// TODO: handle sidecar explicitly (new kind)
clh := lh.WithValues(logging.KeyContainer, initContainer.Name, logging.KeyContainerKind, logging.KindContainerInit)
cntKind := logging.GetInitContainerKind(&initContainer)
clh := lh.WithValues(logging.KeyContainer, initContainer.Name, logging.KeyContainerKind, cntKind)
clh.V(6).Info("desired resources", stringify.ResourceListToLoggable(initContainer.Resources.Requests)...)

_, match := resourcesAvailableInAnyNUMANodes(clh, nodes, initContainer.Resources.Requests, qos, nodeInfo)
_, match, reason := resourcesAvailableInAnyNUMANodes(clh, info, initContainer.Resources.Requests)
if !match {
msg := "cannot align " + cntKind + " container"
// we can't align init container, so definitely we can't align a pod
clh.V(2).Info("cannot align container")
return framework.NewStatus(framework.Unschedulable, "cannot align init container")
clh.V(2).Info(msg, "reason", reason)
return framework.NewStatus(framework.Unschedulable, msg)
}
}

for _, container := range pod.Spec.Containers {
clh := lh.WithValues(logging.KeyContainer, container.Name, logging.KeyContainerKind, logging.KindContainerApp)
clh.V(6).Info("container requests", stringify.ResourceListToLoggable(container.Resources.Requests)...)

numaID, match := resourcesAvailableInAnyNUMANodes(clh, nodes, container.Resources.Requests, qos, nodeInfo)
numaID, match, reason := resourcesAvailableInAnyNUMANodes(clh, info, container.Resources.Requests)
if !match {
// we can't align container, so definitely we can't align a pod
clh.V(2).Info("cannot align container")
clh.V(2).Info("cannot align container", "reason", reason)
return framework.NewStatus(framework.Unschedulable, "cannot align container")
}

// subtract the resources requested by the container from the given NUMA.
// this is necessary, so we won't allocate the same resources for the upcoming containers
err := subtractResourcesFromNUMANodeList(clh, nodes, numaID, qos, container.Resources.Requests)
err := subtractResourcesFromNUMANodeList(clh, info.numaNodes, numaID, info.qos, container.Resources.Requests)
if err != nil {
// this is an internal error which should never happen
return framework.NewStatus(framework.Error, "inconsistent resource accounting", err.Error())
@@ -92,60 +80,64 @@ func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, zones topology

// resourcesAvailableInAnyNUMANodes checks for sufficient resources and returns the NUMA ID that would be selected by the kubelet.
// This function requires a NUMANodeList with properly populated NUMANodes; NUMAID should be in the range 0-63.
func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, resources v1.ResourceList, qos v1.PodQOSClass, nodeInfo *framework.NodeInfo) (int, bool) {
numaID := highestNUMAID
func resourcesAvailableInAnyNUMANodes(lh logr.Logger, info *filterInfo, resources v1.ResourceList) (int, bool, string) {
numaID := info.topologyManager.MaxNUMANodes // highest NUMA ID
bitmask := bm.NewEmptyBitMask()
// set all bits; each bit represents a NUMA node. If resources cannot be aligned
// on a NUMA node, its bit gets unset.
bitmask.Fill()

nodeResources := util.ResourceList(nodeInfo.Allocatable)
nodeResources := util.ResourceList(info.node.Allocatable)

for resource, quantity := range resources {
clh := lh.WithValues("resource", resource)
if quantity.IsZero() {
// why bother? everything's fine from the perspective of this resource
lh.V(4).Info("ignoring zero-qty resource request", "resource", resource)
clh.V(4).Info("ignoring zero-qty resource request")
continue
}

if _, ok := nodeResources[resource]; !ok {
// some resources may not expose NUMA affinity (device plugins, extended resources), but all resources
// must be reported at node level; thus, if they are not present at node level, we can safely assume
// we don't have the resource at all.
lh.V(2).Info("early verdict: cannot meet request", "resource", resource, "suitable", "false")
return numaID, false
clh.V(2).Info("early verdict: cannot meet request")
return -1, false, string(resource)
}

// for each requested resource, calculate which NUMA cells are good fits, then AND the result with the aggregated bitmask;
// in other words, unset the bit for any cell on which the resource cannot be aligned.
// Obviously, bits outside the NUMA ID range remain unset.
hasNUMAAffinity := false
resourceBitmask := bm.NewEmptyBitMask()
for _, numaNode := range numaNodes {
for _, numaNode := range info.numaNodes {
nlh := clh.WithValues("numaCell", numaNode.NUMAID)
numaQuantity, ok := numaNode.Resources[resource]
if !ok {
nlh.V(6).Info("missing")
continue
}

hasNUMAAffinity = true
if !isResourceSetSuitable(qos, resource, quantity, numaQuantity) {
if !isResourceSetSuitable(info.qos, resource, quantity, numaQuantity) {
nlh.V(6).Info("discarded", "quantity", quantity.String(), "numaQuantity", numaQuantity.String())
continue
}

resourceBitmask.Add(numaNode.NUMAID)
lh.V(6).Info("feasible", "numaCell", numaNode.NUMAID, "resource", resource)
nlh.V(6).Info("feasible")
}

// non-native resources or ephemeral-storage may not expose NUMA affinity,
// but since they are available at node level, this is fine
if !hasNUMAAffinity && isHostLevelResource(resource) {
lh.V(6).Info("resource available at host level (no NUMA affinity)", "resource", resource)
clh.V(6).Info("resource available at host level (no NUMA affinity)")
continue
}

bitmask.And(resourceBitmask)
if bitmask.IsEmpty() {
lh.V(2).Info("early verdict", "resource", resource, "suitable", "false")
return numaID, false
lh.V(2).Info("early verdict: cannot find affinity")
return numaID, false, string(resource)
}
}
// according to TopologyManager, the preferred NUMA affinity is the narrowest one.
@@ -157,23 +149,16 @@ func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, re
// at least one NUMA node is available
ret := !bitmask.IsEmpty()
lh.V(2).Info("final verdict", "suitable", ret, "numaCell", numaID)
return numaID, ret
return numaID, ret, "generic"
}
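
The narrowing loop above is easiest to follow as plain bitmask arithmetic: start from "every NUMA cell is feasible", AND in the set of cells that can satisfy each requested resource, and give up as soon as the intersection is empty. Below is a minimal standalone sketch of that idea; a uint64 stands in for the kubelet bitmask package, and the resource names and quantities are made up for illustration.

```go
package main

import "fmt"

// cellsThatFit returns a bitmask with one bit per NUMA cell whose free capacity
// covers the requested quantity of a single resource. Purely illustrative.
func cellsThatFit(perCellFree []int64, requested int64) uint64 {
	var mask uint64
	for cell, free := range perCellFree {
		if free >= requested {
			mask |= uint64(1) << uint(cell)
		}
	}
	return mask
}

// anyCellFitsAll mimics the narrowing loop: AND the per-resource masks together
// and report whether at least one NUMA cell can satisfy every request.
func anyCellFitsAll(free map[string][]int64, requests map[string]int64, numCells int) (uint64, bool) {
	agg := (uint64(1) << uint(numCells)) - 1 // all cells start as feasible
	for name, qty := range requests {
		agg &= cellsThatFit(free[name], qty)
		if agg == 0 {
			return 0, false // early verdict: this resource cannot be aligned anywhere
		}
	}
	return agg, true
}

func main() {
	free := map[string][]int64{
		"cpu":    {4, 2},  // free CPUs on cells 0 and 1
		"memory": {8, 16}, // free memory (GiB) on cells 0 and 1
	}
	mask, ok := anyCellFitsAll(free, map[string]int64{"cpu": 3, "memory": 8}, 2)
	fmt.Printf("mask=%02b ok=%v\n", mask, ok) // only cell 0 has enough CPU -> mask=01 ok=true
}
```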

func singleNUMAPodLevelHandler(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, nodeInfo *framework.NodeInfo) *framework.Status {
lh.V(5).Info("pod level single NUMA node handler")

func singleNUMAPodLevelHandler(lh logr.Logger, pod *v1.Pod, info *filterInfo) *framework.Status {
resources := util.GetPodEffectiveRequest(pod)

nodes := createNUMANodeList(lh, zones)

// Node() != nil already verified in Filter(), which is the only public entry point
logNumaNodes(lh, "pod handler NUMA resources", nodeInfo.Node().Name, nodes)
lh.V(6).Info("pod desired resources", stringify.ResourceListToLoggable(resources)...)

numaID, match := resourcesAvailableInAnyNUMANodes(lh, createNUMANodeList(lh, zones), resources, v1qos.GetPodQOS(pod), nodeInfo)
numaID, match, reason := resourcesAvailableInAnyNUMANodes(lh, info, resources)
if !match {
lh.V(2).Info("cannot align pod", "name", pod.Name)
lh.V(2).Info("cannot align pod", "name", pod.Name, "reason", reason)
return framework.NewStatus(framework.Unschedulable, "cannot align pod")
}
lh.V(4).Info("all container placed", "numaCell", numaID)
@@ -185,14 +170,14 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
if nodeInfo.Node() == nil {
return framework.NewStatus(framework.Error, "node not found")
}
if v1qos.GetPodQOS(pod) == v1.PodQOSBestEffort && !resourcerequests.IncludeNonNative(pod) {
qos := v1qos.GetPodQOS(pod)
if qos == v1.PodQOSBestEffort && !resourcerequests.IncludeNonNative(pod) {
return nil
}

nodeName := nodeInfo.Node().Name

lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues("ExtensionPoint", "Filter").
WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)

lh.V(4).Info(logging.FlowBegin)
defer lh.V(4).Info(logging.FlowEnd)
@@ -211,26 +196,36 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle

lh.V(4).Info("found nrt data", "object", stringify.NodeResourceTopologyResources(nodeTopology), "conf", conf.String())

handler := filterHandlerFromTopologyManager(conf)
handler, boundary := filterHandlerFromTopologyManager(conf)
if handler == nil {
return nil
}
status := handler(lh, pod, nodeTopology.Zones, nodeInfo)

numaNodes := createNUMANodeList(lh, nodeTopology.Zones)
lh.V(4).Info("aligning resources", "boundary", boundary, "numaCells", len(numaNodes))
fi := filterInfo{
nodeName: nodeName,
node: nodeInfo,
topologyManager: conf,
qos: qos,
numaNodes: numaNodes,
}
status := handler(lh, pod, &fi)
if status != nil {
tm.nrtCache.NodeMaybeOverReserved(nodeName, pod)
}
return status
}

func filterHandlerFromTopologyManager(conf nodeconfig.TopologyManager) filterFn {
func filterHandlerFromTopologyManager(conf nodeconfig.TopologyManager) (filterFn, string) {
if conf.Policy != kubeletconfig.SingleNumaNodeTopologyManagerPolicy {
return nil
return nil, ""
}
if conf.Scope == kubeletconfig.PodTopologyManagerScope {
return singleNUMAPodLevelHandler
return singleNUMAPodLevelHandler, "pod"
}
if conf.Scope == kubeletconfig.ContainerTopologyManagerScope {
return singleNUMAContainerLevelHandler
return singleNUMAContainerLevelHandler, "container"
}
return nil // cannot happen
return nil, "" // cannot happen
}
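
The dispatch at the end of the file is effectively a lookup keyed on (policy, scope): only the single-numa-node policy constrains placement, and the scope decides whether alignment is checked per pod or per container. A hedged sketch of the same idea as a table, with plain strings standing in for the plugin's filterFn and nodeconfig types:

```go
package main

import "fmt"

// key mirrors the two kubelet Topology Manager knobs the plugin cares about.
type key struct{ policy, scope string }

// handlers maps the only constraining policy ("single-numa-node") to a handler name;
// the names stand in for singleNUMAPodLevelHandler / singleNUMAContainerLevelHandler.
var handlers = map[key]string{
	{policy: "single-numa-node", scope: "pod"}:       "singleNUMAPodLevelHandler",
	{policy: "single-numa-node", scope: "container"}: "singleNUMAContainerLevelHandler",
}

// pick returns the handler name and the alignment boundary, or empty strings when
// the Topology Manager policy does not constrain placement (e.g. "none", "best-effort").
func pick(policy, scope string) (handler, boundary string) {
	if h, ok := handlers[key{policy: policy, scope: scope}]; ok {
		return h, scope
	}
	return "", ""
}

func main() {
	h, boundary := pick("single-numa-node", "container")
	fmt.Println(h, boundary) // singleNUMAContainerLevelHandler container
}
```

Note that the extra boundary string returned by filterHandlerFromTopologyManager only feeds the "aligning resources" log line, so a wrong mapping there would surface in logs rather than in scheduling decisions.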
24 changes: 20 additions & 4 deletions pkg/noderesourcetopology/filter_test.go
@@ -19,9 +19,10 @@ package noderesourcetopology
import (
"context"
"fmt"
"reflect"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"

v1 "k8s.io/api/core/v1"
@@ -696,7 +697,7 @@ func TestNodeResourceTopology(t *testing.T) {
}
gotStatus := tm.Filter(context.Background(), framework.NewCycleState(), tt.pod, nodeInfo)

if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
if !quasiEqualStatus(gotStatus, tt.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, tt.wantStatus)
}
})
@@ -913,7 +914,7 @@ func TestNodeResourceTopologyMultiContainerPodScope(t *testing.T) {
}
gotStatus := tm.Filter(context.Background(), framework.NewCycleState(), tt.pod, nodeInfo)

if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
if !quasiEqualStatus(gotStatus, tt.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, tt.wantStatus)
}
})
@@ -1172,7 +1173,7 @@ func TestNodeResourceTopologyMultiContainerContainerScope(t *testing.T) {
nodeInfo.SetNode(nodes[0])
gotStatus := tm.Filter(context.Background(), framework.NewCycleState(), tt.pod, nodeInfo)

if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
if !quasiEqualStatus(gotStatus, tt.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, tt.wantStatus)
}
})
@@ -1286,3 +1287,18 @@ func parseState(error string) *framework.Status {

return framework.NewStatus(framework.Unschedulable, error)
}

func quasiEqualStatus(s, x *framework.Status) bool {
if s == nil || x == nil {
return s.IsSuccess() && x.IsSuccess()
}
if s.Code() != x.Code() {
return false
}
sMsg := s.Message()
xMsg := x.Message()
if !strings.HasPrefix(sMsg, xMsg) {
return false
}
return cmp.Equal(s.Plugin(), x.Plugin())
}
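
quasiEqualStatus replaces the previous reflect.DeepEqual check so existing tests keep passing while Filter gains more detailed Unschedulable messages: the wanted message only needs to be a prefix of the produced one. A hypothetical test (not part of this PR) documenting those semantics:

```go
package noderesourcetopology

import (
	"testing"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// TestQuasiEqualStatusSemantics is illustrative only; the messages are made up,
// what matters is the code/prefix comparison rules.
func TestQuasiEqualStatusSemantics(t *testing.T) {
	got := framework.NewStatus(framework.Unschedulable, "cannot align container: example.com/gpu")
	want := framework.NewStatus(framework.Unschedulable, "cannot align container")

	if !quasiEqualStatus(got, want) {
		t.Fatalf("same code and want is a prefix of got: expected a match")
	}
	if quasiEqualStatus(want, got) {
		t.Fatalf("the comparison is deliberately asymmetric: want must be the prefix")
	}
	// nil and an explicit Success status compare as equal, since (*Status).IsSuccess()
	// is true for both.
	if !quasiEqualStatus(nil, framework.NewStatus(framework.Success)) {
		t.Fatalf("nil should compare equal to Success")
	}
}
```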
30 changes: 11 additions & 19 deletions pkg/noderesourcetopology/least_numa.go
@@ -18,12 +18,10 @@ package noderesourcetopology

import (
v1 "k8s.io/api/core/v1"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/go-logr/logr"
topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"gonum.org/v1/gonum/stat/combin"

"sigs.k8s.io/scheduler-plugins/pkg/util"
@@ -34,20 +32,17 @@ const (
maxDistanceValue = 255
)

func leastNUMAContainerScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList) (int64, *framework.Status) {
nodes := createNUMANodeList(lh, zones)
qos := v1qos.GetPodQOS(pod)

func leastNUMAContainerScopeScore(lh logr.Logger, pod *v1.Pod, info *scoreInfo) (int64, *framework.Status) {
maxNUMANodesCount := 0
allContainersMinAvgDistance := true
// the order in which TopologyManager asks for hints is important, so iterate in the same order
// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cm/topologymanager/scope_container.go#L52
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
// if a container requests only non-NUMA resources, just continue
if onlyNonNUMAResources(nodes, container.Resources.Requests) {
if onlyNonNUMAResources(info.numaNodes, container.Resources.Requests) {
continue
}
numaNodes, isMinAvgDistance := numaNodesRequired(lh, qos, nodes, container.Resources.Requests)
numaNodes, isMinAvgDistance := numaNodesRequired(lh, info.qos, info.numaNodes, container.Resources.Requests)
// container's resources can't fit onto node, return MinNodeScore for whole pod
if numaNodes == nil {
// score plugin should be running after resource filter plugin so we should always find sufficient amount of NUMA nodes
@@ -65,39 +60,36 @@ func leastNUMAContainerScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1a

// subtract the resources requested by the container from the given NUMA.
// this is necessary, so we won't allocate the same resources for the upcoming containers
subtractFromNUMAs(container.Resources.Requests, nodes, numaNodes.GetBits()...)
subtractFromNUMAs(container.Resources.Requests, info.numaNodes, numaNodes.GetBits()...)
}

if maxNUMANodesCount == 0 {
return framework.MaxNodeScore, nil
}

return normalizeScore(maxNUMANodesCount, allContainersMinAvgDistance), nil
return normalizeScore(maxNUMANodesCount, allContainersMinAvgDistance, info.topologyManager.MaxNUMANodes), nil
}

func leastNUMAPodScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList) (int64, *framework.Status) {
nodes := createNUMANodeList(lh, zones)
qos := v1qos.GetPodQOS(pod)

func leastNUMAPodScopeScore(lh logr.Logger, pod *v1.Pod, info *scoreInfo) (int64, *framework.Status) {
resources := util.GetPodEffectiveRequest(pod)
// if a pod requests only non-NUMA resources, return max score
if onlyNonNUMAResources(nodes, resources) {
if onlyNonNUMAResources(info.numaNodes, resources) {
return framework.MaxNodeScore, nil
}

numaNodes, isMinAvgDistance := numaNodesRequired(lh, qos, nodes, resources)
numaNodes, isMinAvgDistance := numaNodesRequired(lh, info.qos, info.numaNodes, resources)
// pod's resources can't fit onto node, return MinNodeScore
if numaNodes == nil {
// score plugin should be running after resource filter plugin so we should always find sufficient amount of NUMA nodes
lh.Info("cannot calculate how many NUMA nodes are required")
return framework.MinNodeScore, nil
}

return normalizeScore(numaNodes.Count(), isMinAvgDistance), nil
return normalizeScore(numaNodes.Count(), isMinAvgDistance, info.topologyManager.MaxNUMANodes), nil
}

func normalizeScore(numaNodesCount int, isMinAvgDistance bool) int64 {
numaNodeScore := framework.MaxNodeScore / highestNUMAID
func normalizeScore(numaNodesCount int, isMinAvgDistance bool, highestNUMAID int) int64 {
numaNodeScore := framework.MaxNodeScore / int64(highestNUMAID)
score := framework.MaxNodeScore - int64(numaNodesCount)*numaNodeScore
if isMinAvgDistance {
// if distance between NUMA domains is optimal add half of numaNodeScore to make this node more favorable
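
Passing highestNUMAID into normalizeScore means the per-NUMA-cell penalty now scales with the value advertised by the node's kubelet configuration (TopologyManager.MaxNUMANodes) instead of the old hard-coded constant of 8. As a worked example, assuming framework.MaxNodeScore is 100: with 8 cells the penalty is 100/8 = 12 per cell, so a workload that needs 2 cells scores 76, plus 6 (half a step) if the cells sit at minimal average distance. A standalone re-derivation for illustration, not the plugin's code:

```go
package main

import "fmt"

const maxNodeScore = 100 // assumed value of framework.MaxNodeScore

// normalize mirrors the shape of normalizeScore above.
func normalize(numaNodesCount int, isMinAvgDistance bool, highestNUMAID int) int64 {
	numaNodeScore := int64(maxNodeScore) / int64(highestNUMAID)
	score := int64(maxNodeScore) - int64(numaNodesCount)*numaNodeScore
	if isMinAvgDistance {
		// optimal inter-cell distance makes the node slightly more favorable
		score += numaNodeScore / 2
	}
	return score
}

func main() {
	fmt.Println(normalize(1, true, 8))  // 100 - 12 + 6 = 94
	fmt.Println(normalize(2, false, 8)) // 100 - 24 = 76
	fmt.Println(normalize(2, false, 4)) // fewer possible cells, larger per-cell penalty: 100 - 50 = 50
}
```

The test change in least_numa_test.go preserves the old behavior by passing nodeconfig.DefaultMaxNUMANodes, which presumably still defaults to 8.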
3 changes: 2 additions & 1 deletion pkg/noderesourcetopology/least_numa_test.go
@@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/nodeconfig"
)

const (
@@ -746,7 +747,7 @@ func TestNormalizeScore(t *testing.T) {

for _, tc := range tcases {
t.Run(tc.description, func(t *testing.T) {
normalizedScore := normalizeScore(tc.score, tc.optimalDistance)
normalizedScore := normalizeScore(tc.score, tc.optimalDistance, nodeconfig.DefaultMaxNUMANodes)
if normalizedScore != tc.expectedScore {
t.Errorf("Expected normalizedScore to be %d not %d", tc.expectedScore, normalizedScore)
}