Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions pkg/estimator/scheduling_simulator_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package estimator

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"

"github.com/karmada-io/karmada/pkg/estimator/pb"
nodeutil "github.com/karmada-io/karmada/pkg/estimator/server/nodes"
Expand Down Expand Up @@ -76,10 +77,10 @@ func (s *SchedulingSimulator) scheduleComponent(component pb.Component) bool {
requiredPerReplica := util.NewResource(component.ReplicaRequirements.ResourceRequest)
requiredPerReplica.AllowedPodNumber = 1
remaining := component.Replicas
nodeClaim := component.ReplicaRequirements.NodeClaim
affinity, tolerations := GetAffinityAndTolerations(component.ReplicaRequirements.NodeClaim)

for _, node := range s.nodes {
if !matchNode(nodeClaim, node) {
if !MatchNode(node, affinity, tolerations) {
continue
}

Expand All @@ -102,19 +103,22 @@ func (s *SchedulingSimulator) scheduleComponent(component pb.Component) bool {
return remaining == 0
}

// matchNode checks whether the node matches the scheduling constraints defined in the replica requirements.
func matchNode(nodeClaim *pb.NodeClaim, node *schedulerframework.NodeInfo) bool {
if node.Node() == nil {
// Always match since we lack node affinity/toleration info, so we skip these checks.
return true
}

// GetAffinityAndTolerations extracts node affinity and tolerations from a NodeClaim.
// A nil NodeClaim is accepted: the affinity is then derived from empty replica
// requirements and the returned toleration slice is nil.
func GetAffinityAndTolerations(nodeClaim *pb.NodeClaim) (nodeaffinity.RequiredNodeAffinity, []corev1.Toleration) {
	var tolerations []corev1.Toleration
	if nodeClaim != nil {
		tolerations = nodeClaim.Tolerations
	}
	affinity := nodeutil.GetRequiredNodeAffinity(pb.ReplicaRequirements{NodeClaim: nodeClaim})
	return affinity, tolerations
}

// MatchNode checks whether the node matches the node affinity and tolerations
// specified in the component's replica requirements.
func MatchNode(node *schedulerframework.NodeInfo, affinity nodeaffinity.RequiredNodeAffinity, tolerations []corev1.Toleration) bool {
	n := node.Node()
	if n == nil {
		// Always match since we lack node affinity/toleration info, so we skip these checks.
		return true
	}
	if !nodeutil.IsNodeAffinityMatched(n, affinity) {
		return false
	}
	return nodeutil.IsTolerationMatched(n, tolerations)
}
5 changes: 3 additions & 2 deletions pkg/estimator/scheduling_simulator_components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,10 @@ func TestMatchNode(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := matchNode(tt.replicaRequirements.NodeClaim, tt.node)
affinity, tolerations := GetAffinityAndTolerations(tt.replicaRequirements.NodeClaim)
result := MatchNode(tt.node, affinity, tolerations)
if result != tt.expected {
t.Errorf("matchNode() = %v, expected %v", result, tt.expected)
t.Errorf("MatchNode() = %v, expected %v", result, tt.expected)
}
})
}
Expand Down
39 changes: 21 additions & 18 deletions pkg/estimator/server/framework/plugins/noderesource/noderesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/karmada-io/karmada/pkg/estimator"
"github.com/karmada-io/karmada/pkg/estimator/pb"
"github.com/karmada-io/karmada/pkg/estimator/server/framework"
nodeutil "github.com/karmada-io/karmada/pkg/estimator/server/nodes"
"github.com/karmada-io/karmada/pkg/util"
schedcache "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache"
schedulerframework "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework"
Expand Down Expand Up @@ -70,27 +69,20 @@ func (pl *nodeResourceEstimator) Name() string {
// Estimate the replica allowed by the node resources for a given pb.ReplicaRequirements.
func (pl *nodeResourceEstimator) Estimate(ctx context.Context, snapshot *schedcache.Snapshot, requirements *pb.ReplicaRequirements) (int32, *framework.Result) {
if !pl.enabled {
klog.V(5).Info("Estimator Plugin", "name", Name, "enabled", pl.enabled)
return noNodeConstraint, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s is disabled", pl.Name()))
return pl.disabledResult()
}

allNodes, err := snapshot.NodeInfos().List()
if err != nil {
return 0, framework.AsResult(err)
}
var (
affinity = nodeutil.GetRequiredNodeAffinity(*requirements)
tolerations []corev1.Toleration
)

if requirements.NodeClaim != nil {
tolerations = requirements.NodeClaim.Tolerations
}
affinity, tolerations := estimator.GetAffinityAndTolerations(requirements.NodeClaim)

var res int32
processNode := func(i int) {
node := allNodes[i]
if !nodeutil.IsNodeAffinityMatched(node.Node(), affinity) || !nodeutil.IsTolerationMatched(node.Node(), tolerations) {
node := allNodes[i].Clone()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remind me why the copy operation is needed here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getNodeAvailableResource function is newly added and has two call sites. One is in getNodesAvailableResources, where the NodeInfo is already cloned; this is the other call site, so the Clone call is added here to avoid mutating the shared NodeInfo.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, got it

if !estimator.MatchNode(node, affinity, tolerations) {
return
}
maxReplica := pl.nodeMaxAvailableReplica(node, requirements.ResourceRequest)
Expand All @@ -102,21 +94,28 @@ func (pl *nodeResourceEstimator) Estimate(ctx context.Context, snapshot *schedca
}

// nodeMaxAvailableReplica returns how many replicas with resource request rl
// fit into the node's remaining (allocatable minus requested) resources.
func (pl *nodeResourceEstimator) nodeMaxAvailableReplica(node *schedulerframework.NodeInfo, rl corev1.ResourceList) int32 {
	rest := getNodeAvailableResource(node)
	return int32(rest.MaxDivided(rl)) // #nosec G115: integer overflow conversion int64 -> int32
}

// getNodeAvailableResource calculates a node's available resources after deducting requested resources
// and adjusting the pod-count capacity, which isn't included in node.Requested.
//
// NOTE(review): this works on node.Allocatable directly, without cloning, so it
// appears to mutate the passed NodeInfo in place — callers seem expected to pass
// a cloned NodeInfo (see the Clone calls at the call sites); confirm.
func getNodeAvailableResource(node *schedulerframework.NodeInfo) *util.Resource {
	rest := node.Allocatable
	rest = rest.SubResource(node.Requested)
	// The number of pods in a node is a kind of resource in node allocatable resources.
	// However, total requested resources of all pods on this node, i.e. `node.Requested`,
	// do not contain pod resources. So after subtraction, we should cope with allowed pod
	// number manually which is the upper bound of this node available replicas.
	rest.AllowedPodNumber = util.MaxInt64(rest.AllowedPodNumber-int64(len(node.Pods)), 0)
	return rest
}

// EstimateComponents estimates the maximum number of complete component sets that can be scheduled.
// It returns the number of sets that can fit on the available node resources.
func (pl *nodeResourceEstimator) EstimateComponents(_ context.Context, snapshot *schedcache.Snapshot, components []pb.Component, _ string) (int32, *framework.Result) {
if !pl.enabled {
klog.V(5).Info("Estimator Plugin", "name", Name, "enabled", pl.enabled)
return noNodeConstraint, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s is disabled", pl.Name()))
return pl.disabledResult()
}

if len(components) == 0 {
Expand All @@ -136,6 +135,11 @@ func (pl *nodeResourceEstimator) EstimateComponents(_ context.Context, snapshot
return sets, framework.NewResult(framework.Success)
}

// disabledResult logs that the plugin is disabled and returns the
// unconstrained replica count with a Noopperation result, so Estimate and
// EstimateComponents report the disabled state identically.
func (pl *nodeResourceEstimator) disabledResult() (int32, *framework.Result) {
	// InfoS is required for structured key/value pairs; plain Info would just
	// concatenate the arguments into one unstructured message.
	klog.V(5).InfoS("Estimator Plugin is disabled", "name", Name, "enabled", pl.enabled)
	return noNodeConstraint, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s is disabled", pl.Name()))
}

// getNodesAvailableResources retrieves and prepares the list of node information from the snapshot.
// It clones each node's info and adjusts the allocatable resources by subtracting the requested resources.
// So that the returned node infos reflect the actual available resources for scheduling.
Expand All @@ -148,8 +152,7 @@ func getNodesAvailableResources(snapshot *schedcache.Snapshot) ([]*schedulerfram
rest := make([]*schedulerframework.NodeInfo, 0, len(allNodes))
for _, node := range allNodes {
n := node.Clone()
n.Allocatable.SubResource(n.Requested)
n.Allocatable.AllowedPodNumber = util.MaxInt64(n.Allocatable.AllowedPodNumber-int64(len(node.Pods)), 0)
n.Allocatable = getNodeAvailableResource(n)
rest = append(rest, n)
}

Expand Down
Loading