Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
697a371
implement support for pods
kruegercharles Nov 30, 2025
5164655
add cortex-pods-controller-manager to cortex-remote-crb.yaml
kruegercharles Dec 1, 2025
1ef22cc
set rbac values for pods just like ironcore
kruegercharles Dec 1, 2025
f7904b6
fix linting issues
kruegercharles Dec 1, 2025
1deb35b
use kubernetes builtin corev1.Nodes instead of custom podsv1alpha1.Nodes
kruegercharles Dec 8, 2025
6e0935e
remove dev_pods and dev_nodes as crds
kruegercharles Dec 10, 2025
c82d0ea
fix wrong service name for cortex pods
kruegercharles Dec 10, 2025
b9b4409
remove postgres dependency from cortex-pods
kruegercharles Dec 10, 2025
9e79e21
fix(rbac): use core api group for pods and nodes in cortex-pods role
kruegercharles Dec 10, 2025
6996709
add missing method to pipeline_controller.go
kruegercharles Dec 12, 2025
7f51310
instead of directly changing NodeName for existing pod, change binding
kruegercharles Dec 13, 2025
cc83a3d
rename scheduler 'cortex-pods' to 'cortex'
kruegercharles Dec 15, 2025
235df4d
bump charts version to 0.0.4
kruegercharles Dec 18, 2025
056f779
remove redundant crd dev_pods and dev_nodes
kruegercharles Dec 18, 2025
5efe68a
Merge reservations with scheduling (#435)
PhilippMatthes Dec 18, 2025
124c78e
implement support for pods
kruegercharles Nov 30, 2025
859a99c
fix linting issues
kruegercharles Dec 1, 2025
75688c8
use kubernetes builtin corev1.Nodes instead of custom podsv1alpha1.Nodes
kruegercharles Dec 8, 2025
6e86081
implement cortex support for kubernetes native gang scheduling
kruegercharles Dec 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ if 'pods' in ACTIVE_DEPLOYMENTS:
# Deploy example resources
k8s_yaml('samples/pods/node.yaml')
k8s_yaml('samples/pods/pod.yaml')
k8s_yaml('samples/pods/gang-scheduling.yaml')
k8s_resource('gang-pod-1', labels=['Cortex-Pods'])
k8s_resource('test-pod', labels=['Cortex-Pods'])

########### Dev Dependencies
Expand Down
2 changes: 2 additions & 0 deletions api/delegation/pods/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
type PodPipelineRequest struct {
	// The available nodes.
	Nodes []corev1.Node `json:"nodes"`
	// The pod to schedule.
	// NOTE(review): may be nil — pipeline steps must nil-check before use.
	Pod *corev1.Pod `json:"pod"`
}

func (r PodPipelineRequest) GetSubjects() []string {
Expand Down
551 changes: 551 additions & 0 deletions config/crd/cortex.dev_nodes.yaml

Large diffs are not rendered by default.

9,806 changes: 9,806 additions & 0 deletions config/crd/cortex.dev_pods.yaml

Large diffs are not rendered by default.

558 changes: 558 additions & 0 deletions dist/chart/templates/crd/cortex.dev_nodes.yaml

Large diffs are not rendered by default.

9,813 changes: 9,813 additions & 0 deletions dist/chart/templates/crd/cortex.dev_pods.yaml

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions helm/bundles/cortex-pods/templates/pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ spec:
type: filter-weigher
createDecisions: true
steps:
- ref: { name: pods-gang }
mandatory: true
- ref: { name: pods-noop }
mandatory: false
13 changes: 13 additions & 0 deletions helm/bundles/cortex-pods/templates/steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,16 @@ spec:
This is only a passthrough step which lets all pod candidates through.
It is used as a placeholder step in the pods scheduler pipeline.
knowledges: []
---
apiVersion: cortex.cloud/v1alpha1
kind: Step
metadata:
  name: pods-gang
spec:
  operator: cortex
  type: filter
  impl: gang
  description: |
    This filter ensures that pods belonging to a gang — referenced either via
    the Workload API (spec.workloadRef) or via the
    pod-group.scheduling.k8s.io/name label — are only scheduled if the
    corresponding Workload or PodGroup resource exists.
  knowledges: []
113 changes: 113 additions & 0 deletions internal/scheduling/decisions/pods/gang_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package pods

import (
	"context"
	"errors"
	"log/slog"

	"github.com/cobaltcore-dev/cortex/api/delegation/pods"
	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// gangLabelKey is the standard label associating a pod with a PodGroup for
// gang scheduling.
const gangLabelKey = "pod-group.scheduling.k8s.io/name"

// GangFilter ensures that pods belonging to a PodGroup are only scheduled
// if the PodGroup resource exists.
type GangFilter struct {
	// Client used to look up pods, Workloads, and PodGroups.
	client client.Client
}

// Init stores the client for later lookups. The step configuration is unused.
func (f *GangFilter) Init(ctx context.Context, client client.Client, step v1alpha1.Step) error {
	f.client = client
	return nil
}

// Run filters candidate nodes for the pod in the request:
//   - If the pod references a Workload (spec.workloadRef.name), all nodes are
//     allowed only when that Workload exists.
//   - Otherwise, if the pod carries the gang label, all nodes are allowed
//     only when the referenced PodGroup exists.
//   - Pods that are not part of a gang pass through with all nodes allowed.
//
// When the gang resource is missing or cannot be fetched, the empty
// activation map is returned, which denies every node.
func (f *GangFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.StepResult, error) {
	activations := make(map[string]float64, len(request.Nodes))
	stats := make(map[string]lib.StepStatistics)
	result := &lib.StepResult{Activations: activations, Statistics: stats}

	pod := request.Pod
	if pod == nil {
		traceLog.Warn("gang-filter: pod is nil in request")
		// errors.New instead of fmt.Errorf: no formatting verbs needed (perfsprint).
		return nil, errors.New("pod is nil in request")
	}

	// Mark every candidate node as allowed.
	allowAll := func() *lib.StepResult {
		for _, node := range request.Nodes {
			activations[node.Name] = 1.0
		}
		return result
	}

	// Check for the Workload API first. The compiled-in pod struct may be too
	// old to expose spec.workloadRef, so re-fetch the pod as unstructured and
	// read the field dynamically.
	// NOTE(review): Run has no context parameter, so context.Background() is
	// used for the client calls — consider threading a ctx through the step
	// interface.
	workloadName := ""
	uPod := &unstructured.Unstructured{}
	uPod.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"})
	if err := f.client.Get(context.Background(), client.ObjectKey{Name: pod.Name, Namespace: pod.Namespace}, uPod); err == nil {
		val, found, err := unstructured.NestedString(uPod.Object, "spec", "workloadRef", "name")
		if err != nil {
			// Field exists but is not a string: treat as "no workload" and log it.
			traceLog.Warn("gang-filter: failed to read spec.workloadRef.name", "error", err)
		} else if found {
			workloadName = val
		}
	}

	if workloadName != "" {
		traceLog.Info("gang-filter: checking for workload", "workloadName", workloadName)
		workload := &unstructured.Unstructured{}
		workload.SetGroupVersionKind(schema.GroupVersionKind{
			Group:   "scheduling.k8s.io",
			Version: "v1alpha1",
			Kind:    "Workload",
		})
		if err := f.client.Get(context.Background(), client.ObjectKey{Name: workloadName, Namespace: pod.Namespace}, workload); err != nil {
			traceLog.Error("gang-filter: failed to fetch workload", "error", err)
			// Deny all nodes if the gang resource is missing or cannot be fetched.
			return result, nil
		}
		traceLog.Info("gang-filter: workload found, allowing scheduling")
		return allowAll(), nil
	}

	// Fallback: check if the pod belongs to a gang via the standard label.
	gangName, ok := pod.Labels[gangLabelKey]
	if !ok {
		// Not a gang pod, allow it.
		return allowAll(), nil
	}

	traceLog.Info("gang-filter: checking for pod group", "gangName", gangName)

	// Fetch the PodGroup as unstructured because the PodGroup CRD might not
	// be compiled into this binary. The group is assumed to be scheduling.k8s.io.
	podGroup := &unstructured.Unstructured{}
	podGroup.SetGroupVersionKind(schema.GroupVersionKind{
		Group:   "scheduling.k8s.io",
		Version: "v1alpha1",
		Kind:    "PodGroup",
	})
	if err := f.client.Get(context.Background(), client.ObjectKey{
		Name:      gangName,
		Namespace: pod.Namespace,
	}, podGroup); err != nil {
		traceLog.Error("gang-filter: failed to fetch pod group", "error", err)
		// Deny all nodes if the gang resource is missing or cannot be fetched.
		return result, nil
	}

	// The PodGroup exists, so scheduling is allowed. A full implementation
	// would also check 'minMember' and other status fields here.
	traceLog.Info("gang-filter: pod group found, allowing scheduling")
	return allowAll(), nil
}
30 changes: 15 additions & 15 deletions internal/scheduling/decisions/pods/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
return errors.New("pipeline not found or not ready")
}

// Fetch the pod to schedule.
pod := &corev1.Pod{}
if err := c.Get(ctx, client.ObjectKey{
Name: decision.Spec.PodRef.Name,
Namespace: decision.Spec.PodRef.Namespace,
}, pod); err != nil {
log.Error(err, "failed to fetch pod for decision")
return err
}
if pod.Spec.NodeName != "" {
log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName)
return nil
}

// Find all available nodes.
nodes := &corev1.NodeList{}
if err := c.List(ctx, nodes); err != nil {
Expand All @@ -135,7 +149,7 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
}

// Execute the scheduling pipeline.
request := pods.PodPipelineRequest{Nodes: nodes.Items}
request := pods.PodPipelineRequest{Nodes: nodes.Items, Pod: pod}
result, err := pipeline.Run(request)
if err != nil {
log.V(1).Error(err, "failed to run scheduler pipeline")
Expand All @@ -145,20 +159,6 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
decision.Status.Took = metav1.Duration{Duration: time.Since(startedAt)}
log.Info("decision processed successfully", "duration", time.Since(startedAt))

// Check if the pod is already assigned to a node.
pod := &corev1.Pod{}
if err := c.Get(ctx, client.ObjectKey{
Name: decision.Spec.PodRef.Name,
Namespace: decision.Spec.PodRef.Namespace,
}, pod); err != nil {
log.Error(err, "failed to fetch pod for decision")
return err
}
if pod.Spec.NodeName != "" {
log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName)
return nil
}

// Assign the first node returned by the pipeline using a Binding.
binding := &corev1.Binding{
ObjectMeta: metav1.ObjectMeta{
Expand Down
1 change: 1 addition & 0 deletions internal/scheduling/decisions/pods/supported_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ type PodStep = lib.Step[pods.PodPipelineRequest]
// The steps actually used by the scheduler are defined through the configuration file.
var supportedSteps = map[string]func() PodStep{
	// Passthrough step that lets all pod candidates through.
	"noop": func() PodStep { return &NoopFilter{} },
	// Denies all nodes for gang pods whose Workload/PodGroup resource is missing.
	"gang": func() PodStep { return &GangFilter{} },
}
26 changes: 26 additions & 0 deletions samples/pods/gang-scheduling.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: scheduling.k8s.io/v1alpha1
kind: Workload
metadata:
  name: test-workload
  # Fix: the gang filter resolves the Workload in the pod's namespace, so the
  # Workload must live alongside the pod (was 'default', which would always
  # deny scheduling for the sample pod below).
  namespace: cortex-system
spec:
  podGroups:
    - name: test-group
      policy: gang
      gang:
        # NOTE(review): minCount is 2 but only one pod is defined — confirm
        # whether a second gang member is intended for the sample.
        minCount: 2
---
apiVersion: v1
kind: Pod
metadata:
  name: gang-pod
  namespace: cortex-system
spec:
  schedulerName: cortex
  workloadRef:
    name: test-workload
    kind: Workload
    apiGroup: scheduling.k8s.io
  containers:
    - name: nginx
      image: nginx:latest
Loading