diff --git a/api/delegation/pods/messages.go b/api/delegation/pods/messages.go index b1a30799..a3c9fd95 100644 --- a/api/delegation/pods/messages.go +++ b/api/delegation/pods/messages.go @@ -12,6 +12,8 @@ import ( type PodPipelineRequest struct { // The available nodes. Nodes []corev1.Node `json:"nodes"` + // The pod to be scheduled. + Pod corev1.Pod `json:"pod"` } func (r PodPipelineRequest) GetSubjects() []string { diff --git a/helm/bundles/cortex-pods/templates/pipelines.yaml b/helm/bundles/cortex-pods/templates/pipelines.yaml index aec8db63..ed41ca3b 100644 --- a/helm/bundles/cortex-pods/templates/pipelines.yaml +++ b/helm/bundles/cortex-pods/templates/pipelines.yaml @@ -17,3 +17,17 @@ spec: It is used as a placeholder step in the pods scheduler pipeline. knowledges: [] mandatory: false + - type: filter + impl: taint + description: | + Filters nodes based on taints, excluding nodes with NoSchedule taints + unless the pod has matching tolerations. + knowledges: [] + mandatory: true + - type: filter + impl: nodeaffinity + description: | + Filters nodes based on pod's node affinity requirements, matching + nodes that satisfy the specified label selectors. + knowledges: [] + mandatory: true diff --git a/internal/scheduling/decisions/pods/pipeline_controller.go b/internal/scheduling/decisions/pods/pipeline_controller.go index 1787b22e..63a143d2 100644 --- a/internal/scheduling/decisions/pods/pipeline_controller.go +++ b/internal/scheduling/decisions/pods/pipeline_controller.go @@ -141,6 +141,20 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al return errors.New("pipeline not found or not ready") } + // Check if the pod is already assigned to a node. 
+ pod := &corev1.Pod{} + if err := c.Get(ctx, client.ObjectKey{ + Name: decision.Spec.PodRef.Name, + Namespace: decision.Spec.PodRef.Namespace, + }, pod); err != nil { + log.Error(err, "failed to fetch pod for decision") + return err + } + if pod.Spec.NodeName != "" { + log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName) + return nil + } + // Find all available nodes. nodes := &corev1.NodeList{} if err := c.List(ctx, nodes); err != nil { @@ -151,7 +165,7 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al } // Execute the scheduling pipeline. - request := pods.PodPipelineRequest{Nodes: nodes.Items} + request := pods.PodPipelineRequest{Nodes: nodes.Items, Pod: *pod} result, err := pipeline.Run(request) if err != nil { log.V(1).Error(err, "failed to run scheduler pipeline") @@ -160,20 +174,6 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al decision.Status.Result = &result log.Info("decision processed successfully", "duration", time.Since(startedAt)) - // Check if the pod is already assigned to a node. - pod := &corev1.Pod{} - if err := c.Get(ctx, client.ObjectKey{ - Name: decision.Spec.PodRef.Name, - Namespace: decision.Spec.PodRef.Namespace, - }, pod); err != nil { - log.Error(err, "failed to fetch pod for decision") - return err - } - if pod.Spec.NodeName != "" { - log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName) - return nil - } - // Assign the first node returned by the pipeline using a Binding. 
binding := &corev1.Binding{ ObjectMeta: metav1.ObjectMeta{ diff --git a/internal/scheduling/decisions/pods/plugins/filters/filter_node_affinity.go b/internal/scheduling/decisions/pods/plugins/filters/filter_node_affinity.go new file mode 100644 index 00000000..265bffa2 --- /dev/null +++ b/internal/scheduling/decisions/pods/plugins/filters/filter_node_affinity.go @@ -0,0 +1,150 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package filters
+
+import (
+	"context"
+	"log/slog"
+	"strconv"
+
+	"github.com/cobaltcore-dev/cortex/api/delegation/pods"
+	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
+	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
+	corev1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// NodeAffinityFilter is a pipeline step that removes nodes which do not
+// satisfy the pod's required node affinity.
+type NodeAffinityFilter struct {
+	Alias string
+}
+
+// Init is a no-op and always returns nil.
+func (f *NodeAffinityFilter) Init(ctx context.Context, client client.Client, step v1alpha1.StepSpec) error {
+	return nil
+}
+
+// Run adds an activation of 0.0 for every node of the request that satisfies
+// the pod's required node affinity. Other nodes get no activation entry.
+func (NodeAffinityFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.StepResult, error) {
+	activations := make(map[string]float64)
+	stats := make(map[string]lib.StepStatistics)
+
+	for _, node := range request.Nodes {
+		if matchesNodeAffinity(node, request.Pod) {
+			activations[node.Name] = 0.0
+		}
+	}
+
+	return &lib.StepResult{Activations: activations, Statistics: stats}, nil
+}
+
+// matchesNodeAffinity reports whether the node satisfies the pod's
+// requiredDuringSchedulingIgnoredDuringExecution node affinity. A pod without
+// such a requirement matches every node; otherwise one matching term suffices.
+//
+// NOTE(review): pod.Spec.NodeSelector is not evaluated here - confirm that
+// another step covers it.
+func matchesNodeAffinity(node corev1.Node, pod corev1.Pod) bool {
+	if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
+		return true
+	}
+
+	required := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
+	if required == nil {
+		return true
+	}
+
+	for _, term := range required.NodeSelectorTerms {
+		if matchesNodeSelectorTerm(node, term) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// matchesNodeSelectorTerm reports whether the node satisfies every label
+// expression and every field expression of the term (AND logic). As in
+// Kubernetes, a term without any expression matches no node.
+func matchesNodeSelectorTerm(node corev1.Node, term corev1.NodeSelectorTerm) bool {
+	if len(term.MatchExpressions) == 0 && len(term.MatchFields) == 0 {
+		return false
+	}
+	for _, expr := range term.MatchExpressions {
+		if !matchesNodeSelectorRequirement(node, expr) {
+			return false
+		}
+	}
+	for _, field := range term.MatchFields {
+		if !matchesNodeFieldRequirement(node, field) {
+			return false
+		}
+	}
+	return true
+}
+
+// matchesNodeSelectorRequirement evaluates one label expression against the
+// node's labels. Gt and Lt need exactly one value and compare base-10 64-bit
+// integers; unparsable values and unknown operators never match.
+func matchesNodeSelectorRequirement(node corev1.Node, req corev1.NodeSelectorRequirement) bool {
+	nodeValue, exists := node.Labels[req.Key]
+
+	switch req.Operator {
+	case corev1.NodeSelectorOpIn:
+		return exists && nodeSelectorValuesContain(req.Values, nodeValue)
+	case corev1.NodeSelectorOpNotIn:
+		return !exists || !nodeSelectorValuesContain(req.Values, nodeValue)
+	case corev1.NodeSelectorOpExists:
+		return exists
+	case corev1.NodeSelectorOpDoesNotExist:
+		return !exists
+	case corev1.NodeSelectorOpGt, corev1.NodeSelectorOpLt:
+		if !exists || len(req.Values) != 1 {
+			return false
+		}
+		nodeInt, err := strconv.ParseInt(nodeValue, 10, 64)
+		if err != nil {
+			return false
+		}
+		reqInt, err := strconv.ParseInt(req.Values[0], 10, 64)
+		if err != nil {
+			return false
+		}
+		if req.Operator == corev1.NodeSelectorOpGt {
+			return nodeInt > reqInt
+		}
+		return nodeInt < reqInt
+	default:
+		return false
+	}
+}
+
+// matchesNodeFieldRequirement evaluates one field expression against the node.
+// Only the metadata.name field with the In and NotIn operators is supported;
+// any other field or operator never matches.
+func matchesNodeFieldRequirement(node corev1.Node, req corev1.NodeSelectorRequirement) bool {
+	if req.Key != "metadata.name" {
+		return false
+	}
+	switch req.Operator {
+	case corev1.NodeSelectorOpIn:
+		return nodeSelectorValuesContain(req.Values, node.Name)
+	case corev1.NodeSelectorOpNotIn:
+		return !nodeSelectorValuesContain(req.Values, node.Name)
+	default:
+		return false
+	}
+}
+
+// nodeSelectorValuesContain reports whether values contains want.
+func nodeSelectorValuesContain(values []string, want string) bool {
+	for _, value := range values {
+		if value == want {
+			return true
+		}
+	}
+	return false
+}
diff --git a/internal/scheduling/decisions/pods/plugins/filters/filter_node_affinity_test.go b/internal/scheduling/decisions/pods/plugins/filters/filter_node_affinity_test.go new file mode 100644 index 00000000..5cf24deb --- /dev/null +++ b/internal/scheduling/decisions/pods/plugins/filters/filter_node_affinity_test.go @@ -0,0 +1,737 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package filters + +import ( + "log/slog" + "testing" + + "github.com/cobaltcore-dev/cortex/api/delegation/pods" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestNodeAffinityFilter_Run(t *testing.T) { + tests := []struct { + name string + request pods.PodPipelineRequest + expected map[string]float64 + }{ + { + name: "no node affinity", + request: pods.PodPipelineRequest{ 
+ Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2"}, + }, + }, + Pod: corev1.Pod{}, + }, + expected: map[string]float64{ + "node1": 0.0, + "node2": 0.0, + }, + }, + { + name: "zone affinity with In operator", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-east1", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-west1", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-north1", + }, + }, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "topology.kubernetes.io/zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"antarctica-east1", "antarctica-west1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node1": 0.0, + "node2": 0.0, + }, + }, + { + name: "node type affinity with NotIn operator", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "node.kubernetes.io/instance-type": "m5.large", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + "node.kubernetes.io/instance-type": "t3.micro", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node3"}, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: 
&corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "node.kubernetes.io/instance-type", + Operator: corev1.NodeSelectorOpNotIn, + Values: []string{"t3.micro"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node1": 0.0, + "node3": 0.0, + }, + }, + { + name: "exists operator", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "gpu": "true", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2"}, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "gpu", + Operator: corev1.NodeSelectorOpExists, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node1": 0.0, + }, + }, + { + name: "does not exist operator", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "gpu": "true", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2"}, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "gpu", + Operator: corev1.NodeSelectorOpDoesNotExist, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node2": 0.0, + }, + }, + { + name: "multiple expressions in single term (AND logic)", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + 
Name: "node1", + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-east1", + "node.kubernetes.io/instance-type": "m5.large", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-east1", + "node.kubernetes.io/instance-type": "t3.micro", + }, + }, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "topology.kubernetes.io/zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"antarctica-east1"}, + }, + { + Key: "node.kubernetes.io/instance-type", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"m5.large"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node1": 0.0, + }, + }, + { + name: "multiple terms (OR logic)", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-east1", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + "node.kubernetes.io/instance-type": "m5.large", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node3"}, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "topology.kubernetes.io/zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"antarctica-east1"}, + }, + }, + }, + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "node.kubernetes.io/instance-type", + 
Operator: corev1.NodeSelectorOpIn, + Values: []string{"m5.large"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node1": 0.0, + "node2": 0.0, + }, + }, + { + name: "no matching nodes", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-north1", + }, + }, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "topology.kubernetes.io/zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"antarctica-east1", "antarctica-west1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]float64{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filter := &NodeAffinityFilter{} + result, err := filter.Run(slog.Default(), tt.request) + + if err != nil { + t.Errorf("expected Run() to succeed, got error: %v", err) + return + } + + if result == nil { + t.Fatal("expected result to be non-nil") + } + + if len(result.Activations) != len(tt.expected) { + t.Errorf("expected %d activations, got %d", len(tt.expected), len(result.Activations)) + return + } + + for nodeName, expectedWeight := range tt.expected { + actualWeight, ok := result.Activations[nodeName] + if !ok { + t.Errorf("expected activation for node %q, but not found", nodeName) + continue + } + + if actualWeight != expectedWeight { + t.Errorf("expected weight for node %q to be %f, got %f", nodeName, expectedWeight, actualWeight) + } + } + + if result.Statistics == nil { + t.Error("expected Statistics to be non-nil") + } + }) + } +} + +func TestMatchesNodeAffinity(t *testing.T) { + tests := []struct { + name string + node corev1.Node + pod 
corev1.Pod + expected bool + }{ + { + name: "no affinity", + node: corev1.Node{}, + pod: corev1.Pod{}, + expected: true, + }, + { + name: "matching zone", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-east1", + }, + }, + }, + pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "topology.kubernetes.io/zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"antarctica-east1", "antarctica-west1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: true, + }, + { + name: "non-matching zone", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "topology.kubernetes.io/zone": "antarctica-north1", + }, + }, + }, + pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "topology.kubernetes.io/zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"antarctica-east1", "antarctica-west1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matchesNodeAffinity(tt.node, tt.pod) + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestMatchesNodeSelectorRequirement(t *testing.T) { + tests := []struct { + name string + node corev1.Node + requirement corev1.NodeSelectorRequirement + expected bool + }{ + { + name: "In operator - matching value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: 
map[string]string{ + "zone": "east1", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"east1", "west1"}, + }, + expected: true, + }, + { + name: "In operator - non-matching value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "zone": "north1", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"east1", "west1"}, + }, + expected: false, + }, + { + name: "In operator - missing label", + node: corev1.Node{}, + requirement: corev1.NodeSelectorRequirement{ + Key: "zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"east1", "west1"}, + }, + expected: false, + }, + { + name: "NotIn operator - non-matching value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "zone": "north1", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "zone", + Operator: corev1.NodeSelectorOpNotIn, + Values: []string{"east1", "west1"}, + }, + expected: true, + }, + { + name: "NotIn operator - matching value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "zone": "east1", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "zone", + Operator: corev1.NodeSelectorOpNotIn, + Values: []string{"east1", "west1"}, + }, + expected: false, + }, + { + name: "Exists operator - label exists", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "gpu": "true", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "gpu", + Operator: corev1.NodeSelectorOpExists, + }, + expected: true, + }, + { + name: "Exists operator - label missing", + node: corev1.Node{}, + requirement: corev1.NodeSelectorRequirement{ + Key: "gpu", + Operator: corev1.NodeSelectorOpExists, + }, + expected: false, + }, + { + name: "DoesNotExist operator - label missing", + 
node: corev1.Node{}, + requirement: corev1.NodeSelectorRequirement{ + Key: "gpu", + Operator: corev1.NodeSelectorOpDoesNotExist, + }, + expected: true, + }, + { + name: "DoesNotExist operator - label exists", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "gpu": "true", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "gpu", + Operator: corev1.NodeSelectorOpDoesNotExist, + }, + expected: false, + }, + { + name: "Gt operator - greater value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "cpu-cores": "8", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "cpu-cores", + Operator: corev1.NodeSelectorOpGt, + Values: []string{"4"}, + }, + expected: true, + }, + { + name: "Gt operator - equal value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "cpu-cores": "4", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "cpu-cores", + Operator: corev1.NodeSelectorOpGt, + Values: []string{"4"}, + }, + expected: false, + }, + { + name: "Gt operator - non-integer value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "cpu-cores": "invalid", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "cpu-cores", + Operator: corev1.NodeSelectorOpGt, + Values: []string{"4"}, + }, + expected: false, + }, + { + name: "Lt operator - smaller value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "memory-gb": "16", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "memory-gb", + Operator: corev1.NodeSelectorOpLt, + Values: []string{"32"}, + }, + expected: true, + }, + { + name: "Lt operator - equal value", + node: corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "memory-gb": "32", + }, + }, + }, + requirement: corev1.NodeSelectorRequirement{ + Key: "memory-gb", + Operator: 
corev1.NodeSelectorOpLt, + Values: []string{"32"}, + }, + expected: false, + }, + { + name: "Lt operator - missing label", + node: corev1.Node{}, + requirement: corev1.NodeSelectorRequirement{ + Key: "memory-gb", + Operator: corev1.NodeSelectorOpLt, + Values: []string{"32"}, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matchesNodeSelectorRequirement(tt.node, tt.requirement) + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + }) + } +} diff --git a/internal/scheduling/decisions/pods/noop.go b/internal/scheduling/decisions/pods/plugins/filters/filter_noop.go similarity index 91% rename from internal/scheduling/decisions/pods/noop.go rename to internal/scheduling/decisions/pods/plugins/filters/filter_noop.go index 55f04174..3cd328a5 100644 --- a/internal/scheduling/decisions/pods/noop.go +++ b/internal/scheduling/decisions/pods/plugins/filters/filter_noop.go @@ -1,4 +1,7 @@ -package pods +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package filters import ( "context" @@ -29,7 +32,7 @@ func (NoopFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (* stats := make(map[string]lib.StepStatistics) // Usually you would do some filtering here, or adjust the weights. 
for _, node := range request.Nodes { - activations[node.Name] = 1.0 + activations[node.Name] = 0.0 } return &lib.StepResult{Activations: activations, Statistics: stats}, nil } diff --git a/internal/scheduling/decisions/pods/noop_test.go b/internal/scheduling/decisions/pods/plugins/filters/filter_noop_test.go similarity index 92% rename from internal/scheduling/decisions/pods/noop_test.go rename to internal/scheduling/decisions/pods/plugins/filters/filter_noop_test.go index 07d4c92f..167a6d87 100644 --- a/internal/scheduling/decisions/pods/noop_test.go +++ b/internal/scheduling/decisions/pods/plugins/filters/filter_noop_test.go @@ -1,7 +1,7 @@ // Copyright SAP SE // SPDX-License-Identifier: Apache-2.0 -package pods +package filters import ( "log/slog" @@ -22,6 +22,7 @@ func TestNoopFilter_Run(t *testing.T) { name: "empty nodes", request: pods.PodPipelineRequest{ Nodes: []corev1.Node{}, + Pod: corev1.Pod{}, }, expected: map[string]float64{}, }, @@ -33,9 +34,10 @@ func TestNoopFilter_Run(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "node1"}, }, }, + Pod: corev1.Pod{}, }, expected: map[string]float64{ - "node1": 1.0, + "node1": 0.0, }, }, { @@ -52,11 +54,12 @@ func TestNoopFilter_Run(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "node3"}, }, }, + Pod: corev1.Pod{}, }, expected: map[string]float64{ - "node1": 1.0, - "node2": 1.0, - "node3": 1.0, + "node1": 0.0, + "node2": 0.0, + "node3": 0.0, }, }, } diff --git a/internal/scheduling/decisions/pods/plugins/filters/filter_taint.go b/internal/scheduling/decisions/pods/plugins/filters/filter_taint.go new file mode 100644 index 00000000..82135b16 --- /dev/null +++ b/internal/scheduling/decisions/pods/plugins/filters/filter_taint.go @@ -0,0 +1,81 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package filters
+
+import (
+	"context"
+	"log/slog"
+
+	"github.com/cobaltcore-dev/cortex/api/delegation/pods"
+	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
+	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
+	corev1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// TaintFilter is a pipeline step that removes nodes whose NoSchedule taints
+// are not tolerated by the pod.
+type TaintFilter struct {
+	Alias string
+}
+
+// Init is a no-op and always returns nil.
+func (f *TaintFilter) Init(ctx context.Context, client client.Client, step v1alpha1.StepSpec) error {
+	return nil
+}
+
+// Run adds an activation of 0.0 for every node of the request the pod can be
+// scheduled on. Nodes that fail the taint check get no activation entry.
+func (TaintFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.StepResult, error) {
+	activations := make(map[string]float64)
+	stats := make(map[string]lib.StepStatistics)
+
+	for _, node := range request.Nodes {
+		if canScheduleOnNode(node, request.Pod) {
+			activations[node.Name] = 0.0
+		}
+	}
+
+	return &lib.StepResult{Activations: activations, Statistics: stats}, nil
+}
+
+// canScheduleOnNode reports whether the pod tolerates every NoSchedule taint
+// on the node. Taints with any other effect do not exclude the node here.
+func canScheduleOnNode(node corev1.Node, pod corev1.Pod) bool {
+	for _, taint := range node.Spec.Taints {
+		if taint.Effect != corev1.TaintEffectNoSchedule {
+			continue
+		}
+		if !hasToleration(pod, taint) {
+			return false
+		}
+	}
+	return true
+}
+
+// hasToleration reports whether at least one toleration of the pod tolerates
+// the given taint, following the Kubernetes matching rules:
+//   - an empty toleration effect matches every taint effect,
+//   - an empty toleration key matches every taint key,
+//   - an empty operator is treated as Equal,
+//   - Exists ignores the value, Equal requires the values to be identical.
+func hasToleration(pod corev1.Pod, taint corev1.Taint) bool {
+	for _, toleration := range pod.Spec.Tolerations {
+		if toleration.Effect != "" && toleration.Effect != taint.Effect {
+			continue
+		}
+		if toleration.Key != "" && toleration.Key != taint.Key {
+			continue
+		}
+		switch toleration.Operator {
+		case corev1.TolerationOpExists:
+			return true
+		case "", corev1.TolerationOpEqual:
+			if toleration.Value == taint.Value {
+				return true
+			}
+		}
+	}
+	return false
+}
diff --git a/internal/scheduling/decisions/pods/plugins/filters/filter_taint_test.go b/internal/scheduling/decisions/pods/plugins/filters/filter_taint_test.go new file mode 100644 index 00000000..fc02e774 --- /dev/null +++ b/internal/scheduling/decisions/pods/plugins/filters/filter_taint_test.go @@ -0,0 +1,420 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package filters + +import ( + "log/slog" + "testing" + + "github.com/cobaltcore-dev/cortex/api/delegation/pods" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestTaintFilter_Run(t *testing.T) { + tests := []struct { + name string + request pods.PodPipelineRequest + 
expected map[string]float64 + }{ + { + name: "no taints, no tolerations", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2"}, + }, + }, + Pod: corev1.Pod{}, + }, + expected: map[string]float64{ + "node1": 0.0, + "node2": 0.0, + }, + }, + { + name: "node with NoSchedule taint, pod without tolerations", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "node-role.kubernetes.io/master", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2"}, + }, + }, + Pod: corev1.Pod{}, + }, + expected: map[string]float64{ + "node2": 0.0, + }, + }, + { + name: "node with NoSchedule taint, pod with matching toleration (Equal operator)", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "node-role.kubernetes.io/master", + Value: "true", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "node-role.kubernetes.io/master", + Value: "true", + Operator: corev1.TolerationOpEqual, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node1": 0.0, + }, + }, + { + name: "node with NoSchedule taint, pod with matching toleration (Exists operator)", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "node-role.kubernetes.io/master", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: 
"node-role.kubernetes.io/master", + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node1": 0.0, + }, + }, + { + name: "node with NoSchedule taint, pod with non-matching toleration", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "node-role.kubernetes.io/master", + Value: "true", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "different-key", + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + expected: map[string]float64{}, + }, + { + name: "node with NoExecute taint (should not be filtered)", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "node.kubernetes.io/not-ready", + Effect: corev1.TaintEffectNoExecute, + }, + }, + }, + }, + }, + Pod: corev1.Pod{}, + }, + expected: map[string]float64{ + "node1": 0.0, + }, + }, + { + name: "mixed nodes with different taints", + request: pods.PodPipelineRequest{ + Nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "node-role.kubernetes.io/master", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node3"}, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "app", + Value: "database", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + Pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "app", + Value: "database", + Operator: corev1.TolerationOpEqual, + Effect: 
corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + expected: map[string]float64{ + "node2": 0.0, + "node3": 0.0, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filter := &TaintFilter{} + result, err := filter.Run(slog.Default(), tt.request) + + if err != nil { + t.Errorf("expected Run() to succeed, got error: %v", err) + return + } + + if result == nil { + t.Fatal("expected result to be non-nil") + } + + if len(result.Activations) != len(tt.expected) { + t.Errorf("expected %d activations, got %d", len(tt.expected), len(result.Activations)) + return + } + + for nodeName, expectedWeight := range tt.expected { + actualWeight, ok := result.Activations[nodeName] + if !ok { + t.Errorf("expected activation for node %q, but not found", nodeName) + continue + } + + if actualWeight != expectedWeight { + t.Errorf("expected weight for node %q to be %f, got %f", nodeName, expectedWeight, actualWeight) + } + } + + if result.Statistics == nil { + t.Error("expected Statistics to be non-nil") + } + }) + } +} + +func TestCanScheduleOnNode(t *testing.T) { + tests := []struct { + name string + node corev1.Node + pod corev1.Pod + expected bool + }{ + { + name: "no taints", + node: corev1.Node{}, + pod: corev1.Pod{}, + expected: true, + }, + { + name: "NoSchedule taint without matching toleration", + node: corev1.Node{ + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "test-key", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + pod: corev1.Pod{}, + expected: false, + }, + { + name: "NoSchedule taint with matching toleration", + node: corev1.Node{ + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: "test-key", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "test-key", + Operator: corev1.TolerationOpExists, + }, + }, + }, + }, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t 
*testing.T) { + result := canScheduleOnNode(tt.node, tt.pod) + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestHasToleration(t *testing.T) { + tests := []struct { + name string + pod corev1.Pod + taint corev1.Taint + expected bool + }{ + { + name: "no tolerations", + pod: corev1.Pod{}, + taint: corev1.Taint{ + Key: "test-key", + Effect: corev1.TaintEffectNoSchedule, + }, + expected: false, + }, + { + name: "matching toleration with Exists operator", + pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "test-key", + Operator: corev1.TolerationOpExists, + }, + }, + }, + }, + taint: corev1.Taint{ + Key: "test-key", + Effect: corev1.TaintEffectNoSchedule, + }, + expected: true, + }, + { + name: "matching toleration with Equal operator and matching value", + pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "test-key", + Value: "test-value", + Operator: corev1.TolerationOpEqual, + }, + }, + }, + }, + taint: corev1.Taint{ + Key: "test-key", + Value: "test-value", + Effect: corev1.TaintEffectNoSchedule, + }, + expected: true, + }, + { + name: "non-matching toleration with Equal operator", + pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "test-key", + Value: "different-value", + Operator: corev1.TolerationOpEqual, + }, + }, + }, + }, + taint: corev1.Taint{ + Key: "test-key", + Value: "test-value", + Effect: corev1.TaintEffectNoSchedule, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := hasToleration(tt.pod, tt.taint) + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + }) + } +} diff --git a/internal/scheduling/decisions/pods/supported_steps.go b/internal/scheduling/decisions/pods/supported_steps.go index f7f07141..6f4c6daa 100644 --- a/internal/scheduling/decisions/pods/supported_steps.go +++ 
b/internal/scheduling/decisions/pods/supported_steps.go @@ -5,6 +5,7 @@ package pods import ( "github.com/cobaltcore-dev/cortex/api/delegation/pods" + "github.com/cobaltcore-dev/cortex/internal/scheduling/decisions/pods/plugins/filters" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" ) @@ -13,5 +14,7 @@ type PodStep = lib.Step[pods.PodPipelineRequest] // Configuration of steps supported by the scheduling. // The steps actually used by the scheduler are defined through the configuration file. var supportedSteps = map[string]func() PodStep{ - "noop": func() PodStep { return &NoopFilter{} }, + "noop": func() PodStep { return &filters.NoopFilter{} }, + "taint": func() PodStep { return &filters.TaintFilter{} }, + "nodeaffinity": func() PodStep { return &filters.NodeAffinityFilter{} }, }