diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml
index fa8be11047f..35d7c912eb0 100644
--- a/helm-chart/kuberay-operator/values.yaml
+++ b/helm-chart/kuberay-operator/values.yaml
@@ -70,12 +70,16 @@ logging:
 # 4. Use PodGroup
 # batchScheduler:
 #   name: scheduler-plugins
-#
+
+# 5. Use Kai Scheduler
+# batchScheduler:
+#   name: kai-scheduler
+
 batchScheduler:
   # Deprecated. This option will be removed in the future.
   # Note, for backwards compatibility. When it sets to true, it enables volcano scheduler integration.
   enabled: false
-  # Set the customized scheduler name, supported values are "volcano", "yunikorn" or "scheduler-plugins", do not set
+  # Set the customized scheduler name, supported values are "volcano", "yunikorn", "kai-scheduler" or "scheduler-plugins", do not set
   # "batchScheduler.enabled=true" at the same time as it will override this option.
   name: ""
diff --git a/ray-operator/apis/config/v1alpha1/config_utils.go b/ray-operator/apis/config/v1alpha1/config_utils.go
index 53e4f02a581..24ed417ce93 100644
--- a/ray-operator/apis/config/v1alpha1/config_utils.go
+++ b/ray-operator/apis/config/v1alpha1/config_utils.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/go-logr/logr"
 
+	kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
 	schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
@@ -23,7 +24,7 @@ func ValidateBatchSchedulerConfig(logger logr.Logger, config Configuration) erro
 
 	if len(config.BatchScheduler) > 0 {
 		// if a customized scheduler is configured, check it is supported
-		if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() || config.BatchScheduler == schedulerplugins.GetPluginName() {
+		if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() || config.BatchScheduler == schedulerplugins.GetPluginName() || config.BatchScheduler == kaischeduler.GetPluginName() {
 			logger.Info("Feature flag batch-scheduler is enabled", "scheduler name", config.BatchScheduler)
 		} else {
diff --git a/ray-operator/apis/config/v1alpha1/config_utils_test.go b/ray-operator/apis/config/v1alpha1/config_utils_test.go
index f6a73cbbbb0..76b983eb75e 100644
--- a/ray-operator/apis/config/v1alpha1/config_utils_test.go
+++ b/ray-operator/apis/config/v1alpha1/config_utils_test.go
@@ -6,6 +6,7 @@ import (
 	"github.com/go-logr/logr"
 	"github.com/go-logr/logr/testr"
 
+	kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
 	schedulerPlugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
@@ -71,6 +72,16 @@ func TestValidateBatchSchedulerConfig(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "valid option, batch-scheduler=kai-scheduler",
+			args: args{
+				logger: testr.New(t),
+				config: Configuration{
+					BatchScheduler: kaischeduler.GetPluginName(),
+				},
+			},
+			wantErr: false,
+		},
 		{
 			name: "invalid option, invalid scheduler name",
 			args: args{
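With this change, `kai-scheduler` passes config validation the same way the existing plugins do. A minimal standalone sketch of the behavior (hypothetical `main` wiring; `logr.Discard()` stands in for the operator's real logger):

```go
package main

import (
	"fmt"

	"github.com/go-logr/logr"

	"github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
	kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
)

func main() {
	cfg := v1alpha1.Configuration{BatchScheduler: kaischeduler.GetPluginName()}
	// With this patch the name "kai-scheduler" is accepted; any other
	// unrecognized name still returns an error.
	if err := v1alpha1.ValidateBatchSchedulerConfig(logr.Discard(), cfg); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("accepted:", cfg.BatchScheduler)
}
```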
diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go
index 975521242b9..c58864d85ff 100644
--- a/ray-operator/apis/config/v1alpha1/configuration_types.go
+++ b/ray-operator/apis/config/v1alpha1/configuration_types.go
@@ -44,7 +44,7 @@ type Configuration struct {
 	LogStdoutEncoder string `json:"logStdoutEncoder,omitempty"`
 
 	// BatchScheduler enables the batch scheduler integration with a specific scheduler
-	// based on the given name, currently, supported values are volcano and yunikorn.
+	// based on the given name; currently supported values are volcano, yunikorn, and kai-scheduler.
 	BatchScheduler string `json:"batchScheduler,omitempty"`
 
 	// HeadSidecarContainers includes specification for a sidecar container
diff --git a/ray-operator/config/samples/ray-cluster.kai-gpu-sharing.yaml b/ray-operator/config/samples/ray-cluster.kai-gpu-sharing.yaml
new file mode 100644
index 00000000000..3aa8b961186
--- /dev/null
+++ b/ray-operator/config/samples/ray-cluster.kai-gpu-sharing.yaml
@@ -0,0 +1,98 @@
+# KAI Scheduler Example - GPU Sharing with RayCluster
+# KAI Scheduler uses a hierarchical queue system for resource management and fair sharing.
+# These queues must be created before any RayCluster can be scheduled by KAI.
+
+# NOTE: This is a DEMO configuration with unlimited quotas (-1) for easy testing.
+# In real-world deployments, you should set appropriate CPU/GPU/memory quotas and limits
+# based on your cluster's actual resources and organizational needs.
+
+# GPU Sharing Note: This example uses time-slicing GPU sharing.
+# KAI Scheduler also supports MPS and other GPU sharing methods.
+# For more information, check the KAI Scheduler documentation.
+
+# Parent queue: Represents a department or high-level organizational unit
+apiVersion: scheduling.run.ai/v2
+kind: Queue
+metadata:
+  name: department-1
+spec:
+  # priority: 100  # Optional: Higher-priority queues get surplus resources first
+  resources:
+    # quota: Guaranteed resources for this queue
+    # limit: Maximum resources this queue can use
+    # overQuotaWeight: How surplus resources are shared among queues
+    # Note: Using -1 (unlimited) for demo purposes
+    cpu:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+    gpu:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+    memory:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+---
+# Child queue: Represents a team within department-1
+apiVersion: scheduling.run.ai/v2
+kind: Queue
+metadata:
+  name: team-a
+spec:
+  parentQueue: department-1  # Inherits from the parent queue
+  # priority: 50  # Optional: Team priority within the department
+  resources:
+    # quota: Guaranteed resources for this queue
+    # limit: Maximum resources this queue can use
+    # overQuotaWeight: How surplus resources are shared among queues
+    # Note: Using -1 (unlimited) for demo purposes
+    cpu:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+    gpu:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+    memory:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+---
+# RayCluster with KAI Scheduler and GPU sharing
+apiVersion: ray.io/v1
+kind: RayCluster
+metadata:
+  name: raycluster-half-gpu
+  labels:
+    kai.scheduler/queue: team-a  # REQUIRED: Queue assignment for scheduling
+spec:
+  headGroupSpec:
+    template:
+      spec:
+        containers:
+        - name: head
+          image: rayproject/ray:2.46.0
+          resources:
+            limits:
+              cpu: "1"
+              memory: "2Gi"
+  # ---- Two workers share one GPU (0.5 each) ----
+  workerGroupSpecs:
+  - groupName: shared-gpu
+    replicas: 2
+    minReplicas: 2
+    template:
+      metadata:
+        annotations:
+          gpu-fraction: "0.5"  # Request 0.5 GPU per pod (two pods share one GPU)
+      spec:
+        containers:
+        - name: worker
+          image: rayproject/ray:2.46.0
+          resources:
+            limits:
+              cpu: "1"
+              memory: "2Gi"
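Note that the `gpu-fraction` annotation is interpreted by KAI Scheduler itself, not by KubeRay, which is why the worker containers above carry no `nvidia.com/gpu` limit. The sketch below only illustrates the sharing arithmetic under the assumption that KAI packs fractional pods onto whole devices; it is not KubeRay or KAI API code:

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

func main() {
	// Value of the gpu-fraction annotation in the worker template above.
	const gpuFraction = "0.5"
	frac, err := strconv.ParseFloat(gpuFraction, 64)
	if err != nil || frac <= 0 || frac > 1 {
		panic("gpu-fraction must be a number in (0, 1]")
	}
	const replicas = 2 // replicas in the shared-gpu worker group
	// Each pod requests `frac` of a device, so the group fits on:
	gpusNeeded := int(math.Ceil(float64(replicas) * frac))
	fmt.Printf("%d replicas x %.1f GPU each = %d physical GPU(s)\n", replicas, frac, gpusNeeded)
}
```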
diff --git a/ray-operator/config/samples/ray-cluster.kai-scheduler.yaml b/ray-operator/config/samples/ray-cluster.kai-scheduler.yaml
new file mode 100644
index 00000000000..da5507fcc3b
--- /dev/null
+++ b/ray-operator/config/samples/ray-cluster.kai-scheduler.yaml
@@ -0,0 +1,90 @@
+# KAI Scheduler Example - Basic RayCluster
+# KAI Scheduler uses a hierarchical queue system for resource management and fair sharing.
+# These queues must be created before any RayCluster can be scheduled by KAI.
+
+# NOTE: This is a DEMO configuration with unlimited quotas (-1) for easy testing.
+# In real-world deployments, you should set appropriate CPU/GPU/memory quotas and limits
+# based on your cluster's actual resources and organizational needs.
+
+# Parent queue: Represents a department or high-level organizational unit
+apiVersion: scheduling.run.ai/v2
+kind: Queue
+metadata:
+  name: department-1
+spec:
+  # priority: 100  # Optional: Higher-priority queues get surplus resources first
+  resources:
+    # quota: Guaranteed resources for this queue
+    # limit: Maximum resources this queue can use
+    # overQuotaWeight: How surplus resources are shared among queues
+    # Note: Using -1 (unlimited) for demo purposes
+    cpu:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+    gpu:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+    memory:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+---
+# Child queue: Represents a team within department-1
+apiVersion: scheduling.run.ai/v2
+kind: Queue
+metadata:
+  name: team-a
+spec:
+  parentQueue: department-1  # Inherits from the parent queue
+  # priority: 50  # Optional: Team priority within the department
+  resources:
+    # quota: Guaranteed resources for this queue
+    # limit: Maximum resources this queue can use
+    # overQuotaWeight: How surplus resources are shared among queues
+    # Note: Using -1 (unlimited) for demo purposes
+    cpu:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+    gpu:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+    memory:
+      quota: -1
+      limit: -1
+      overQuotaWeight: 1
+---
+# RayCluster with KAI Scheduler
+apiVersion: ray.io/v1
+kind: RayCluster
+metadata:
+  name: raycluster-sample
+  labels:
+    kai.scheduler/queue: team-a  # REQUIRED: Queue assignment for scheduling
+spec:
+  headGroupSpec:
+    template:
+      spec:
+        containers:
+        - name: ray-head
+          image: rayproject/ray:2.46.0
+          resources:
+            requests:
+              cpu: "1"
+              memory: "2Gi"
+  workerGroupSpecs:
+  - groupName: worker
+    replicas: 2
+    minReplicas: 2
+    template:
+      spec:
+        containers:
+        - name: ray-worker
+          image: rayproject/ray:2.46.0
+          resources:
+            requests:
+              cpu: "1"
+              memory: "1Gi"
diff --git a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go
new file mode 100644
index 00000000000..8ea57d2f5d0
--- /dev/null
+++ b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go
@@ -0,0 +1,70 @@
+package kaischeduler
+
+// This KAI plugin relies on KAI-Scheduler's
+// built-in PodGrouper to create PodGroups at
+// runtime, so the plugin itself only needs to:
+//  1. expose the scheduler name,
+//  2. stamp pods with schedulerName + queue label.
+// No PodGroup create/patch logic is included.
+
+import (
+    "context"
+
+    "github.com/go-logr/logr"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/client-go/rest"
+    "sigs.k8s.io/controller-runtime/pkg/builder"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    logf "sigs.k8s.io/controller-runtime/pkg/log"
+
+    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+    schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+)
+
+const (
+    QueueLabelName = "kai.scheduler/queue"
+)
+
+type KaiScheduler struct {
+    log logr.Logger
+}
+
+type KaiSchedulerFactory struct{}
+
+func GetPluginName() string { return "kai-scheduler" }
+
+func (k *KaiScheduler) Name() string { return GetPluginName() }
+
+func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
+    return nil
+}
+
+func (k *KaiScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
+    pod.Spec.SchedulerName = k.Name()
+
+    queue, ok := app.Labels[QueueLabelName]
+    if !ok || queue == "" {
+        k.log.Info("Queue label missing from RayCluster; pods will remain pending",
+            "requiredLabel", QueueLabelName,
+            "rayCluster", app.Name)
+        return
+    }
+    if pod.Labels == nil {
+        pod.Labels = make(map[string]string)
+    }
+    pod.Labels[QueueLabelName] = queue
+}
+
+func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
+    return &KaiScheduler{
+        log: logf.Log.WithName("kai-scheduler"),
+    }, nil
+}
+
+func (kf *KaiSchedulerFactory) AddToScheme(_ *runtime.Scheme) {
+}
+
+func (kf *KaiSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
+    return b
+}
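Because the plugin is stateless, its contract is easy to exercise end to end. A minimal sketch under two assumptions: the `schedulerinterface.BatchScheduler` interface exposes `AddMetadataToPod` as implemented above, and `New` really ignores its `*rest.Config` and `client.Client` arguments (as the code shows), so `nil` suffices for both:

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
)

func main() {
	// nil rest.Config and client.Client are fine: New ignores both.
	sched, err := (&kaischeduler.KaiSchedulerFactory{}).New(context.Background(), nil, nil)
	if err != nil {
		panic(err)
	}

	cluster := &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "raycluster-sample",
			Labels: map[string]string{kaischeduler.QueueLabelName: "team-a"},
		},
	}
	pod := &corev1.Pod{}

	// Stamps the scheduler name and copies the queue label onto the pod.
	sched.AddMetadataToPod(context.Background(), cluster, "worker", pod)
	fmt.Println(pod.Spec.SchedulerName)                  // kai-scheduler
	fmt.Println(pod.Labels[kaischeduler.QueueLabelName]) // team-a
}
```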
diff --git a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go
new file mode 100644
index 00000000000..ed9eaf9549a
--- /dev/null
+++ b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go
@@ -0,0 +1,141 @@
+package kaischeduler
+
+import (
+    "context"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+)
+
+func createTestRayCluster(labels map[string]string) *rayv1.RayCluster {
+    return &rayv1.RayCluster{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "test-cluster",
+            Namespace: "default",
+            Labels:    labels,
+        },
+    }
+}
+
+func createTestPod() *corev1.Pod {
+    return &corev1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "test-pod",
+            Namespace: "default",
+            Labels: map[string]string{
+                "ray.io/cluster":   "test-cluster",
+                "ray.io/node-type": "worker",
+                "app":              "ray",
+            },
+        },
+        Spec: corev1.PodSpec{
+            Containers: []corev1.Container{{
+                Name:  "ray-worker",
+                Image: "rayproject/ray:latest",
+            }},
+        },
+    }
+}
+
+func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
+    a := assert.New(t)
+    scheduler := &KaiScheduler{}
+    ctx := context.Background()
+
+    // Create RayCluster with queue label
+    rayCluster := createTestRayCluster(map[string]string{
+        QueueLabelName: "test-queue",
+    })
+    pod := createTestPod()
+
+    // Call AddMetadataToPod
+    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
+
+    // Assert scheduler name is set to kai-scheduler
+    a.Equal("kai-scheduler", pod.Spec.SchedulerName)
+
+    // Assert queue label is propagated to pod
+    a.NotNil(pod.Labels)
+    a.Equal("test-queue", pod.Labels[QueueLabelName])
+}
+
+func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
+    a := assert.New(t)
+    scheduler := &KaiScheduler{}
+    ctx := context.Background()
+
+    // Create RayCluster without queue label
+    rayCluster := createTestRayCluster(map[string]string{})
+    pod := createTestPod()
+
+    // Call AddMetadataToPod
+    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
+
+    // Assert scheduler name is still set (always required)
+    a.Equal("kai-scheduler", pod.Spec.SchedulerName)
+
+    // Assert queue label is not added to pod when missing from RayCluster
+    if pod.Labels != nil {
+        _, exists := pod.Labels[QueueLabelName]
+        a.False(exists)
+    }
+}
+
+func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
+    a := assert.New(t)
+    scheduler := &KaiScheduler{}
+    ctx := context.Background()
+
+    // Create RayCluster with empty queue label
+    rayCluster := createTestRayCluster(map[string]string{
+        QueueLabelName: "",
+    })
+    pod := createTestPod()
+
+    // Call AddMetadataToPod
+    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
+
+    // Assert scheduler name is still set
+    a.Equal("kai-scheduler", pod.Spec.SchedulerName)
+
+    // Assert empty queue label is treated as missing
+    if pod.Labels != nil {
+        _, exists := pod.Labels[QueueLabelName]
+        a.False(exists)
+    }
+}
+
+func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
+    a := assert.New(t)
+    scheduler := &KaiScheduler{}
+    ctx := context.Background()
+
+    // Create RayCluster with queue label
+    rayCluster := createTestRayCluster(map[string]string{
+        QueueLabelName: "test-queue",
+    })
+
+    // Create pod with existing labels
+    pod := createTestPod()
+    pod.Labels = map[string]string{
+        "existing-label": "existing-value",
+        "app":            "ray",
+    }
+
+    // Call AddMetadataToPod
+    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
+
+    // Assert scheduler name is set
+    a.Equal("kai-scheduler", pod.Spec.SchedulerName)
+
+    // Assert queue label is added
+    a.Equal("test-queue", pod.Labels[QueueLabelName])
+
+    // Assert existing labels are preserved
+    a.Equal("existing-value", pod.Labels["existing-label"])
+    a.Equal("ray", pod.Labels["app"])
+}
diff --git a/ray-operator/controllers/ray/batchscheduler/schedulermanager.go b/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
index 050c08b5fc8..42d0a8e8aa4 100644
--- a/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
+++ b/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
@@ -12,6 +12,7 @@ import (
 
     configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
     schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+    kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
     schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
@@ -60,6 +61,8 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
         factory = &volcano.VolcanoBatchSchedulerFactory{}
     case yunikorn.GetPluginName():
         factory = &yunikorn.YuniKornSchedulerFactory{}
+    case kaischeduler.GetPluginName():
+        factory = &kaischeduler.KaiSchedulerFactory{}
     case schedulerplugins.GetPluginName():
         factory = &schedulerplugins.KubeSchedulerFactory{}
     default:
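Factory selection reuses the existing switch. Since `getSchedulerFactory` is unexported, it can only be called from inside the `batchscheduler` package; this sketch mirrors what the updated table-driven test below verifies, assuming the `(schedulerinterface.BatchSchedulerFactory, error)` return shape implied by the truncated hunk header:

```go
package batchscheduler

import (
	"testing"

	configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
	kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
)

// Hypothetical in-package test; the real coverage lives in
// schedulermanager_test.go below.
func TestKaiFactorySelectionSketch(t *testing.T) {
	factory, err := getSchedulerFactory(configapi.Configuration{
		BatchScheduler: kaischeduler.GetPluginName(), // "kai-scheduler"
	})
	if err != nil {
		t.Fatal(err)
	}
	if _, ok := factory.(*kaischeduler.KaiSchedulerFactory); !ok {
		t.Fatalf("want *kaischeduler.KaiSchedulerFactory, got %T", factory)
	}
}
```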
diff --git a/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go b/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
index 193c00b81e0..6f1bc4f9627 100644
--- a/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
+++ b/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
@@ -8,6 +8,7 @@ import (
 
     "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
     schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+    kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
 )
@@ -16,6 +17,7 @@ func TestGetSchedulerFactory(t *testing.T) {
     DefaultFactory := &schedulerinterface.DefaultBatchSchedulerFactory{}
     VolcanoFactory := &volcano.VolcanoBatchSchedulerFactory{}
     YuniKornFactory := &yunikorn.YuniKornSchedulerFactory{}
+    KaiFactory := &kaischeduler.KaiSchedulerFactory{}
 
     type args struct {
         rayConfigs v1alpha1.Configuration
@@ -65,6 +67,16 @@ func TestGetSchedulerFactory(t *testing.T) {
             },
             want: reflect.TypeOf(VolcanoFactory),
         },
+        {
+            name: "enableBatchScheduler=false, batchScheduler set to kai-scheduler",
+            args: args{
+                rayConfigs: v1alpha1.Configuration{
+                    EnableBatchScheduler: false,
+                    BatchScheduler:       kaischeduler.GetPluginName(),
+                },
+            },
+            want: reflect.TypeOf(KaiFactory),
+        },
         {
             name: "enableBatchScheduler not set, batchScheduler set to yunikorn",
             args: args{
@@ -83,6 +95,15 @@ func TestGetSchedulerFactory(t *testing.T) {
             },
             want: reflect.TypeOf(VolcanoFactory),
         },
+        {
+            name: "enableBatchScheduler not set, batchScheduler set to kai-scheduler",
+            args: args{
+                rayConfigs: v1alpha1.Configuration{
+                    BatchScheduler: kaischeduler.GetPluginName(),
+                },
+            },
+            want: reflect.TypeOf(KaiFactory),
+        },
         {
             name: "enableBatchScheduler not set, batchScheduler set to unknown value",
             args: args{
diff --git a/ray-operator/main.go b/ray-operator/main.go
index 1f1aa717c0b..c74da9e75c1 100644
--- a/ray-operator/main.go
+++ b/ray-operator/main.go
@@ -94,7 +94,7 @@ func main() {
     flag.BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false,
         "(Deprecated) Enable batch scheduler. Currently is volcano, which supports gang scheduler policy. Please use --batch-scheduler instead.")
     flag.StringVar(&batchScheduler, "batch-scheduler", "",
-        "Batch scheduler name, supported values are volcano and yunikorn.")
+        "Batch scheduler name; supported values are volcano, yunikorn, and kai-scheduler.")
     flag.StringVar(&configFile, "config", "", "Path to structured config file. Flags are ignored if config file is set.")
     flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false,
         "Use Kubernetes proxy subresource when connecting to the Ray Head node.")