Skip to content

Commit 8f674ea

Browse files
implement cortex support for kubernetes native gang scheduling
1 parent ded3a09 commit 8f674ea

File tree

8 files changed

+174
-15
lines changed

8 files changed

+174
-15
lines changed

Tiltfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ if 'pods' in ACTIVE_DEPLOYMENTS:
186186
# Deploy example resources
187187
k8s_yaml('samples/pods/node.yaml')
188188
k8s_yaml('samples/pods/pod.yaml')
189+
k8s_yaml('samples/pods/gang-scheduling.yaml')
190+
k8s_resource('gang-pod-1', labels=['Cortex-Pods'])
189191
k8s_resource('test-pod', labels=['Cortex-Pods'])
190192

191193
########### Dev Dependencies

api/delegation/pods/messages.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
type PodPipelineRequest struct {
1313
// The available nodes.
1414
Nodes []corev1.Node `json:"nodes"`
15+
// The pod to schedule.
16+
Pod *corev1.Pod `json:"pod"`
1517
}
1618

1719
func (r PodPipelineRequest) GetSubjects() []string {

helm/bundles/cortex-pods/templates/pipelines.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@ spec:
1010
type: filter-weigher
1111
createDecisions: true
1212
steps:
13+
- ref: { name: pods-gang }
14+
mandatory: true
1315
- ref: { name: pods-noop }
1416
mandatory: false

helm/bundles/cortex-pods/templates/steps.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,16 @@ spec:
1111
This is only a passthrough step which lets all pod candidates through.
1212
It is used as a placeholder step in the pods scheduler pipeline.
1313
knowledges: []
14+
---
15+
apiVersion: cortex.cloud/v1alpha1
16+
kind: Step
17+
metadata:
18+
name: pods-gang
19+
spec:
20+
operator: cortex
21+
type: filter
22+
impl: gang
23+
description: |
24+
This filter ensures that pods belonging to a PodGroup are only scheduled
25+
if the PodGroup resource exists.
26+
knowledges: []
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package pods
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
8+
"github.com/cobaltcore-dev/cortex/api/delegation/pods"
9+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
10+
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
11+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
14+
)
15+
16+
// GangFilter ensures that pods belonging to a PodGroup are only scheduled
17+
// if the PodGroup resource exists.
18+
type GangFilter struct {
19+
client client.Client
20+
}
21+
22+
func (f *GangFilter) Init(ctx context.Context, client client.Client, step v1alpha1.Step) error {
23+
f.client = client
24+
return nil
25+
}
26+
27+
func (f *GangFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.StepResult, error) {
28+
activations := make(map[string]float64, len(request.Nodes))
29+
stats := make(map[string]lib.StepStatistics)
30+
31+
pod := request.Pod
32+
if pod == nil {
33+
traceLog.Warn("gang-filter: pod is nil in request")
34+
return nil, fmt.Errorf("pod is nil in request")
35+
}
36+
37+
// Check for Workload API
38+
// Fetch the full pod object to inspect new fields if they are not in the struct
39+
workloadName := ""
40+
// Note: We cannot access pod.Spec.WorkloadRef directly if the struct is old.
41+
// Use unstructured to attempt to find it.
42+
uPod := &unstructured.Unstructured{}
43+
uPod.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"})
44+
if err := f.client.Get(context.Background(), client.ObjectKey{Name: pod.Name, Namespace: pod.Namespace}, uPod); err == nil {
45+
val, found, _ := unstructured.NestedString(uPod.Object, "spec", "workloadRef", "name")
46+
if found {
47+
workloadName = val
48+
}
49+
}
50+
51+
if workloadName != "" {
52+
traceLog.Info("gang-filter: checking for workload", "workloadName", workloadName)
53+
workload := &unstructured.Unstructured{}
54+
workload.SetGroupVersionKind(schema.GroupVersionKind{
55+
Group: "scheduling.k8s.io",
56+
Version: "v1alpha1",
57+
Kind: "Workload",
58+
})
59+
if err := f.client.Get(context.Background(), client.ObjectKey{Name: workloadName, Namespace: pod.Namespace}, workload); err != nil {
60+
traceLog.Error("gang-filter: failed to fetch workload", "error", err)
61+
// Deny all nodes if the gang resource is missing or cannot be fetched.
62+
return &lib.StepResult{Activations: activations, Statistics: stats}, nil
63+
}
64+
traceLog.Info("gang-filter: workload found, allowing scheduling")
65+
for _, node := range request.Nodes {
66+
activations[node.Name] = 1.0
67+
}
68+
return &lib.StepResult{Activations: activations, Statistics: stats}, nil
69+
}
70+
71+
// Fallback: Check if the pod belongs to a gang via Label
72+
// We use the label "pod-group.scheduling.k8s.io/name" which is standard for gang scheduling.
73+
gangName, ok := pod.Labels["pod-group.scheduling.k8s.io/name"]
74+
if !ok {
75+
// Not a gang pod, allow it.
76+
for _, node := range request.Nodes {
77+
activations[node.Name] = 1.0
78+
}
79+
return &lib.StepResult{Activations: activations, Statistics: stats}, nil
80+
}
81+
82+
traceLog.Info("gang-filter: checking for pod group", "gangName", gangName)
83+
84+
// Fetch the PodGroup.
85+
// We use Unstructured because the PodGroup CRD might not be compiled into this binary.
86+
// We assume the group is scheduling.k8s.io
87+
podGroup := &unstructured.Unstructured{}
88+
podGroup.SetGroupVersionKind(schema.GroupVersionKind{
89+
Group: "scheduling.k8s.io",
90+
Version: "v1alpha1",
91+
Kind: "PodGroup",
92+
})
93+
94+
err := f.client.Get(context.Background(), client.ObjectKey{
95+
Name: gangName,
96+
Namespace: pod.Namespace,
97+
}, podGroup)
98+
99+
if err != nil {
100+
traceLog.Error("gang-filter: failed to fetch pod group", "error", err)
101+
// Deny all nodes if the gang resource is missing or cannot be fetched.
102+
return &lib.StepResult{Activations: activations, Statistics: stats}, nil
103+
}
104+
105+
// If we found the PodGroup, we currently allow scheduling.
106+
// In a full implementation, we would check 'minMember' and other status fields here.
107+
traceLog.Info("gang-filter: pod group found, allowing scheduling")
108+
for _, node := range request.Nodes {
109+
activations[node.Name] = 1.0
110+
}
111+
112+
return &lib.StepResult{Activations: activations, Statistics: stats}, nil
113+
}

internal/scheduling/decisions/pods/pipeline_controller.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
125125
return errors.New("pipeline not found or not ready")
126126
}
127127

128+
// Fetch the pod to schedule.
129+
pod := &corev1.Pod{}
130+
if err := c.Get(ctx, client.ObjectKey{
131+
Name: decision.Spec.PodRef.Name,
132+
Namespace: decision.Spec.PodRef.Namespace,
133+
}, pod); err != nil {
134+
log.Error(err, "failed to fetch pod for decision")
135+
return err
136+
}
137+
if pod.Spec.NodeName != "" {
138+
log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName)
139+
return nil
140+
}
141+
128142
// Find all available nodes.
129143
nodes := &corev1.NodeList{}
130144
if err := c.List(ctx, nodes); err != nil {
@@ -135,7 +149,7 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
135149
}
136150

137151
// Execute the scheduling pipeline.
138-
request := pods.PodPipelineRequest{Nodes: nodes.Items}
152+
request := pods.PodPipelineRequest{Nodes: nodes.Items, Pod: pod}
139153
result, err := pipeline.Run(request)
140154
if err != nil {
141155
log.V(1).Error(err, "failed to run scheduler pipeline")
@@ -145,20 +159,6 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
145159
decision.Status.Took = metav1.Duration{Duration: time.Since(startedAt)}
146160
log.Info("decision processed successfully", "duration", time.Since(startedAt))
147161

148-
// Check if the pod is already assigned to a node.
149-
pod := &corev1.Pod{}
150-
if err := c.Get(ctx, client.ObjectKey{
151-
Name: decision.Spec.PodRef.Name,
152-
Namespace: decision.Spec.PodRef.Namespace,
153-
}, pod); err != nil {
154-
log.Error(err, "failed to fetch pod for decision")
155-
return err
156-
}
157-
if pod.Spec.NodeName != "" {
158-
log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName)
159-
return nil
160-
}
161-
162162
// Assign the first node returned by the pipeline using a Binding.
163163
binding := &corev1.Binding{
164164
ObjectMeta: metav1.ObjectMeta{

internal/scheduling/decisions/pods/supported_steps.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ type PodStep = lib.Step[pods.PodPipelineRequest]
1414
// The steps actually used by the scheduler are defined through the configuration file.
1515
var supportedSteps = map[string]func() PodStep{
1616
"noop": func() PodStep { return &NoopFilter{} },
17+
"gang": func() PodStep { return &GangFilter{} },
1718
}

samples/pods/gang-scheduling.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
apiVersion: scheduling.k8s.io/v1alpha1
2+
kind: Workload
3+
metadata:
4+
name: test-workload
5+
namespace: default
6+
spec:
7+
podGroups:
8+
- name: test-group
9+
policy: gang
10+
gang:
11+
minCount: 2
12+
---
13+
apiVersion: v1
14+
kind: Pod
15+
metadata:
16+
name: gang-pod
17+
namespace: cortex-system
18+
spec:
19+
schedulerName: cortex
20+
workloadRef:
21+
name: test-workload
22+
kind: Workload
23+
apiGroup: scheduling.k8s.io
24+
containers:
25+
- name: nginx
26+
image: nginx:latest

0 commit comments

Comments
 (0)