
Commit 8007e3a

kevin85421, KunWuLuan, and yueming.wk authored

support scheduler plugins (#3612) (#3766)
* support scheduler plugins
* add unit test; update ValidateBatchSchedulerConfig(); update the helm chart; rename the function
* Update the role in the helm chart, and ensure the CRD is installed before starting the operator
* Fix CI lint
* Modify the Ray version in the RayCluster sample
* Update the scheduler name
* Update the test
* Update and add TODOs

---------

Signed-off-by: kunwuluan <[email protected]>
Signed-off-by: KunWuLuan <[email protected]>
Signed-off-by: kaihsun <[email protected]>
Co-authored-by: GreenHand <[email protected]>
Co-authored-by: yueming.wk <[email protected]>
1 parent 983927d commit 8007e3a

9 files changed: +317 −6 lines changed


helm-chart/kuberay-operator/templates/_helpers.tpl

Lines changed: 11 additions & 0 deletions

@@ -308,4 +308,15 @@ rules:
   verbs:
   - get
 {{- end -}}
+{{- if or .batchSchedulerEnabled (eq .batchSchedulerName "scheduler-plugins") }}
+- apiGroups:
+  - scheduling.x-k8s.io
+  resources:
+  - podgroups
+  verbs:
+  - create
+  - get
+  - list
+  - watch
+{{- end -}}
 {{- end -}}
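For reference, when batchScheduler.name is set to "scheduler-plugins" (or the legacy batchSchedulerEnabled flag is on), the block above should render roughly the following additional ClusterRole rule. This is a sketch of the expected output, not a captured render of the chart:

- apiGroups:
  - scheduling.x-k8s.io
  resources:
  - podgroups
  verbs:
  - create
  - get
  - list
  - watch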

helm-chart/kuberay-operator/values.yaml

Lines changed: 5 additions & 1 deletion

@@ -50,7 +50,7 @@ logging:
 #   by the customized scheduler.
 # * "enabled" is the legacy option and will be deprecated soon.
 # * "name" is the standard option, expecting a scheduler name, supported values are
-#   "default", "volcano", and "yunikorn".
+#   "default", "volcano", "yunikorn", and "scheduler-plugins".
 #
 # Note: "enabled" and "name" should not be set at the same time. If both are set, an error will be thrown.
 #
@@ -67,6 +67,10 @@ logging:
 # batchScheduler:
 #   name: yunikorn
 #
+# 4. Use PodGroup
+# batchScheduler:
+#   name: scheduler-plugins
+#
 batchScheduler:
   # Deprecated. This option will be removed in the future.
   # Note, for backwards compatibility. When it sets to true, it enables volcano scheduler integration.
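A minimal values override that enables the new integration, following option 4 in the comments above (a sketch; all other chart defaults are assumed unchanged):

batchScheduler:
  name: scheduler-plugins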

ray-operator/apis/config/v1alpha1/config_utils.go

Lines changed: 2 additions & 1 deletion

@@ -5,6 +5,7 @@ import (
 
 	"github.com/go-logr/logr"
 
+	schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
 )
@@ -22,7 +23,7 @@ func ValidateBatchSchedulerConfig(logger logr.Logger, config Configuration) error {
 
 	if len(config.BatchScheduler) > 0 {
 		// if a customized scheduler is configured, check it is supported
-		if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() {
+		if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() || config.BatchScheduler == schedulerplugins.GetPluginName() {
 			logger.Info("Feature flag batch-scheduler is enabled",
 				"scheduler name", config.BatchScheduler)
 		} else {
Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+apiVersion: ray.io/v1
+kind: RayCluster
+metadata:
+  name: test-podgroup-0
+  labels:
+    ray.io/gang-scheduling-enabled: "true"
+spec:
+  rayVersion: "2.46.0"
+  headGroupSpec:
+    rayStartParams: {}
+    template:
+      spec:
+        containers:
+          - name: ray-head
+            image: rayproject/ray:2.46.0
+            resources:
+              limits:
+                cpu: "1"
+                memory: "2Gi"
+              requests:
+                cpu: "1"
+                memory: "2Gi"
+  workerGroupSpecs:
+    - groupName: worker
+      rayStartParams: {}
+      replicas: 2
+      minReplicas: 2
+      maxReplicas: 2
+      template:
+        spec:
+          containers:
+            - name: ray-head
+              image: rayproject/ray:2.46.0
+              resources:
+                limits:
+                  cpu: "1"
+                  memory: "1Gi"
+                requests:
+                  cpu: "1"
+                  memory: "1Gi"
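Given the createPodGroup logic added in this commit (see the scheduler-plugins package below), applying this sample with the ray.io/gang-scheduling-enabled label should produce a PodGroup named after the cluster, with MinMember covering the head pod plus both workers and MinResources summed from the pod requests. A hedged sketch of the expected object, assuming the sample is applied to the default namespace:

apiVersion: scheduling.x-k8s.io/v1alpha1
kind: PodGroup
metadata:
  name: test-podgroup-0
  namespace: default   # assumption: sample applied to the default namespace
spec:
  minMember: 3          # 1 head pod + 2 worker replicas
  minResources:
    cpu: "3"            # 1 (head) + 1 + 1 (workers), summed from requests
    memory: 4Gi         # 2Gi (head) + 1Gi + 1Gi (workers), summed from requests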
Lines changed: 150 additions & 0 deletions

@@ -0,0 +1,150 @@
+package schedulerplugins
+
+import (
+	"context"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	ktypes "k8s.io/apimachinery/pkg/types"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/rest"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
+
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+)
+
+const (
+	schedulerName                 string = "scheduler-plugins"
+	kubeSchedulerPodGroupLabelKey string = "scheduling.x-k8s.io/pod-group"
+)
+
+type KubeScheduler struct {
+	cli client.Client
+}
+
+type KubeSchedulerFactory struct{}
+
+func GetPluginName() string {
+	return schedulerName
+}
+
+func (k *KubeScheduler) Name() string {
+	return GetPluginName()
+}
+
+func createPodGroup(_ context.Context, app *rayv1.RayCluster) *v1alpha1.PodGroup {
+	// we set replica as 1 for the head pod
+	replica := int32(1)
+	for _, workerGroup := range app.Spec.WorkerGroupSpecs {
+		if workerGroup.Replicas == nil {
+			continue
+		}
+		// TODO(kevin85421): We should consider the case of `numOfHosts` is not 1.
+		replica += *workerGroup.Replicas
+	}
+
+	podGroup := &v1alpha1.PodGroup{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: app.Namespace,
+			Name:      app.Name,
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					Name:       app.Name,
+					UID:        app.UID,
+					APIVersion: app.APIVersion,
+					Kind:       app.Kind,
+				},
+			},
+		},
+		Spec: v1alpha1.PodGroupSpec{
+			MinMember:    replica,
+			MinResources: utils.CalculateDesiredResources(app),
+		},
+	}
+	return podGroup
+}
+
+func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error {
+	if !k.isGangSchedulingEnabled(app) {
+		return nil
+	}
+	podGroup := &v1alpha1.PodGroup{}
+	if err := k.cli.Get(ctx, ktypes.NamespacedName{Namespace: app.Namespace, Name: app.Name}, podGroup); err != nil {
+		if !errors.IsNotFound(err) {
+			return err
+		}
+		podGroup = createPodGroup(ctx, app)
+		if err := k.cli.Create(ctx, podGroup); err != nil {
+			if errors.IsAlreadyExists(err) {
+				return nil
+			}
+			return fmt.Errorf("failed to create PodGroup: %w", err)
+		}
+	}
+	return nil
+}
+
+// AddMetadataToPod adds essential labels and annotations to the Ray pods
+// the scheduler needs these labels and annotations in order to do the scheduling properly
+func (k *KubeScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
+	// when gang scheduling is enabled, extra labels need to be added to all pods
+	if k.isGangSchedulingEnabled(app) {
+		pod.Labels[kubeSchedulerPodGroupLabelKey] = app.Name
+	}
+	// TODO(kevin85421): Currently, we only support "single scheduler" mode. If we want to support
+	// "second scheduler" mode, we need to add `schedulerName` to the pod spec.
+}
+
+func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
+	_, exist := app.Labels[utils.RayClusterGangSchedulingEnabled]
+	return exist
+}
+
+func (kf *KubeSchedulerFactory) New(ctx context.Context, c *rest.Config) (schedulerinterface.BatchScheduler, error) {
+	// TODO(kevin85421): We should not initialize the informer cache here. We should reuse
+	// the reconciler's cache instead.
+	scheme := runtime.NewScheme()
+	utilruntime.Must(v1alpha1.AddToScheme(scheme))
+	ccache, err := cache.New(c, cache.Options{
+		Scheme: scheme,
+	})
+	if err != nil {
+		return nil, err
+	}
+	go func() {
+		if err := ccache.Start(ctx); err != nil {
+			panic(err)
+		}
+	}()
+	if synced := ccache.WaitForCacheSync(ctx); !synced {
+		return nil, fmt.Errorf("failed to sync cache")
+	}
+	cli, err := client.New(c, client.Options{
+		Scheme: scheme,
+		Cache: &client.CacheOptions{
+			Reader: ccache,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	return &KubeScheduler{
+		cli: cli,
+	}, nil
+}
+
+func (kf *KubeSchedulerFactory) AddToScheme(sche *runtime.Scheme) {
+	utilruntime.Must(v1alpha1.AddToScheme(sche))
+}
+
+func (kf *KubeSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
+	return b
+}
Lines changed: 98 additions & 0 deletions

@@ -0,0 +1,98 @@
+package schedulerplugins
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/ptr"
+
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+)
+
+func createTestRayCluster(numOfHosts int32) rayv1.RayCluster {
+	headSpec := corev1.PodSpec{
+		Containers: []corev1.Container{
+			{
+				Name: "ray-head",
+				Resources: corev1.ResourceRequirements{
+					Limits: corev1.ResourceList{
+						corev1.ResourceCPU:    resource.MustParse("500m"),
+						corev1.ResourceMemory: resource.MustParse("512Mi"),
+					},
+					Requests: corev1.ResourceList{
+						corev1.ResourceCPU:    resource.MustParse("256m"),
+						corev1.ResourceMemory: resource.MustParse("256Mi"),
+					},
+				},
+			},
+		},
+	}
+
+	workerSpec := corev1.PodSpec{
+		Containers: []corev1.Container{
+			{
+				Name: "ray-worker",
+				Resources: corev1.ResourceRequirements{
+					Limits: corev1.ResourceList{
+						corev1.ResourceCPU:    resource.MustParse("500m"),
+						corev1.ResourceMemory: resource.MustParse("512Mi"),
+						"nvidia.com/gpu":      resource.MustParse("1"),
+					},
+					Requests: corev1.ResourceList{
+						corev1.ResourceCPU:    resource.MustParse("256m"),
+						corev1.ResourceMemory: resource.MustParse("256Mi"),
+					},
+				},
+			},
+		},
+	}
+
+	return rayv1.RayCluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "raycluster-sample",
+			Namespace: "default",
+		},
+		Spec: rayv1.RayClusterSpec{
+			HeadGroupSpec: rayv1.HeadGroupSpec{
+				Template: corev1.PodTemplateSpec{
+					Spec: headSpec,
+				},
+			},
+			WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+				{
+					Template: corev1.PodTemplateSpec{
+						Spec: workerSpec,
+					},
+					Replicas:    ptr.To[int32](2),
+					NumOfHosts:  numOfHosts,
+					MinReplicas: ptr.To[int32](1),
+					MaxReplicas: ptr.To[int32](4),
+				},
+			},
+		},
+	}
+}
+
+func TestCreatePodGroup(t *testing.T) {
+	a := assert.New(t)
+
+	cluster := createTestRayCluster(1)
+
+	podGroup := createPodGroup(context.TODO(), &cluster)
+
+	// 256m * 3 (requests, not limits)
+	a.Equal("768m", podGroup.Spec.MinResources.Cpu().String())
+
+	// 256Mi * 3 (requests, not limits)
+	a.Equal("768Mi", podGroup.Spec.MinResources.Memory().String())
+
+	// 2 GPUs total
+	a.Equal("2", podGroup.Spec.MinResources.Name("nvidia.com/gpu", resource.BinarySI).String())
+
+	// 1 head and 2 workers
+	a.Equal(int32(3), podGroup.Spec.MinMember)
+}

ray-operator/controllers/ray/batchscheduler/schedulermanager.go

Lines changed: 3 additions & 0 deletions

@@ -11,6 +11,7 @@ import (
 
 	configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
 	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+	schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
 )
@@ -58,6 +59,8 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
 		factory = &volcano.VolcanoBatchSchedulerFactory{}
 	case yunikorn.GetPluginName():
 		factory = &yunikorn.YuniKornSchedulerFactory{}
+	case schedulerplugins.GetPluginName():
+		factory = &schedulerplugins.KubeSchedulerFactory{}
 	default:
 		return nil, fmt.Errorf("the scheduler is not supported, name=%s", rayConfigs.BatchScheduler)
 	}

ray-operator/go.mod

Lines changed: 2 additions & 1 deletion

@@ -27,6 +27,7 @@ require (
 	k8s.io/klog/v2 v2.130.1
 	k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979
 	sigs.k8s.io/controller-runtime v0.21.0
+	sigs.k8s.io/scheduler-plugins v0.31.8
 	sigs.k8s.io/structured-merge-diff/v4 v4.7.0
 	sigs.k8s.io/yaml v1.4.0
 	volcano.sh/apis v1.12.1
@@ -60,7 +61,7 @@ require (
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
-	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
 	github.com/prometheus/client_model v0.6.1 // indirect
 	github.com/prometheus/common v0.62.0 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
