Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/delegation/pods/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
type PodPipelineRequest struct {
// The available nodes.
Nodes []corev1.Node `json:"nodes"`
// The pod to be scheduled.
Pod corev1.Pod `json:"pod"`
}

func (r PodPipelineRequest) GetSubjects() []string {
Expand Down
14 changes: 14 additions & 0 deletions helm/bundles/cortex-pods/templates/pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,17 @@ spec:
It is used as a placeholder step in the pods scheduler pipeline.
knowledges: []
mandatory: false
- type: filter
impl: taint
description: |
Filters nodes based on taints, excluding nodes with NoSchedule taints
unless the pod has matching tolerations.
knowledges: []
mandatory: true
- type: filter
impl: nodeaffinity
description: |
Filters nodes based on pod's node affinity requirements, matching
nodes that satisfy the specified label selectors.
knowledges: []
mandatory: true
30 changes: 15 additions & 15 deletions internal/scheduling/decisions/pods/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
return errors.New("pipeline not found or not ready")
}

// Check if the pod is already assigned to a node.
pod := &corev1.Pod{}
if err := c.Get(ctx, client.ObjectKey{
Name: decision.Spec.PodRef.Name,
Namespace: decision.Spec.PodRef.Namespace,
}, pod); err != nil {
log.Error(err, "failed to fetch pod for decision")
return err
}
if pod.Spec.NodeName != "" {
log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName)
return nil
}

// Find all available nodes.
nodes := &corev1.NodeList{}
if err := c.List(ctx, nodes); err != nil {
Expand All @@ -151,7 +165,7 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
}

// Execute the scheduling pipeline.
request := pods.PodPipelineRequest{Nodes: nodes.Items}
request := pods.PodPipelineRequest{Nodes: nodes.Items, Pod: *pod}
result, err := pipeline.Run(request)
if err != nil {
log.V(1).Error(err, "failed to run scheduler pipeline")
Expand All @@ -160,20 +174,6 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
decision.Status.Result = &result
log.Info("decision processed successfully", "duration", time.Since(startedAt))

// Check if the pod is already assigned to a node.
pod := &corev1.Pod{}
if err := c.Get(ctx, client.ObjectKey{
Name: decision.Spec.PodRef.Name,
Namespace: decision.Spec.PodRef.Namespace,
}, pod); err != nil {
log.Error(err, "failed to fetch pod for decision")
return err
}
if pod.Spec.NodeName != "" {
log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName)
return nil
}

// Assign the first node returned by the pipeline using a Binding.
binding := &corev1.Binding{
ObjectMeta: metav1.ObjectMeta{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package filters

import (
"context"
"log/slog"
"strconv"

"github.com/cobaltcore-dev/cortex/api/delegation/pods"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type NodeAffinityFilter struct {
Alias string
}

func (f *NodeAffinityFilter) Init(ctx context.Context, client client.Client, step v1alpha1.StepSpec) error {
return nil
}

func (NodeAffinityFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.StepResult, error) {
activations := make(map[string]float64)
stats := make(map[string]lib.StepStatistics)

for _, node := range request.Nodes {
if matchesNodeAffinity(node, request.Pod) {
activations[node.Name] = 0.0
}
}

return &lib.StepResult{Activations: activations, Statistics: stats}, nil
}

func matchesNodeAffinity(node corev1.Node, pod corev1.Pod) bool {
if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
return true
}

required := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
if required == nil {
return true
}

for _, term := range required.NodeSelectorTerms {
if matchesNodeSelectorTerm(node, term) {
return true
}
}

return false
}

func matchesNodeSelectorTerm(node corev1.Node, term corev1.NodeSelectorTerm) bool {
for _, expr := range term.MatchExpressions {
if !matchesNodeSelectorRequirement(node, expr) {
return false
}
}
return true
}

func matchesNodeSelectorRequirement(node corev1.Node, req corev1.NodeSelectorRequirement) bool {
nodeValue, exists := node.Labels[req.Key]

switch req.Operator {
case corev1.NodeSelectorOpIn:
if !exists {
return false
}
for _, value := range req.Values {
if nodeValue == value {
return true
}
}
return false
case corev1.NodeSelectorOpNotIn:
if !exists {
return true
}
for _, value := range req.Values {
if nodeValue == value {
return false
}
}
return true
case corev1.NodeSelectorOpExists:
return exists
case corev1.NodeSelectorOpDoesNotExist:
return !exists
case corev1.NodeSelectorOpGt:
if !exists || len(req.Values) == 0 {
return false
}
nodeInt, err1 := strconv.Atoi(nodeValue)
reqInt, err2 := strconv.Atoi(req.Values[0])
if err1 != nil || err2 != nil {
return false
}
return nodeInt > reqInt
case corev1.NodeSelectorOpLt:
if !exists || len(req.Values) == 0 {
return false
}
nodeInt, err1 := strconv.Atoi(nodeValue)
reqInt, err2 := strconv.Atoi(req.Values[0])
if err1 != nil || err2 != nil {
return false
}
return nodeInt < reqInt
default:
return false
}
}
Loading
Loading