diff --git a/pkg/noderesourcetopology/filter.go b/pkg/noderesourcetopology/filter.go index b0dbeb324..b17890b12 100644 --- a/pkg/noderesourcetopology/filter.go +++ b/pkg/noderesourcetopology/filter.go @@ -35,35 +35,23 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/util" ) -// The maximum number of NUMA nodes that Topology Manager allows is 8 -// https://kubernetes.io/docs/tasks/administer-cluster/topology-manager/#known-limitations -const highestNUMAID = 8 - type PolicyHandler func(pod *v1.Pod, zoneMap topologyv1alpha2.ZoneList) *framework.Status -func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, nodeInfo *framework.NodeInfo) *framework.Status { - lh.V(5).Info("container level single NUMA node handler") - - // prepare NUMANodes list from zoneMap - nodes := createNUMANodeList(lh, zones) - qos := v1qos.GetPodQOS(pod) - - // Node() != nil already verified in Filter(), which is the only public entry point - logNumaNodes(lh, "container handler NUMA resources", nodeInfo.Node().Name, nodes) - +func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, info *filterInfo) *framework.Status { // the init containers are running SERIALLY and BEFORE the normal containers. // https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#understanding-init-containers // therefore, we don't need to accumulate their resources together for _, initContainer := range pod.Spec.InitContainers { - // TODO: handle sidecar explicitely (new kind) - clh := lh.WithValues(logging.KeyContainer, initContainer.Name, logging.KeyContainerKind, logging.KindContainerInit) + cntKind := logging.GetInitContainerKind(&initContainer) + clh := lh.WithValues(logging.KeyContainer, initContainer.Name, logging.KeyContainerKind, cntKind) clh.V(6).Info("desired resources", stringify.ResourceListToLoggable(initContainer.Resources.Requests)...) - _, match := resourcesAvailableInAnyNUMANodes(clh, nodes, initContainer.Resources.Requests, qos, nodeInfo) + _, match, reason := resourcesAvailableInAnyNUMANodes(clh, info, initContainer.Resources.Requests) if !match { + msg := "cannot align " + cntKind + " container" // we can't align init container, so definitely we can't align a pod - clh.V(2).Info("cannot align container") - return framework.NewStatus(framework.Unschedulable, "cannot align init container") + clh.V(2).Info(msg, "reason", reason) + return framework.NewStatus(framework.Unschedulable, msg) } } @@ -71,16 +59,16 @@ func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, zones topology clh := lh.WithValues(logging.KeyContainer, container.Name, logging.KeyContainerKind, logging.KindContainerApp) clh.V(6).Info("container requests", stringify.ResourceListToLoggable(container.Resources.Requests)...) - numaID, match := resourcesAvailableInAnyNUMANodes(clh, nodes, container.Resources.Requests, qos, nodeInfo) + numaID, match, reason := resourcesAvailableInAnyNUMANodes(clh, info, container.Resources.Requests) if !match { // we can't align container, so definitely we can't align a pod - clh.V(2).Info("cannot align container") + clh.V(2).Info("cannot align container", "reason", reason) return framework.NewStatus(framework.Unschedulable, "cannot align container") } // subtract the resources requested by the container from the given NUMA. 
// this is necessary, so we won't allocate the same resources for the upcoming containers - err := subtractResourcesFromNUMANodeList(clh, nodes, numaID, qos, container.Resources.Requests) + err := subtractResourcesFromNUMANodeList(clh, info.numaNodes, numaID, info.qos, container.Resources.Requests) if err != nil { // this is an internal error which should never happen return framework.NewStatus(framework.Error, "inconsistent resource accounting", err.Error()) @@ -92,19 +80,20 @@ func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, zones topology // resourcesAvailableInAnyNUMANodes checks for sufficient resource and return the NUMAID that would be selected by Kubelet. // this function requires NUMANodeList with properly populated NUMANode, NUMAID should be in range 0-63 -func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, resources v1.ResourceList, qos v1.PodQOSClass, nodeInfo *framework.NodeInfo) (int, bool) { - numaID := highestNUMAID +func resourcesAvailableInAnyNUMANodes(lh logr.Logger, info *filterInfo, resources v1.ResourceList) (int, bool, string) { + numaID := info.topologyManager.MaxNUMANodes // highest NUMA ID bitmask := bm.NewEmptyBitMask() // set all bits, each bit is a NUMA node, if resources couldn't be aligned // on the NUMA node, bit should be unset bitmask.Fill() - nodeResources := util.ResourceList(nodeInfo.Allocatable) + nodeResources := util.ResourceList(info.node.Allocatable) for resource, quantity := range resources { + clh := lh.WithValues("resource", resource) if quantity.IsZero() { // why bother? everything's fine from the perspective of this resource - lh.V(4).Info("ignoring zero-qty resource request", "resource", resource) + clh.V(4).Info("ignoring zero-qty resource request") continue } @@ -112,40 +101,43 @@ func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, re // some resources may not expose NUMA affinity (device plugins, extended resources), but all resources // must be reported at node level; thus, if they are not present at node level, we can safely assume // we don't have the resource at all. 
- lh.V(2).Info("early verdict: cannot meet request", "resource", resource, "suitable", "false") - return numaID, false + clh.V(2).Info("early verdict: cannot meet request") + return -1, false, string(resource) } // for each requested resource, calculate which NUMA slots are good fits, and then AND with the aggregated bitmask, IOW unset appropriate bit if we can't align resources, or set it // obvious, bits which are not in the NUMA id's range would be unset hasNUMAAffinity := false resourceBitmask := bm.NewEmptyBitMask() - for _, numaNode := range numaNodes { + for _, numaNode := range info.numaNodes { + nlh := clh.WithValues("numaCell", numaNode.NUMAID) numaQuantity, ok := numaNode.Resources[resource] if !ok { + nlh.V(6).Info("missing") continue } hasNUMAAffinity = true - if !isResourceSetSuitable(qos, resource, quantity, numaQuantity) { + if !isResourceSetSuitable(info.qos, resource, quantity, numaQuantity) { + nlh.V(6).Info("discarded", "quantity", quantity.String(), "numaQuantity", numaQuantity.String()) continue } resourceBitmask.Add(numaNode.NUMAID) - lh.V(6).Info("feasible", "numaCell", numaNode.NUMAID, "resource", resource) + nlh.V(6).Info("feasible") } // non-native resources or ephemeral-storage may not expose NUMA affinity, // but since they are available at node level, this is fine if !hasNUMAAffinity && isHostLevelResource(resource) { - lh.V(6).Info("resource available at host level (no NUMA affinity)", "resource", resource) + clh.V(6).Info("resource available at host level (no NUMA affinity)") continue } bitmask.And(resourceBitmask) if bitmask.IsEmpty() { - lh.V(2).Info("early verdict", "resource", resource, "suitable", "false") - return numaID, false + lh.V(2).Info("early verdict: cannot find affinity") + return numaID, false, string(resource) } } // according to TopologyManager, the preferred NUMA affinity, is the narrowest one. @@ -157,23 +149,16 @@ func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, re // at least one NUMA node is available ret := !bitmask.IsEmpty() lh.V(2).Info("final verdict", "suitable", ret, "numaCell", numaID) - return numaID, ret + return numaID, ret, "generic" } -func singleNUMAPodLevelHandler(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, nodeInfo *framework.NodeInfo) *framework.Status { - lh.V(5).Info("pod level single NUMA node handler") - +func singleNUMAPodLevelHandler(lh logr.Logger, pod *v1.Pod, info *filterInfo) *framework.Status { resources := util.GetPodEffectiveRequest(pod) - - nodes := createNUMANodeList(lh, zones) - - // Node() != nil already verified in Filter(), which is the only public entry point - logNumaNodes(lh, "pod handler NUMA resources", nodeInfo.Node().Name, nodes) lh.V(6).Info("pod desired resources", stringify.ResourceListToLoggable(resources)...) 
- numaID, match := resourcesAvailableInAnyNUMANodes(lh, createNUMANodeList(lh, zones), resources, v1qos.GetPodQOS(pod), nodeInfo) + numaID, match, reason := resourcesAvailableInAnyNUMANodes(lh, info, resources) if !match { - lh.V(2).Info("cannot align pod", "name", pod.Name) + lh.V(2).Info("cannot align pod", "name", pod.Name, "reason", reason) return framework.NewStatus(framework.Unschedulable, "cannot align pod") } lh.V(4).Info("all container placed", "numaCell", numaID) @@ -185,14 +170,14 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle if nodeInfo.Node() == nil { return framework.NewStatus(framework.Error, "node not found") } - if v1qos.GetPodQOS(pod) == v1.PodQOSBestEffort && !resourcerequests.IncludeNonNative(pod) { + qos := v1qos.GetPodQOS(pod) + if qos == v1.PodQOSBestEffort && !resourcerequests.IncludeNonNative(pod) { return nil } nodeName := nodeInfo.Node().Name - lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues("ExtensionPoint", "Filter"). - WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) + lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) lh.V(4).Info(logging.FlowBegin) defer lh.V(4).Info(logging.FlowEnd) @@ -211,26 +196,36 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle lh.V(4).Info("found nrt data", "object", stringify.NodeResourceTopologyResources(nodeTopology), "conf", conf.String()) - handler := filterHandlerFromTopologyManager(conf) + handler, boundary := filterHandlerFromTopologyManager(conf) if handler == nil { return nil } - status := handler(lh, pod, nodeTopology.Zones, nodeInfo) + + numaNodes := createNUMANodeList(lh, nodeTopology.Zones) + lh.V(4).Info("aligning resources", "boundary", boundary, "numaCells", len(numaNodes)) + fi := filterInfo{ + nodeName: nodeName, + node: nodeInfo, + topologyManager: conf, + qos: qos, + numaNodes: numaNodes, + } + status := handler(lh, pod, &fi) if status != nil { tm.nrtCache.NodeMaybeOverReserved(nodeName, pod) } return status } -func filterHandlerFromTopologyManager(conf nodeconfig.TopologyManager) filterFn { +func filterHandlerFromTopologyManager(conf nodeconfig.TopologyManager) (filterFn, string) { if conf.Policy != kubeletconfig.SingleNumaNodeTopologyManagerPolicy { - return nil + return nil, "" } if conf.Scope == kubeletconfig.PodTopologyManagerScope { - return singleNUMAPodLevelHandler + return singleNUMAPodLevelHandler, "pod" } if conf.Scope == kubeletconfig.ContainerTopologyManagerScope { - return singleNUMAContainerLevelHandler + return singleNUMAContainerLevelHandler, "container" } - return nil // cannot happen + return nil, "" // cannot happen } diff --git a/pkg/noderesourcetopology/filter_test.go b/pkg/noderesourcetopology/filter_test.go index 4a19c6c6a..859b88a4c 100644 --- a/pkg/noderesourcetopology/filter_test.go +++ b/pkg/noderesourcetopology/filter_test.go @@ -19,9 +19,10 @@ package noderesourcetopology import ( "context" "fmt" - "reflect" + "strings" "testing" + "github.com/google/go-cmp/cmp" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" v1 "k8s.io/api/core/v1" @@ -696,7 +697,7 @@ func TestNodeResourceTopology(t *testing.T) { } gotStatus := tm.Filter(context.Background(), framework.NewCycleState(), tt.pod, nodeInfo) - if !reflect.DeepEqual(gotStatus, tt.wantStatus) { + if 
!quasiEqualStatus(gotStatus, tt.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, tt.wantStatus) } }) @@ -913,7 +914,7 @@ func TestNodeResourceTopologyMultiContainerPodScope(t *testing.T) { } gotStatus := tm.Filter(context.Background(), framework.NewCycleState(), tt.pod, nodeInfo) - if !reflect.DeepEqual(gotStatus, tt.wantStatus) { + if !quasiEqualStatus(gotStatus, tt.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, tt.wantStatus) } }) @@ -1172,7 +1173,7 @@ func TestNodeResourceTopologyMultiContainerContainerScope(t *testing.T) { nodeInfo.SetNode(nodes[0]) gotStatus := tm.Filter(context.Background(), framework.NewCycleState(), tt.pod, nodeInfo) - if !reflect.DeepEqual(gotStatus, tt.wantStatus) { + if !quasiEqualStatus(gotStatus, tt.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, tt.wantStatus) } }) @@ -1286,3 +1287,18 @@ func parseState(error string) *framework.Status { return framework.NewStatus(framework.Unschedulable, error) } + +func quasiEqualStatus(s, x *framework.Status) bool { + if s == nil || x == nil { + return s.IsSuccess() && x.IsSuccess() + } + if s.Code() != x.Code() { + return false + } + sMsg := s.Message() + xMsg := x.Message() + if !strings.HasPrefix(sMsg, xMsg) { + return false + } + return cmp.Equal(s.Plugin(), x.Plugin()) +} diff --git a/pkg/noderesourcetopology/least_numa.go b/pkg/noderesourcetopology/least_numa.go index cf14195ae..2805bdc28 100644 --- a/pkg/noderesourcetopology/least_numa.go +++ b/pkg/noderesourcetopology/least_numa.go @@ -18,12 +18,10 @@ package noderesourcetopology import ( v1 "k8s.io/api/core/v1" - v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/go-logr/logr" - topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" "gonum.org/v1/gonum/stat/combin" "sigs.k8s.io/scheduler-plugins/pkg/util" @@ -34,20 +32,17 @@ const ( maxDistanceValue = 255 ) -func leastNUMAContainerScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList) (int64, *framework.Status) { - nodes := createNUMANodeList(lh, zones) - qos := v1qos.GetPodQOS(pod) - +func leastNUMAContainerScopeScore(lh logr.Logger, pod *v1.Pod, info *scoreInfo) (int64, *framework.Status) { maxNUMANodesCount := 0 allContainersMinAvgDistance := true // the order how TopologyManager asks for hint is important so doing it in the same order // https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cm/topologymanager/scope_container.go#L52 for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { // if a container requests only non NUMA just continue - if onlyNonNUMAResources(nodes, container.Resources.Requests) { + if onlyNonNUMAResources(info.numaNodes, container.Resources.Requests) { continue } - numaNodes, isMinAvgDistance := numaNodesRequired(lh, qos, nodes, container.Resources.Requests) + numaNodes, isMinAvgDistance := numaNodesRequired(lh, info.qos, info.numaNodes, container.Resources.Requests) // container's resources can't fit onto node, return MinNodeScore for whole pod if numaNodes == nil { // score plugin should be running after resource filter plugin so we should always find sufficient amount of NUMA nodes @@ -65,27 +60,24 @@ func leastNUMAContainerScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1a // subtract the resources requested by the container from the given NUMA. 
// this is necessary, so we won't allocate the same resources for the upcoming containers - subtractFromNUMAs(container.Resources.Requests, nodes, numaNodes.GetBits()...) + subtractFromNUMAs(container.Resources.Requests, info.numaNodes, numaNodes.GetBits()...) } if maxNUMANodesCount == 0 { return framework.MaxNodeScore, nil } - return normalizeScore(maxNUMANodesCount, allContainersMinAvgDistance), nil + return normalizeScore(maxNUMANodesCount, allContainersMinAvgDistance, info.topologyManager.MaxNUMANodes), nil } -func leastNUMAPodScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList) (int64, *framework.Status) { - nodes := createNUMANodeList(lh, zones) - qos := v1qos.GetPodQOS(pod) - +func leastNUMAPodScopeScore(lh logr.Logger, pod *v1.Pod, info *scoreInfo) (int64, *framework.Status) { resources := util.GetPodEffectiveRequest(pod) // if a pod requests only non NUMA resources return max score - if onlyNonNUMAResources(nodes, resources) { + if onlyNonNUMAResources(info.numaNodes, resources) { return framework.MaxNodeScore, nil } - numaNodes, isMinAvgDistance := numaNodesRequired(lh, qos, nodes, resources) + numaNodes, isMinAvgDistance := numaNodesRequired(lh, info.qos, info.numaNodes, resources) // pod's resources can't fit onto node, return MinNodeScore if numaNodes == nil { // score plugin should be running after resource filter plugin so we should always find sufficient amount of NUMA nodes @@ -93,11 +85,11 @@ func leastNUMAPodScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2. return framework.MinNodeScore, nil } - return normalizeScore(numaNodes.Count(), isMinAvgDistance), nil + return normalizeScore(numaNodes.Count(), isMinAvgDistance, info.topologyManager.MaxNUMANodes), nil } -func normalizeScore(numaNodesCount int, isMinAvgDistance bool) int64 { - numaNodeScore := framework.MaxNodeScore / highestNUMAID +func normalizeScore(numaNodesCount int, isMinAvgDistance bool, highestNUMAID int) int64 { + numaNodeScore := framework.MaxNodeScore / int64(highestNUMAID) score := framework.MaxNodeScore - int64(numaNodesCount)*numaNodeScore if isMinAvgDistance { // if distance between NUMA domains is optimal add half of numaNodeScore to make this node more favorable diff --git a/pkg/noderesourcetopology/least_numa_test.go b/pkg/noderesourcetopology/least_numa_test.go index f847b397a..91142850a 100644 --- a/pkg/noderesourcetopology/least_numa_test.go +++ b/pkg/noderesourcetopology/least_numa_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/nodeconfig" ) const ( @@ -746,7 +747,7 @@ func TestNormalizeScore(t *testing.T) { for _, tc := range tcases { t.Run(tc.description, func(t *testing.T) { - normalizedScore := normalizeScore(tc.score, tc.optimalDistance) + normalizedScore := normalizeScore(tc.score, tc.optimalDistance, nodeconfig.DefaultMaxNUMANodes) if normalizedScore != tc.expectedScore { t.Errorf("Expected normalizedScore to be %d not %d", tc.expectedScore, normalizedScore) } diff --git a/pkg/noderesourcetopology/logging/logging.go b/pkg/noderesourcetopology/logging/logging.go index 78bfa3998..2dcf3c09f 100644 --- a/pkg/noderesourcetopology/logging/logging.go +++ b/pkg/noderesourcetopology/logging/logging.go @@ -20,6 +20,7 @@ import ( "reflect" corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/scheduler-plugins/pkg/util" ) // well-known structured log keys @@ -44,8 +45,9 @@ const ( ) const ( - KindContainerInit 
string = "init" - KindContainerApp string = "app" + KindContainerInit string = "init" + KindContainerSidecar string = "sidecar" + KindContainerApp string = "app" ) const ( @@ -62,3 +64,10 @@ func PodUID(pod *corev1.Pod) string { } return string(pod.GetUID()) } + +func GetInitContainerKind(container *corev1.Container) string { + if util.IsSidecarInitContainer(container) { + return KindContainerSidecar + } + return KindContainerInit +} diff --git a/pkg/noderesourcetopology/nodeconfig/topologymanager.go b/pkg/noderesourcetopology/nodeconfig/topologymanager.go index 6880d2dba..3a6f218b5 100644 --- a/pkg/noderesourcetopology/nodeconfig/topologymanager.go +++ b/pkg/noderesourcetopology/nodeconfig/topologymanager.go @@ -18,6 +18,7 @@ package nodeconfig import ( "fmt" + "strconv" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -26,11 +27,16 @@ import ( ) const ( - AttributeScope = "topologyManagerScope" - AttributePolicy = "topologyManagerPolicy" + DefaultMaxNUMANodes = 8 // legacy setting and default value for TopologyManager. NOTE: kube doesn't expose this constant + + LimitNUMANodes = 1024 // basic sanitization, but we will likely need to bump soon enough ) -// TODO: handle topologyManagerPolicyOptions added in k8s 1.26 +const ( + AttributeScope = "topologyManagerScope" + AttributePolicy = "topologyManagerPolicy" + AttributeMaxNUMANodes = "topologyManagerMaxNUMANodes" +) func IsValidScope(scope string) bool { if scope == kubeletconfig.ContainerTopologyManagerScope || scope == kubeletconfig.PodTopologyManagerScope { @@ -47,15 +53,25 @@ func IsValidPolicy(policy string) bool { return false } +func IsValidMaxNUMANodes(value int) bool { + // machines always report at least 1 NUMA node anyway, and furthermore the value is used in a division, + // so we need to enforce 1 (not 0) as minimum + // NOTE: there's no legit upper bound, so care must be taken to cap this value. But still, theoretically + // 4096 NUMA nodes is a valid legal value. + return value > 1 +} + type TopologyManager struct { - Scope string - Policy string + Scope string + Policy string + MaxNUMANodes int } func TopologyManagerDefaults() TopologyManager { return TopologyManager{ - Scope: kubeletconfig.ContainerTopologyManagerScope, - Policy: kubeletconfig.NoneTopologyManagerPolicy, + Scope: kubeletconfig.ContainerTopologyManagerScope, + Policy: kubeletconfig.NoneTopologyManagerPolicy, + MaxNUMANodes: DefaultMaxNUMANodes, } } @@ -65,12 +81,12 @@ func TopologyManagerFromNodeResourceTopology(lh logr.Logger, nodeTopology *topol // Backward compatibility (v1alpha2 and previous). Deprecated, will be removed when the NRT API moves to v1beta1. 
cfg.updateFromPolicies(lh, nodeTopology.Name, nodeTopology.TopologyPolicies) // preferred new configuration source (v1alpha2 and onwards) - cfg.updateFromAttributes(nodeTopology.Attributes) + cfg.updateFromAttributes(lh, nodeTopology.Attributes) return conf } func (conf TopologyManager) String() string { - return fmt.Sprintf("policy=%q scope=%q", conf.Policy, conf.Scope) + return fmt.Sprintf("policy=%s scope=%s maxNUMANodes=%d", conf.Policy, conf.Scope, conf.MaxNUMANodes) } func (conf TopologyManager) Equal(other TopologyManager) bool { @@ -80,10 +96,10 @@ func (conf TopologyManager) Equal(other TopologyManager) bool { if conf.Policy != other.Policy { return false } - return true + return conf.MaxNUMANodes == other.MaxNUMANodes } -func (conf *TopologyManager) updateFromAttributes(attrs topologyv1alpha2.AttributeList) { +func (conf *TopologyManager) updateFromAttributes(lh logr.Logger, attrs topologyv1alpha2.AttributeList) { for _, attr := range attrs { if attr.Name == AttributeScope && IsValidScope(attr.Value) { conf.Scope = attr.Value @@ -93,8 +109,22 @@ func (conf *TopologyManager) updateFromAttributes(attrs topologyv1alpha2.Attribu conf.Policy = attr.Value continue } - // TODO: handle topologyManagerPolicyOptions added in k8s 1.26 + if attr.Name == AttributeMaxNUMANodes { + if val, err := strconv.Atoi(attr.Value); err == nil && IsValidMaxNUMANodes(val) { + conf.MaxNUMANodes = clampMaxNUMANodes(lh, val) + continue + } + } + } +} + +func clampMaxNUMANodes(lh logr.Logger, val int) int { + if val > LimitNUMANodes { + // should never happen, so we are verbose + lh.Info("capped MaxNUMANodes value to limit", "value", val, "limit", LimitNUMANodes) + val = LimitNUMANodes } + return val } func (conf *TopologyManager) updateFromPolicies(lh logr.Logger, nodeName string, topologyPolicies []string) { diff --git a/pkg/noderesourcetopology/nodeconfig/topologymanager_test.go b/pkg/noderesourcetopology/nodeconfig/topologymanager_test.go index 04bf3c783..5ed0c1e52 100644 --- a/pkg/noderesourcetopology/nodeconfig/topologymanager_test.go +++ b/pkg/noderesourcetopology/nodeconfig/topologymanager_test.go @@ -136,12 +136,14 @@ func TestTopologyManagerEqual(t *testing.T) { { name: "matching", tmA: TopologyManager{ - Scope: "container", - Policy: "single-numa-node", + Scope: "container", + Policy: "single-numa-node", + MaxNUMANodes: 9, // gracehopper }, tmB: TopologyManager{ - Scope: "container", - Policy: "single-numa-node", + Scope: "container", + Policy: "single-numa-node", + MaxNUMANodes: 9, // gracehopper }, expected: true, }, @@ -181,6 +183,25 @@ func TestTopologyManagerEqual(t *testing.T) { }, expected: false, }, + { + name: "nodes diff vs nil", + tmA: TopologyManager{ + MaxNUMANodes: 16, + }, + tmB: TopologyManager{}, + expected: false, + }, + { + name: "nodes diff", + tmA: TopologyManager{ + MaxNUMANodes: 16, + }, + tmB: TopologyManager{ + MaxNUMANodes: 9, // gracehopper + }, + expected: false, + }, + { name: "scope diff, policy matching", tmA: TopologyManager{ @@ -205,6 +226,19 @@ func TestTopologyManagerEqual(t *testing.T) { }, expected: false, }, + { + name: "scope, policy matching, nodes diff", + tmA: TopologyManager{ + Scope: "container", + Policy: "single-numa-node", + MaxNUMANodes: 9, + }, + tmB: TopologyManager{ + Scope: "container", + Policy: "best-effort", + }, + expected: false, + }, } for _, tt := range tests { @@ -324,13 +358,67 @@ func TestConfigFromAttributes(t *testing.T) { Policy: kubeletconfig.RestrictedTopologyManagerPolicy, }, }, + { + name: "invalid-nodes-string", + attrs: 
topologyv1alpha2.AttributeList{ + { + Name: "topologyManagerMaxNUMANodes", + Value: "A", + }, + }, + expected: TopologyManager{}, + }, + { + name: "invalid-nodes-zero", + attrs: topologyv1alpha2.AttributeList{ + { + Name: "topologyManagerMaxNUMANodes", + Value: "0", + }, + }, + expected: TopologyManager{}, + }, + { + name: "invalid-nodes-negative", + attrs: topologyv1alpha2.AttributeList{ + { + Name: "topologyManagerMaxNUMANodes", + Value: "-2", + }, + }, + expected: TopologyManager{}, + }, + { + name: "valid-nodes", + attrs: topologyv1alpha2.AttributeList{ + { + Name: "topologyManagerMaxNUMANodes", + Value: "16", + }, + }, + expected: TopologyManager{ + MaxNUMANodes: 16, + }, + }, + { + name: "valid-nodes-upper-bound", + attrs: topologyv1alpha2.AttributeList{ + { + Name: "topologyManagerMaxNUMANodes", + Value: "65535", + }, + }, + expected: TopologyManager{ + MaxNUMANodes: LimitNUMANodes, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got := TopologyManager{} cfg := &got // shortcut - cfg.updateFromAttributes(tt.attrs) + cfg.updateFromAttributes(klog.Background(), tt.attrs) if !reflect.DeepEqual(got, tt.expected) { t.Errorf("conf got=%+#v expected=%+#v", got, tt.expected) } @@ -427,8 +515,9 @@ func TestConfigFromNRT(t *testing.T) { }, }, expected: TopologyManager{ - Policy: kubeletconfig.BestEffortTopologyManagerPolicy, - Scope: kubeletconfig.PodTopologyManagerScope, + Policy: kubeletconfig.BestEffortTopologyManagerPolicy, + Scope: kubeletconfig.PodTopologyManagerScope, + MaxNUMANodes: DefaultMaxNUMANodes, }, }, { @@ -440,8 +529,9 @@ func TestConfigFromNRT(t *testing.T) { }, }, expected: TopologyManager{ - Policy: kubeletconfig.RestrictedTopologyManagerPolicy, - Scope: kubeletconfig.ContainerTopologyManagerScope, + Policy: kubeletconfig.RestrictedTopologyManagerPolicy, + Scope: kubeletconfig.ContainerTopologyManagerScope, + MaxNUMANodes: DefaultMaxNUMANodes, }, }, { @@ -455,8 +545,9 @@ func TestConfigFromNRT(t *testing.T) { }, }, expected: TopologyManager{ - Policy: kubeletconfig.RestrictedTopologyManagerPolicy, - Scope: kubeletconfig.ContainerTopologyManagerScope, + Policy: kubeletconfig.RestrictedTopologyManagerPolicy, + Scope: kubeletconfig.ContainerTopologyManagerScope, + MaxNUMANodes: DefaultMaxNUMANodes, }, }, { @@ -473,8 +564,9 @@ func TestConfigFromNRT(t *testing.T) { }, }, expected: TopologyManager{ - Policy: kubeletconfig.BestEffortTopologyManagerPolicy, - Scope: kubeletconfig.ContainerTopologyManagerScope, + Policy: kubeletconfig.BestEffortTopologyManagerPolicy, + Scope: kubeletconfig.ContainerTopologyManagerScope, + MaxNUMANodes: DefaultMaxNUMANodes, }, }, { @@ -495,8 +587,9 @@ func TestConfigFromNRT(t *testing.T) { }, }, expected: TopologyManager{ - Policy: kubeletconfig.RestrictedTopologyManagerPolicy, - Scope: kubeletconfig.ContainerTopologyManagerScope, + Policy: kubeletconfig.RestrictedTopologyManagerPolicy, + Scope: kubeletconfig.ContainerTopologyManagerScope, + MaxNUMANodes: DefaultMaxNUMANodes, }, }, } diff --git a/pkg/noderesourcetopology/plugin.go b/pkg/noderesourcetopology/plugin.go index bb6a2727d..3ca01eecc 100644 --- a/pkg/noderesourcetopology/plugin.go +++ b/pkg/noderesourcetopology/plugin.go @@ -30,6 +30,7 @@ import ( apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" "sigs.k8s.io/scheduler-plugins/apis/config/validation" nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/nodeconfig" "github.com/go-logr/logr" topologyapi 
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology" @@ -48,8 +49,23 @@ func init() { utilruntime.Must(topologyv1alpha2.AddToScheme(scheme)) } -type filterFn func(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, nodeInfo *framework.NodeInfo) *framework.Status -type scoringFn func(logr.Logger, *v1.Pod, topologyv1alpha2.ZoneList) (int64, *framework.Status) +type filterInfo struct { + nodeName string // shortcut, used very often + node *framework.NodeInfo + topologyManager nodeconfig.TopologyManager + qos v1.PodQOSClass + numaNodes NUMANodeList +} + +type filterFn func(logr.Logger, *v1.Pod, *filterInfo) *framework.Status + +type scoreInfo struct { + topologyManager nodeconfig.TopologyManager + qos v1.PodQOSClass + numaNodes NUMANodeList +} + +type scoringFn func(logr.Logger, *v1.Pod, *scoreInfo) (int64, *framework.Status) // TopologyMatch plugin which run simplified version of TopologyManager's admit handler type TopologyMatch struct { @@ -73,7 +89,7 @@ func (tm *TopologyMatch) Name() string { // New initializes a new plugin and returns it. func New(ctx context.Context, args runtime.Object, handle framework.Handle) (framework.Plugin, error) { - lh := klog.FromContext(ctx).WithValues("plugin", Name) + lh := klog.FromContext(ctx) lh.V(5).Info("creating new noderesourcetopology plugin") tcfg, ok := args.(*apiconfig.NodeResourceTopologyMatchArgs) diff --git a/pkg/noderesourcetopology/pluginhelpers.go b/pkg/noderesourcetopology/pluginhelpers.go index 74c71f9a5..f6eba0e86 100644 --- a/pkg/noderesourcetopology/pluginhelpers.go +++ b/pkg/noderesourcetopology/pluginhelpers.go @@ -27,6 +27,7 @@ import ( "github.com/go-logr/logr" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" + "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2/helper" "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2/helper/numanode" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -105,7 +106,7 @@ func createNUMANodeList(lh logr.Logger, zones topologyv1alpha2.ZoneList) NUMANod nodes := NUMANodeList{} // filter non Node zones and create idToIdx lookup array for i, zone := range zones { - if zone.Type != "Node" { + if zone.Type != helper.ZoneTypeNUMANode { continue } @@ -180,10 +181,3 @@ func getForeignPodsDetectMode(lh logr.Logger, cfg *apiconfig.NodeResourceTopolog } return foreignPodsDetect } - -func logNumaNodes(lh logr.Logger, desc, nodeName string, nodes NUMANodeList) { - for _, numaNode := range nodes { - numaItems := []interface{}{"numaCell", numaNode.NUMAID} - lh.V(6).Info(desc, stringify.ResourceListToLoggableWithValues(numaItems, numaNode.Resources)...) - } -} diff --git a/pkg/noderesourcetopology/postbind.go b/pkg/noderesourcetopology/postbind.go index 3cb567f38..0831a83ff 100644 --- a/pkg/noderesourcetopology/postbind.go +++ b/pkg/noderesourcetopology/postbind.go @@ -26,8 +26,7 @@ import ( ) func (tm *TopologyMatch) PostBind(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { - lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues("ExtensionPoint", "PostBind"). 
- WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) + lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) lh.V(4).Info(logging.FlowBegin) defer lh.V(4).Info(logging.FlowEnd) diff --git a/pkg/noderesourcetopology/reserve.go b/pkg/noderesourcetopology/reserve.go index 4b48c9a1f..1cb3876eb 100644 --- a/pkg/noderesourcetopology/reserve.go +++ b/pkg/noderesourcetopology/reserve.go @@ -27,8 +27,7 @@ import ( func (tm *TopologyMatch) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { // the scheduler framework will add the node/name key/value pair - lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues("ExtensionPoint", "Reserve"). - WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) + lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) lh.V(4).Info(logging.FlowBegin) defer lh.V(4).Info(logging.FlowEnd) @@ -39,7 +38,7 @@ func (tm *TopologyMatch) Reserve(ctx context.Context, state *framework.CycleStat func (tm *TopologyMatch) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { // the scheduler framework will add the node/name key/value pair - lh := klog.FromContext(ctx).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod)) + lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) lh.V(4).Info(logging.FlowBegin) defer lh.V(4).Info(logging.FlowEnd) diff --git a/pkg/noderesourcetopology/score.go b/pkg/noderesourcetopology/score.go index 7b6472d3c..60cd755d2 100644 --- a/pkg/noderesourcetopology/score.go +++ b/pkg/noderesourcetopology/score.go @@ -30,7 +30,6 @@ import ( apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" "github.com/go-logr/logr" - topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/logging" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/nodeconfig" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" @@ -62,15 +61,15 @@ func (rw resourceToWeightMap) weight(r v1.ResourceName) int64 { func (tm *TopologyMatch) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { // the scheduler framework will add the node/name key/value pair - lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues("ExtensionPoint", "Score"). 
- WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) + lh := klog.FromContext(klog.NewContext(ctx, tm.logger)).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName) lh.V(4).Info(logging.FlowBegin) defer lh.V(4).Info(logging.FlowEnd) lh.V(6).Info("scoring node") // if it's a non-guaranteed pod, every node is considered to be a good fit - if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { + qos := v1qos.GetPodQOS(pod) + if qos != v1.PodQOSGuaranteed { return framework.MaxNodeScore, nil } @@ -87,11 +86,18 @@ func (tm *TopologyMatch) Score(ctx context.Context, state *framework.CycleState, lh.V(6).Info("found object", "noderesourcetopology", stringify.NodeResourceTopologyResources(nodeTopology)) - handler := tm.scoringHandlerFromTopologyManagerConfig(nodeconfig.TopologyManagerFromNodeResourceTopology(lh, nodeTopology)) + conf := nodeconfig.TopologyManagerFromNodeResourceTopology(lh, nodeTopology) + handler := tm.scoringHandlerFromTopologyManagerConfig(conf) if handler == nil { return 0, nil } - return handler(lh, pod, nodeTopology.Zones) + numaNodes := createNUMANodeList(lh, nodeTopology.Zones) + si := scoreInfo{ + topologyManager: conf, + qos: qos, + numaNodes: numaNodes, + } + return handler(lh, pod, &si) } func (tm *TopologyMatch) ScoreExtensions() framework.ScoreExtensions { @@ -132,27 +138,24 @@ func getScoringStrategyFunction(strategy apiconfig.ScoringStrategyType) (scoreSt } } -func podScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, scorerFn scoreStrategyFn, resourceToWeightMap resourceToWeightMap) (int64, *framework.Status) { +func podScopeScore(lh logr.Logger, pod *v1.Pod, info *scoreInfo, scorerFn scoreStrategyFn, resourceToWeightMap resourceToWeightMap) (int64, *framework.Status) { // This code is in Admit implementation of pod scope // https://github.com/kubernetes/kubernetes/blob/9ff3b7e744b34c099c1405d9add192adbef0b6b1/pkg/kubelet/cm/topologymanager/scope_pod.go#L52 // but it works with HintProviders, takes into account all possible allocations. resources := util.GetPodEffectiveRequest(pod) - - allocatablePerNUMA := createNUMANodeList(lh, zones) - finalScore := scoreForEachNUMANode(lh, resources, allocatablePerNUMA, scorerFn, resourceToWeightMap) + finalScore := scoreForEachNUMANode(lh, resources, info.numaNodes, scorerFn, resourceToWeightMap) lh.V(2).Info("pod scope scoring final node score", "finalScore", finalScore) return finalScore, nil } -func containerScopeScore(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, scorerFn scoreStrategyFn, resourceToWeightMap resourceToWeightMap) (int64, *framework.Status) { +func containerScopeScore(lh logr.Logger, pod *v1.Pod, info *scoreInfo, scorerFn scoreStrategyFn, resourceToWeightMap resourceToWeightMap) (int64, *framework.Status) { // This code is in Admit implementation of container scope // https://github.com/kubernetes/kubernetes/blob/9ff3b7e744b34c099c1405d9add192adbef0b6b1/pkg/kubelet/cm/topologymanager/scope_container.go#L52 containers := append(pod.Spec.InitContainers, pod.Spec.Containers...) 
contScore := make([]float64, len(containers)) - allocatablePerNUMA := createNUMANodeList(lh, zones) for i, container := range containers { - contScore[i] = float64(scoreForEachNUMANode(lh, container.Resources.Requests, allocatablePerNUMA, scorerFn, resourceToWeightMap)) + contScore[i] = float64(scoreForEachNUMANode(lh, container.Resources.Requests, info.numaNodes, scorerFn, resourceToWeightMap)) lh.V(6).Info("container scope scoring", "container", container.Name, "score", contScore[i]) } finalScore := int64(stat.Mean(contScore, nil)) @@ -174,13 +177,13 @@ func (tm *TopologyMatch) scoringHandlerFromTopologyManagerConfig(conf nodeconfig return nil } if conf.Scope == kubeletconfig.PodTopologyManagerScope { - return func(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList) (int64, *framework.Status) { - return podScopeScore(lh, pod, zones, tm.scoreStrategyFunc, tm.resourceToWeightMap) + return func(lh logr.Logger, pod *v1.Pod, info *scoreInfo) (int64, *framework.Status) { + return podScopeScore(lh, pod, info, tm.scoreStrategyFunc, tm.resourceToWeightMap) } } if conf.Scope == kubeletconfig.ContainerTopologyManagerScope { - return func(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList) (int64, *framework.Status) { - return containerScopeScore(lh, pod, zones, tm.scoreStrategyFunc, tm.resourceToWeightMap) + return func(lh logr.Logger, pod *v1.Pod, info *scoreInfo) (int64, *framework.Status) { + return containerScopeScore(lh, pod, info, tm.scoreStrategyFunc, tm.resourceToWeightMap) } } return nil // cannot happen diff --git a/pkg/util/sidecar.go b/pkg/util/sidecar.go new file mode 100644 index 000000000..7a428ff02 --- /dev/null +++ b/pkg/util/sidecar.go @@ -0,0 +1,27 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + v1 "k8s.io/api/core/v1" +) + +// IsSidecarInitContainer assumes the given container is a pod init container; +// returns true if that container is a sidecar, false otherwise. +func IsSidecarInitContainer(container *v1.Container) bool { + return container.RestartPolicy != nil && *container.RestartPolicy == v1.ContainerRestartPolicyAlways +} diff --git a/pkg/util/sidecar_test.go b/pkg/util/sidecar_test.go new file mode 100644 index 000000000..56f0247ba --- /dev/null +++ b/pkg/util/sidecar_test.go @@ -0,0 +1,61 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package util
+
+import (
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+)
+
+func TestIsSidecarInitContainer(t *testing.T) {
+	restartAlways := v1.ContainerRestartPolicyAlways
+	testCases := []struct {
+		name     string
+		cnt      *v1.Container
+		expected bool
+	}{
+		{
+			name:     "zero value",
+			cnt:      &v1.Container{},
+			expected: false,
+		},
+		{
+			name: "explicit nil",
+			cnt: &v1.Container{
+				RestartPolicy: nil,
+			},
+			expected: false,
+		},
+		{
+			name: "true sidecar container",
+			cnt: &v1.Container{
+				Name:          "init-1",
+				RestartPolicy: &restartAlways,
+			},
+			expected: true,
+		},
+	}
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			got := IsSidecarInitContainer(testCase.cnt)
+			if got != testCase.expected {
+				t.Fatalf("expected %t to equal %t", got, testCase.expected)
+			}
+		})
+	}
+}
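
For illustration, a minimal sketch of how the new topologyManagerMaxNUMANodes attribute flows from a NodeResourceTopology object into the score normalization, assuming the exported nodeconfig helpers introduced above and the scheduler framework constants; the attribute values and the two-NUMA-cell placement are made-up sample data, and the arithmetic simply mirrors normalizeScore().

package main

import (
	"fmt"

	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"

	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/nodeconfig"
)

func main() {
	// Sample NRT attributes; real objects are produced by the topology exporter.
	nrt := &topologyv1alpha2.NodeResourceTopology{
		Attributes: topologyv1alpha2.AttributeList{
			{Name: nodeconfig.AttributePolicy, Value: "single-numa-node"},
			{Name: nodeconfig.AttributeScope, Value: "container"},
			{Name: nodeconfig.AttributeMaxNUMANodes, Value: "16"},
		},
	}

	conf := nodeconfig.TopologyManagerFromNodeResourceTopology(klog.Background(), nrt)
	fmt.Println(conf.String()) // policy=single-numa-node scope=container maxNUMANodes=16

	// Same arithmetic as normalizeScore(): each NUMA cell used "costs"
	// MaxNodeScore/MaxNUMANodes points, so placements spanning fewer cells score higher.
	numaNodeScore := framework.MaxNodeScore / int64(conf.MaxNUMANodes) // 100/16 = 6
	score := framework.MaxNodeScore - 2*numaNodeScore                  // two cells -> 88
	fmt.Println(score)
}

A node that does not advertise the attribute keeps DefaultMaxNUMANodes (8), so scoring behaves exactly as with the previous hard-coded highestNUMAID constant.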