Integration: KAI Scheduler #3886
base: master
Changes from 7 commits
@@ -0,0 +1,35 @@
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-half-gpu
  labels:
    kai.scheduler/queue: team-a
spec:
  headGroupSpec:
    template:
      spec:
        containers:
        - name: head
          image: rayproject/ray:2.46.0
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"

  # ---- Two workers share one GPU (0.5 each) ----
  workerGroupSpecs:
  - groupName: shared-gpu
    replicas: 2
    minReplicas: 2
    template:
      metadata:
        annotations:
          gpu-fraction: "0.5"
Reviewer: What does this mean? Are you using DRA to mount the same GPU to two different Pods? Additionally, do we need to specify GPUs in the resource requests and limits? If not, KubeRay won't pass GPU information to Ray, and Ray will be unable to map physical GPU resources in Kubernetes to logical resources within Ray.

Reviewer: Can you add comments for the KAI Scheduler-specific configuration so that users can understand what this YAML is for?

Author: The example uses KAI Scheduler's native GPU sharing feature, which works through time slicing. I made that clear in the comments with the new changes I pushed. We do not need to specify GPUs in the resources when using the gpu-fraction annotation. I added comments to the YAML files to explain the KAI-specific configuration now. Let me know what you think.

Reviewer: Can you try testing whether Ray tasks or actors are actually using the GPUs? Since the CR doesn't specify GPUs in the resource requests and limits, something like:

import ray

@ray.remote(num_gpus=0.5)
def f():
    # check which GPUs Ray assigned to this task
    return ray.get_gpu_ids()

ref = f.remote()
print(ray.get(ref))

Author: Thanks for the pointer. My tests showed it getting allocated, but the test cluster has since been removed, so I have no quick way to try it again right now. I will try again later. If you have a way to test it in the meantime, please let me know if you find anything.

Author: I tested this now. It's working fine. Ray tasks can access GPUs, and the workers are sharing the GPU correctly. The test pattern you suggested also works. One quirk I noticed: Ray shows 2.0 GPUs in total (1 per worker) instead of recognizing the fractional allocations. I think this happens because, as you pointed out, without GPUs in the resource requests and limits, KubeRay does not pass GPU information to Ray.

Reviewer: If GPU sharing is achieved by time slicing, does that mean each worker would feel they own the entire GPU? Is that why Ray shows 2.0 GPUs?
      spec:
        containers:
        - name: worker
          image: rayproject/ray:2.46.0
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"
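Following up on the review thread above, a minimal sketch (not part of the PR files) for seeing the GPU count Ray reports once raycluster-half-gpu is running. It assumes the script is executed on the head pod; ray.cluster_resources and ray.available_resources are standard Ray APIs.

import ray

ray.init()  # connects to the running Ray cluster when executed on the head pod

# As noted in the review thread, without GPU entries in the container
# resources each worker autodetects the physical device, so the cluster is
# expected to report 2.0 GPUs even though both workers share a single GPU
# with a 0.5 gpu-fraction each at the KAI level.
print("Total GPUs Ray sees:", ray.cluster_resources().get("GPU"))
print("Currently available:", ray.available_resources().get("GPU"))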
@@ -0,0 +1,38 @@
apiVersion: scheduling.run.ai/v2
kind: Queue
metadata:
  name: department-1
spec:
  resources:
    cpu:
      quota: -1
      limit: -1
      overQuotaWeight: 1
    gpu:
      quota: -1
      limit: -1
      overQuotaWeight: 1
    memory:
      quota: -1
      limit: -1
      overQuotaWeight: 1
---
apiVersion: scheduling.run.ai/v2
kind: Queue
metadata:
  name: team-a
spec:
  parentQueue: department-1
  resources:
    cpu:
      quota: -1
      limit: -1
      overQuotaWeight: 1
    gpu:
      quota: -1
      limit: -1
      overQuotaWeight: 1
    memory:
      quota: -1
      limit: -1
      overQuotaWeight: 1
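The two Queue objects above define the hierarchy (team-a nested under department-1) that the kai.scheduler/queue label on a RayCluster points at. A minimal sketch for listing the queues and their parents with the Kubernetes Python client, assuming a working kubeconfig and that the scheduling.run.ai/v2 Queue CRD is cluster-scoped with plural "queues":

from kubernetes import client, config

config.load_kube_config()
api = client.CustomObjectsApi()

# Assumption: the scheduling.run.ai/v2 Queue CRD is cluster-scoped and its
# plural is "queues".
queues = api.list_cluster_custom_object(
    group="scheduling.run.ai", version="v2", plural="queues"
)
for q in queues.get("items", []):
    name = q["metadata"]["name"]
    parent = q.get("spec", {}).get("parentQueue", "<none>")
    print(f"queue={name} parent={parent}")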
@@ -0,0 +1,31 @@
# A simple example RayCluster with KAI
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-sample
  labels:
    kai.scheduler/queue: team-a
spec:
  headGroupSpec:
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray:2.46.0
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
  workerGroupSpecs:
  - groupName: worker
    replicas: 2
    minReplicas: 2
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray:2.46.0
          resources:
            requests:
              cpu: "1"
              memory: "1Gi"
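Once this sample cluster is applied, the effect of the plugin in the next file (schedulerName plus queue label stamped onto every pod) can be spot-checked. A minimal sketch with the Kubernetes Python client, assuming the cluster was created in the default namespace and that its pods carry the ray.io/cluster label KubeRay applies (the same label the unit tests later in this PR use):

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# Assumption: raycluster-sample was applied to the "default" namespace.
pods = v1.list_namespaced_pod(
    "default", label_selector="ray.io/cluster=raycluster-sample"
)
for pod in pods.items:
    print(
        pod.metadata.name,
        pod.spec.scheduler_name,  # expected: kai-scheduler
        (pod.metadata.labels or {}).get("kai.scheduler/queue"),  # expected: team-a
    )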
@@ -0,0 +1,66 @@
package kaischeduler

// This KAI plugin relies on KAI-Scheduler's
// built-in PodGrouper to create PodGroups at
// runtime, so the plugin itself only needs to:
//  1. expose the scheduler name,
//  2. stamp pods with schedulerName + queue label.
// No PodGroup create/patch logic is included.

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/rest"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
    QueueLabelName = "kai.scheduler/queue"
)

type KaiScheduler struct{}

type KaiSchedulerFactory struct{}

func GetPluginName() string { return "kai-scheduler" }

func (k *KaiScheduler) Name() string { return GetPluginName() }

func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
    return nil
}

func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
    pod.Spec.SchedulerName = k.Name()

    queue, ok := app.Labels[QueueLabelName]
    if !ok || queue == "" {
        logger := ctrl.LoggerFrom(ctx).WithName("kai-scheduler")
        logger.Info("Queue label missing from RayCluster; pods will remain pending",
            "requiredLabel", QueueLabelName,
            "rayCluster", app.Name)
        return
    }
    if pod.Labels == nil {
        pod.Labels = make(map[string]string)
    }
    pod.Labels[QueueLabelName] = queue
}

func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
    return &KaiScheduler{}, nil
}

func (kf *KaiSchedulerFactory) AddToScheme(_ *runtime.Scheme) {
}

func (kf *KaiSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
    return b
}
@@ -0,0 +1,141 @@
package kaischeduler

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func createTestRayCluster(name, namespace string, labels map[string]string) *rayv1.RayCluster {
    return &rayv1.RayCluster{
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: namespace,
            Labels:    labels,
        },
    }
}

func createTestPod(name, namespace string) *corev1.Pod {
    return &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: namespace,
            Labels: map[string]string{
                "ray.io/cluster":   "test-cluster",
                "ray.io/node-type": "worker",
                "app":              "ray",
            },
        },
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{{
                Name:  "ray-worker",
                Image: "rayproject/ray:latest",
            }},
        },
    }
}

func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
    a := assert.New(t)
    scheduler := &KaiScheduler{}
    ctx := context.Background()

    // Create RayCluster with queue label
    rayCluster := createTestRayCluster("test-cluster", "default", map[string]string{
        QueueLabelName: "test-queue",
    })
    pod := createTestPod("test-pod", "default")

    // Call AddMetadataToPod
    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)

    // Assert scheduler name is set to kai-scheduler
    a.Equal("kai-scheduler", pod.Spec.SchedulerName)

    // Assert queue label is propagated to pod
    a.NotNil(pod.Labels)
    a.Equal("test-queue", pod.Labels[QueueLabelName])
}

func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
    a := assert.New(t)
    scheduler := &KaiScheduler{}
    ctx := context.Background()

    // Create RayCluster without queue label
    rayCluster := createTestRayCluster("test-cluster", "default", map[string]string{})
    pod := createTestPod("test-pod", "default")

    // Call AddMetadataToPod
    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)

    // Assert scheduler name is still set (always required)
    a.Equal("kai-scheduler", pod.Spec.SchedulerName)

    // Assert queue label is not added to pod when missing from RayCluster
    if pod.Labels != nil {
        _, exists := pod.Labels[QueueLabelName]
        a.False(exists)
    }
}

func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
    a := assert.New(t)
    scheduler := &KaiScheduler{}
    ctx := context.Background()

    // Create RayCluster with empty queue label
    rayCluster := createTestRayCluster("test-cluster", "default", map[string]string{
        QueueLabelName: "",
    })
    pod := createTestPod("test-pod", "default")

    // Call AddMetadataToPod
    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)

    // Assert scheduler name is still set
    a.Equal("kai-scheduler", pod.Spec.SchedulerName)

    // Assert empty queue label is treated as missing
    if pod.Labels != nil {
        _, exists := pod.Labels[QueueLabelName]
        a.False(exists)
    }
}

func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
    a := assert.New(t)
    scheduler := &KaiScheduler{}
    ctx := context.Background()

    // Create RayCluster with queue label
    rayCluster := createTestRayCluster("test-cluster", "default", map[string]string{
        QueueLabelName: "test-queue",
    })

    // Create pod with existing labels
    pod := createTestPod("test-pod", "default")
    pod.Labels = map[string]string{
        "existing-label": "existing-value",
        "app":            "ray",
    }

    // Call AddMetadataToPod
    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)

    // Assert scheduler name is set
    a.Equal("kai-scheduler", pod.Spec.SchedulerName)

    // Assert queue label is added
    a.Equal("test-queue", pod.Labels[QueueLabelName])

    // Assert existing labels are preserved
    a.Equal("existing-value", pod.Labels["existing-label"])
    a.Equal("ray", pod.Labels["app"])
}
Reviewer: Could you share what the Pod looks like after it's created, using kubectl describe pod ...?

Author: Here we go: