Skip to content

Integration: KAI Scheduler #3886

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
8 changes: 6 additions & 2 deletions helm-chart/kuberay-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ logging:
# 4. Use PodGroup
# batchScheduler:
# name: scheduler-plugins
#

# 5. Use Kai Scheduler
# batchScheduler:
# name: kai-scheduler

batchScheduler:
# Deprecated. This option will be removed in the future.
# Note, for backwards compatibility. When it sets to true, it enables volcano scheduler integration.
enabled: false
# Set the customized scheduler name, supported values are "volcano", "yunikorn" or "scheduler-plugins", do not set
# Set the customized scheduler name, supported values are "volcano", "yunikorn", "kai-scheduler" or "scheduler-plugins", do not set
# "batchScheduler.enabled=true" at the same time as it will override this option.
name: ""

Expand Down
3 changes: 2 additions & 1 deletion ray-operator/apis/config/v1alpha1/config_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-logr/logr"

kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
Expand All @@ -23,7 +24,7 @@ func ValidateBatchSchedulerConfig(logger logr.Logger, config Configuration) erro

if len(config.BatchScheduler) > 0 {
// if a customized scheduler is configured, check it is supported
if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() || config.BatchScheduler == schedulerplugins.GetPluginName() {
if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() || config.BatchScheduler == schedulerplugins.GetPluginName() || config.BatchScheduler == kaischeduler.GetPluginName() {
logger.Info("Feature flag batch-scheduler is enabled",
"scheduler name", config.BatchScheduler)
} else {
Expand Down
11 changes: 11 additions & 0 deletions ray-operator/apis/config/v1alpha1/config_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/go-logr/logr"
"github.com/go-logr/logr/testr"

kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
schedulerPlugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
Expand Down Expand Up @@ -71,6 +72,16 @@ func TestValidateBatchSchedulerConfig(t *testing.T) {
},
wantErr: false,
},
{
name: "valid option, batch-scheduler=kai-scheduler",
args: args{
logger: testr.New(t),
config: Configuration{
BatchScheduler: kaischeduler.GetPluginName(),
},
},
wantErr: false,
},
{
name: "invalid option, invalid scheduler name",
args: args{
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Configuration struct {
LogStdoutEncoder string `json:"logStdoutEncoder,omitempty"`

// BatchScheduler enables the batch scheduler integration with a specific scheduler
// based on the given name, currently, supported values are volcano and yunikorn.
// based on the given name. Currently supported values are "volcano", "yunikorn", "kai-scheduler" and "scheduler-plugins".
BatchScheduler string `json:"batchScheduler,omitempty"`

// HeadSidecarContainers includes specification for a sidecar container
Expand Down
92 changes: 92 additions & 0 deletions ray-operator/config/samples/ray-cluster.kai-gpu-sharing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# KAI Scheduler Example - GPU Sharing with RayCluster
# KAI Scheduler uses a hierarchical queue system for resource management and fair sharing.
# These queues must be created before any RayCluster can be scheduled by KAI.

# NOTE: This is a DEMO configuration with unlimited quotas (-1) for easy testing.
# In real-world deployments, you should set appropriate CPU/GPU/memory quotas and limits
# based on your cluster's actual resources and organizational needs.

# GPU Sharing Note: This example utilizes time slicing GPU sharing.
# KAI Scheduler also supports MPS and other GPU sharing methods.
# For more information, check the KAI Scheduler documentation.

# Parent queue: Represents a department or high-level organizational unit
apiVersion: scheduling.run.ai/v2
kind: Queue
metadata:
name: department-1
spec:
# priority: 100 # Optional: Higher priority queues get surplus resources first
resources:
cpu:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Determines how leftover (surplus) CPU resources are distributed among
# queues that have the same priority. Higher weights receive larger portion
gpu:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Share of surplus resources among same-priority queues
memory:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Share of surplus resources among same-priority queues
---
# Child queue: Represents a team within the department-1
apiVersion: scheduling.run.ai/v2
kind: Queue
metadata:
name: team-a
spec:
parentQueue: department-1 # Inherits from parent queue
# priority: 50 # Optional: Team priority within department
resources:
cpu:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Determines how leftover (surplus) CPU resources are distributed among
# queues that have the same priority. Higher weights receive larger portion
gpu:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Share of surplus resources among same-priority queues
memory:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Share of surplus resources among same-priority queues
---
# RayCluster with KAI Scheduler and GPU Sharing
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: raycluster-half-gpu
labels:
kai.scheduler/queue: team-a # REQUIRED: Queue assignment for scheduling
spec:
headGroupSpec:
template:
spec:
containers:
- name: head
image: rayproject/ray:2.46.0
resources:
limits:
cpu: "1"
memory: "2Gi"
# ---- Two workers share one GPU (0.5 each) ----
workerGroupSpecs:
- groupName: shared-gpu
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you share what the Pod looks like after it's created, using kubectl describe pod ...?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we go:



Name:             raycluster-half-gpu-shared-gpu-worker-6sx5d
Namespace:        default
Priority:         0
Service Account:  default
Node:             ip-xxxxx
Start Time:       Wed, 06 Aug 2025 21:01:54 +0200
Labels:           app.kubernetes.io/created-by=kuberay-operator
                  app.kubernetes.io/name=kuberay
                  kai.scheduler/queue=team-a
                  ray.io/cluster=raycluster-half-gpu
                  ray.io/group=shared-gpu
                  ray.io/identifier=raycluster-half-gpu-worker
                  ray.io/is-ray-node=yes
                  ray.io/node-type=worker
                  runai-gpu-group=518b1881-bd3c-4593-9bf3-2e59e98d6cb9
Annotations:      gpu-fraction: 0.5
                  pod-group-name: pg-raycluster-half-gpu-b1ee6048-1369-4ee3-a5a5-a66a377e769f
                  received-resource-type: Fraction
                  runai/shared-gpu-configmap: raycluster-half-gpu-6hl5xvs-shared-gpu
Status:           Running
IP:               xxxxxx
IPs:
  IP:           xxxxxx
Controlled By:  RayCluster/raycluster-half-gpu
Init Containers:
  wait-gcs-ready:
    Container ID:  containerd://27bf1b6c4f5723594b77658697c8a9be3bf9f72579e2f230e2e8ae28d2d74459
    Image:         rayproject/ray:2.46.0
    Image ID:      docker.io/rayproject/ray@sha256:764d7d4bf276143fac2fe322fe41593bb36bbd4dbe7fe9a2d94b67acb736eae3
    Port:          <none>
    Host Port:     <none>
    Command:
      /bin/bash
      -c
      --
    Args:

                            SECONDS=0
                            while true; do
                              if (( SECONDS <= 120 )); then
                                if ray health-check --address raycluster-half-gpu-head-svc.default.svc.cluster.local:6379 > /dev/null 2>&1; then
                                  echo "GCS is ready."
                                  break
                                fi
                                echo "$SECONDS seconds elapsed: Waiting for GCS to be ready."
                              else
                                if ray health-check --address raycluster-half-gpu-head-svc.default.svc.cluster.local:6379; then
                                  echo "GCS is ready. Any error messages above can be safely ignored."
                                  break
                                fi
                                echo "$SECONDS seconds elapsed: Still waiting for GCS to be ready. For troubleshooting, refer to the FAQ at https://github.com/ray-project/kuberay/blob/master/docs/guidance/FAQ.md."
                              fi
                              sleep 5
                            done

    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Wed, 06 Aug 2025 21:01:55 +0200
      Finished:     Wed, 06 Aug 2025 21:02:20 +0200
    Ready:          True
    Restart Count:  0
    Limits:
      cpu:     200m
      memory:  256Mi
    Requests:
      cpu:     200m
      memory:  256Mi
    Environment:
      FQ_RAY_IP:  raycluster-half-gpu-head-svc.default.svc.cluster.local
      RAY_IP:     raycluster-half-gpu-head-svc
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-5wz6g (ro)
Containers:
  worker:
    Container ID:  containerd://ae2297c4dd07bfb89a4a2795915cd0b7d8aadd0aacd84f8859c448ab95927f86
    Image:         rayproject/ray:2.46.0
    Image ID:      docker.io/rayproject/ray@sha256:764d7d4bf276143fac2fe322fe41593bb36bbd4dbe7fe9a2d94b67acb736eae3
    Port:          8080/TCP
    Host Port:     0/TCP
    Command:
      /bin/bash
      -c
      --
    Args:
      ulimit -n 65536; ray start  --address=raycluster-half-gpu-head-svc.default.svc.cluster.local:6379  --block  --dashboard-agent-listen-port=52365  --memory=2147483648  --metrics-export-port=8080  --num-cpus=1
    State:          Running
      Started:      Wed, 06 Aug 2025 21:02:21 +0200
    Ready:          True
    Restart Count:  0
    Limits:
      cpu:     1
      memory:  2Gi
    Requests:
      cpu:      1
      memory:   2Gi
    Liveness:   exec [bash -c wget --tries 1 -T 2 -q -O- http://localhost:52365/api/local_raylet_healthz | grep success] delay=30s timeout=2s period=5s #success=1 #failure=120
    Readiness:  exec [bash -c wget --tries 1 -T 2 -q -O- http://localhost:52365/api/local_raylet_healthz | grep success] delay=10s timeout=2s period=5s #success=1 #failure=10
    Environment Variables from:
      raycluster-half-gpu-6hl5xvs-shared-gpu-0-evar  ConfigMap  Optional: false
    Environment:
      FQ_RAY_IP:                            raycluster-half-gpu-head-svc.default.svc.cluster.local
      RAY_IP:                               raycluster-half-gpu-head-svc
      RAY_CLUSTER_NAME:                      (v1:metadata.labels['ray.io/cluster'])
      RAY_CLOUD_INSTANCE_ID:                raycluster-half-gpu-shared-gpu-worker-6sx5d (v1:metadata.name)
      RAY_NODE_TYPE_NAME:                    (v1:metadata.labels['ray.io/group'])
      KUBERAY_GEN_RAY_START_CMD:            ray start  --address=raycluster-half-gpu-head-svc.default.svc.cluster.local:6379  --block  --dashboard-agent-listen-port=52365  --memory=2147483648  --metrics-export-port=8080  --num-cpus=1
      RAY_PORT:                             6379
      RAY_ADDRESS:                          raycluster-half-gpu-head-svc.default.svc.cluster.local:6379
      RAY_USAGE_STATS_KUBERAY_IN_USE:       1
      RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE:  1
      NVIDIA_VISIBLE_DEVICES:               <set to the key 'NVIDIA_VISIBLE_DEVICES' of config map 'raycluster-half-gpu-6hl5xvs-shared-gpu-0'>  Optional: false
      RUNAI_NUM_OF_GPUS:                    <set to the key 'RUNAI_NUM_OF_GPUS' of config map 'raycluster-half-gpu-6hl5xvs-shared-gpu-0'>       Optional: false
    Mounts:
      /dev/shm from shared-mem (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-5wz6g (ro)
Conditions:
  Type              Status
  PodBound          True
  Initialized       True
  Ready             True
  ContainersReady   True
  PodScheduled      True
Volumes:
  shared-mem:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     Memory
    SizeLimit:  2Gi
  kube-api-access-5wz6g:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
  raycluster-half-gpu-6hl5xvs-shared-gpu-0-vol:
    Type:        ConfigMap (a volume populated by a ConfigMap)
    Name:        raycluster-half-gpu-6hl5xvs-shared-gpu-0
    Optional:    false
QoS Class:       Guaranteed
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type    Reason     Age   From           Message
  ----    ------     ----  ----           -------
  Normal  Scheduled  100s  kai-scheduler  Successfully assigned pod default/raycluster-half-gpu-shared-gpu-worker-6sx5d to node ip-xxxxxxx at node-pool default
  Normal  Bound      100s  binder         Pod bound successfully to node ip-xxxxxxx
  Normal  Pulled     99s   kubelet        Container image "rayproject/ray:2.46.0" already present on machine
  Normal  Created    99s   kubelet        Created container wait-gcs-ready
  Normal  Started    99s   kubelet        Started container wait-gcs-ready
  Normal  Pulled     73s   kubelet        Container image "rayproject/ray:2.46.0" already present on machine
  Normal  Created    73s   kubelet        Created container worker
  Normal  Started    73s   kubelet        Started container worker

    replicas: 2
    minReplicas: 2
    template:
      metadata:
        annotations:
          gpu-fraction: "0.5" # Request 0.5 GPU per pod (two pods share one GPU)
      spec:
        containers:
          - name: worker
            image: rayproject/ray:2.46.0
            resources:
              limits:
                cpu: "1"
                memory: "2Gi"
84 changes: 84 additions & 0 deletions ray-operator/config/samples/ray-cluster.kai-scheduler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# KAI Scheduler Example - Basic RayCluster
# KAI Scheduler uses a hierarchical queue system for resource management and fair sharing.
# These queues must be created before any RayCluster can be scheduled by KAI.

# NOTE: This is a DEMO configuration with unlimited quotas (-1) for easy testing.
# In real-world deployments, you should set appropriate CPU/GPU/memory quotas and limits
# based on your cluster's actual resources and organizational needs.

# Parent queue: Represents a department or high-level organizational unit
apiVersion: scheduling.run.ai/v2
kind: Queue
metadata:
name: department-1
spec:
# priority: 100 # Optional: Higher priority queues get surplus resources first
resources:
cpu:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Determines how leftover (surplus) CPU resources are distributed among
# queues that have the same priority. Higher weights receive larger portion
gpu:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Share of surplus resources among same-priority queues
memory:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Share of surplus resources among same-priority queues
---
# Child queue: Represents a team within the department-1
apiVersion: scheduling.run.ai/v2
kind: Queue
metadata:
name: team-a
spec:
parentQueue: department-1 # Inherits from parent queue
# priority: 50 # Optional: Team priority within department
resources:
cpu:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Determines how leftover (surplus) CPU resources are distributed among
# queues that have the same priority. Higher weights receive larger portion
gpu:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Share of surplus resources among same-priority teams
memory:
quota: -1 # Unlimited quota (demo setting)
limit: -1 # Unlimited burst limit (demo setting)
overQuotaWeight: 1 # Share of surplus resources among same-priority teams
---
# RayCluster with KAI Scheduler
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: raycluster-sample
labels:
kai.scheduler/queue: team-a # REQUIRED: Queue assignment for scheduling
spec:
headGroupSpec:
template:
spec:
containers:
- name: ray-head
image: rayproject/ray:2.46.0
resources:
requests:
cpu: "1"
memory: "2Gi"
workerGroupSpecs:
- groupName: worker
replicas: 2
minReplicas: 2
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray:2.46.0
resources:
requests:
cpu: "1"
memory: "1Gi"
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kaischeduler

// This KAI plugin relies on KAI-Scheduler's
// built-in PodGrouper to create PodGroups at
// runtime, so the plugin itself only needs to:
// 1. expose the scheduler name,
// 2. stamp pods with schedulerName + queue label.
// No PodGroup create/patch logic is included.

import (
"context"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
)

const (
	// QueueLabelName is the label KAI Scheduler reads to assign a workload to a
	// scheduling queue. It must be present on the RayCluster (and is copied onto
	// each pod in AddMetadataToPod) for KAI to schedule the pods.
	QueueLabelName = "kai.scheduler/queue"
)

// KaiScheduler integrates RayCluster with KAI Scheduler by stamping pods with
// the scheduler name and queue label. It relies on KAI-Scheduler's built-in
// PodGrouper to create PodGroups at runtime, so no PodGroup create/patch
// logic lives here.
type KaiScheduler struct {
	log logr.Logger
}

// KaiSchedulerFactory constructs KaiScheduler instances for the batch
// scheduler plugin framework.
type KaiSchedulerFactory struct{}

// GetPluginName returns the identifier ("kai-scheduler") used to select this
// batch scheduler integration in the operator configuration.
func GetPluginName() string {
	return "kai-scheduler"
}

// Name reports the plugin name of this scheduler integration.
func (k *KaiScheduler) Name() string {
	return GetPluginName()
}

// DoBatchSchedulingOnSubmission is a no-op for KAI: the scheduler's built-in
// PodGrouper creates PodGroups at runtime, so there is nothing to set up when
// a RayCluster is submitted.
func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
	return nil
}

// AddMetadataToPod routes the pod to KAI Scheduler by setting its scheduler
// name, then propagates the queue assignment from the RayCluster's
// "kai.scheduler/queue" label onto the pod. If the RayCluster carries no
// queue label, the pod is still handed to KAI (scheduler name is set first)
// but will remain pending, so a warning is logged.
func (k *KaiScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
	pod.Spec.SchedulerName = k.Name()

	// A missing key yields "" from the map lookup, which we treat the same as
	// an explicitly empty label.
	queueName := app.Labels[QueueLabelName]
	if queueName == "" {
		k.log.Info("Queue label missing from RayCluster; pods will remain pending",
			"requiredLabel", QueueLabelName,
			"rayCluster", app.Name)
		return
	}

	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels[QueueLabelName] = queueName
}

// New builds a KaiScheduler. No Kubernetes clients are retained because the
// plugin only mutates pod metadata and never calls the API server itself.
func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
	scheduler := &KaiScheduler{log: logf.Log.WithName("kai-scheduler")}
	return scheduler, nil
}

// AddToScheme is a no-op: this plugin manages no custom API types of its own,
// so nothing needs to be registered with the scheme.
func (kf *KaiSchedulerFactory) AddToScheme(_ *runtime.Scheme) {
}

// ConfigureReconciler returns the builder unchanged: the plugin does not need
// to watch or own any additional resources.
func (kf *KaiSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
	return b
}
Loading
Loading