
Commit f6b4f17

Feature/cron scheduling rayjob 2426 (#3836)
* Add support for cron scheduling for Ray jobs --------- Signed-off-by: Kenny Han <[email protected]>
1 parent 82fdef3 commit f6b4f17

File tree

18 files changed: +1198, -0 lines


docs/reference/api.md

Lines changed: 1 addition & 0 deletions
@@ -248,6 +248,7 @@ _Appears in:_
 | `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
 | `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
 | `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the<br />entrypoint command. | | |
+| `schedule` _string_ | Schedule specifies a cron like string for scheduling Ray jobs.<br />When shutdownAfterJobFinishes is set to true, a new cluster is provisioned<br />per scheduled job, otherwise the job is scheduled on an existing cluster. | | |
 | `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
 | `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |
 | `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.<br />It's only working when ShutdownAfterJobFinishes set to true. | 0 | |

go.mod

Lines changed: 1 addition & 0 deletions
@@ -87,6 +87,7 @@ require (
   github.com/prometheus/client_model v0.6.1 // indirect
   github.com/prometheus/common v0.62.0 // indirect
   github.com/prometheus/procfs v0.15.1 // indirect
+  github.com/robfig/cron/v3 v3.0.1 // indirect
   github.com/russross/blackfriday/v2 v2.1.0 // indirect
   github.com/x448/float16 v0.8.4 // indirect
   github.com/xlab/treeprint v1.2.0 // indirect

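The only new dependency is github.com/robfig/cron/v3, which the controller below uses (via cron.ParseStandard) to interpret the new `schedule` field. As a minimal, illustrative sketch of what the library provides — the cron expression and times here are made up, not taken from the commit:

package main

import (
  "fmt"
  "time"

  "github.com/robfig/cron/v3"
)

func main() {
  // ParseStandard understands the standard five-field cron format, the same format
  // accepted by the new RayJob `schedule` field (every 5 minutes in this example).
  sched, err := cron.ParseStandard("*/5 * * * *")
  if err != nil {
    panic(err)
  }
  now := time.Now()
  next := sched.Next(now) // first activation strictly after `now`
  fmt.Printf("next run at %s (in %s)\n", next, next.Sub(now).Round(time.Second))
}

This is the same five-field format used by Kubernetes CronJobs, which is why RayJob `schedule` strings read like CronJob schedules.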
go.sum

Lines changed: 2 additions & 0 deletions
(Generated file; diff not shown.)

helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Lines changed: 5 additions & 0 deletions
(Generated file; diff not shown.)

ray-operator/apis/ray/v1/rayjob_types.go

Lines changed: 10 additions & 0 deletions
@@ -54,6 +54,8 @@ const (
   JobDeploymentStatusSuspended  JobDeploymentStatus = "Suspended"
   JobDeploymentStatusRetrying   JobDeploymentStatus = "Retrying"
   JobDeploymentStatusWaiting    JobDeploymentStatus = "Waiting"
+  JobDeploymentStatusScheduling JobDeploymentStatus = "Scheduling"
+  JobDeploymentStatusScheduled  JobDeploymentStatus = "Scheduled"
 )
 
 // IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
@@ -181,6 +183,11 @@ type RayJobSpec struct {
   // entrypoint command.
   // +optional
   EntrypointResources string `json:"entrypointResources,omitempty"`
+  // Schedule specifies a cron like string for scheduling Ray jobs.
+  // When shutdownAfterJobFinishes is set to true, a new cluster is provisioned
+  // per scheduled job, otherwise the job is scheduled on an existing cluster.
+  // +optional
+  Schedule string `json:"schedule,omitempty"`
   // EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command.
   // +optional
   EntrypointNumCpus float32 `json:"entrypointNumCpus,omitempty"`
@@ -233,6 +240,9 @@ type RayJobStatus struct {
   // or the submitter Job has failed.
   // +optional
   EndTime *metav1.Time `json:"endTime,omitempty"`
+  // LastScheduleTime is the last time the job was successfully scheduled.
+  // +optional
+  LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
   // Succeeded is the number of times this job succeeded.
   // +kubebuilder:default:=0
   // +optional

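For context, here is a hedged sketch of how a scheduled RayJob could be constructed against these updated Go types, for example from a test or a small client program. The controller-runtime client usage and all field values are illustrative assumptions, not part of this commit, and a real RayJob would still need a `rayClusterSpec` or cluster selector (see the sample manifest later in this diff):

package main

import (
  "context"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/runtime"
  ctrl "sigs.k8s.io/controller-runtime"
  "sigs.k8s.io/controller-runtime/pkg/client"

  rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func main() {
  scheme := runtime.NewScheme()
  _ = rayv1.AddToScheme(scheme) // register the RayJob types shown in this diff

  cfg := ctrl.GetConfigOrDie()
  c, err := client.New(cfg, client.Options{Scheme: scheme})
  if err != nil {
    panic(err)
  }

  job := &rayv1.RayJob{
    ObjectMeta: metav1.ObjectMeta{Name: "rayjob-schedule", Namespace: "default"},
    Spec: rayv1.RayJobSpec{
      Entrypoint:               "python /home/ray/samples/sample_code.py",
      Schedule:                 "*/5 * * * *", // new field added by this commit
      ShutdownAfterJobFinishes: true,          // fresh cluster per scheduled run
      // rayClusterSpec omitted for brevity; see the sample YAML below.
    },
  }
  if err := c.Create(context.Background(), job); err != nil {
    panic(err)
  }
}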
ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Lines changed: 4 additions & 0 deletions
(Generated file; diff not shown.)

ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Lines changed: 5 additions & 0 deletions
(Generated file; diff not shown.)
Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-schedule
spec:
  # schedule specifies a cron string that tells the RayJob when to schedule and run new jobs.
  # Here it runs every 5 minutes.
  schedule: "*/5 * * * *"

  entrypoint: python /home/ray/samples/sample_code.py

  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  # NOTE: when schedule is set and this is true, the cluster is deleted and recreated for each scheduled run; otherwise the same cluster is reused.
  shutdownAfterJobFinishes: true

  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"


  rayClusterSpec:
    rayVersion: '2.46.0'
    headGroupSpec:
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.46.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                name: ray-job-code-sample
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray:2.46.0
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"
  # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
  # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
  # submitterPodTemplate:
  #   spec:
  #     restartPolicy: Never
  #     containers:
  #       - name: my-custom-rayjob-submitter-pod
  #         image: rayproject/ray:2.46.0
  #         # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
  #         # Specifying Command is not recommended.
  #         # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]


######################Ray code sample#################################
# This sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
# It is mounted into the container and executed to show the Ray job at work.
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 74 additions & 0 deletions
@@ -9,6 +9,7 @@ import (
   "time"
 
   "github.com/go-logr/logr"
+  "github.com/robfig/cron/v3"
   batchv1 "k8s.io/api/batch/v1"
   corev1 "k8s.io/api/core/v1"
   "k8s.io/apimachinery/pkg/api/errors"
@@ -34,6 +35,8 @@ const (
   RayJobDefaultRequeueDuration    = 3 * time.Second
   RayJobDefaultClusterSelectorKey = "ray.io/cluster"
   PythonUnbufferedEnvVarName      = "PYTHONUNBUFFERED"
+  // ScheduleBuffer is the buffer period in which a scheduled RayJob can still run after the last cron tick
+  ScheduleBuffer = 100 * time.Millisecond
 )
 
 // RayJobReconciler reconciles a RayJob object
@@ -168,6 +171,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
         return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
       }
     }
+    // We check the LastScheduleTime to know if it's the first scheduled job
+    if rayJobInstance.Spec.Schedule != "" && rayJobInstance.Status.LastScheduleTime == nil {
+      rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
+      break
+    }
     // Set `Status.JobDeploymentStatus` to `JobDeploymentStatusInitializing`, and initialize `Status.JobId`
     // and `Status.RayClusterName` prior to avoid duplicate job submissions and cluster creations.
     logger.Info("JobDeploymentStatusNew")
@@ -449,9 +457,58 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
         return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
       }
     }
+    if rayJobInstance.Spec.Schedule != "" {
+      logger.Info("Rescheduling RayJob")
+      rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduling
+      break
+    }
 
     // If the RayJob is completed, we should not requeue it.
     return ctrl.Result{}, nil
+  case rayv1.JobDeploymentStatusScheduling:
+    isJobDeleted, err := r.deleteSubmitterJob(ctx, rayJobInstance)
+    if err != nil {
+      return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+    }
+
+    if !isJobDeleted {
+      logger.Info("The release of the compute resources has not been completed yet. " +
+        "Wait for the resources to be deleted before the status transitions to avoid a resource leak.")
+      return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+    }
+
+    if rayJobInstance.Spec.ShutdownAfterJobFinishes {
+      rayJobInstance.Status.RayClusterStatus = rayv1.RayClusterStatus{}
+      rayJobInstance.Status.RayClusterName = ""
+    }
+    rayJobInstance.Status.DashboardURL = ""
+    rayJobInstance.Status.JobId = ""
+    rayJobInstance.Status.Message = ""
+    rayJobInstance.Status.Reason = ""
+    rayJobInstance.Status.RayJobStatusInfo = rayv1.RayJobStatusInfo{}
+
+    rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
+    rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
+  case rayv1.JobDeploymentStatusScheduled:
+    // Compute the durations from the current time to the previous and the next cron schedule times.
+    // We pass in time.Now() as a parameter for easier unit testing and consistency.
+    t1, t2, err := r.getNextAndPreviousScheduleDistance(ctx, time.Now(), rayJobInstance)
+    if err != nil {
+      logger.Error(err, "Could not get the previous and next distances for a cron schedule")
+      return ctrl.Result{}, err
+    }
+    // Check whether we are currently within the buffer window after the previous cron tick.
+    if t2 <= ScheduleBuffer {
+      logger.Info("The current time is within the buffer window of a cron tick", "NextScheduleTimeDuration", t1, "LastScheduleTimeDuration", t2, "Previous LastScheduleTime", rayJobInstance.Status.LastScheduleTime)
+      rayJobInstance.Status.LastScheduleTime = &metav1.Time{Time: time.Now()}
+      rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
+      rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusNew
+    } else {
+      logger.Info("Waiting until the next reconcile to determine schedule", "nextScheduleDuration", t1, "currentTime", time.Now(), "lastScheduleTimeDuration", t2)
+      return ctrl.Result{RequeueAfter: t1}, nil
+    }
+
   default:
     logger.Info("Unknown JobDeploymentStatus", "JobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus)
     return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
@@ -894,6 +951,23 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra
   return rayCluster, nil
 }
 
+func (r *RayJobReconciler) getNextAndPreviousScheduleDistance(ctx context.Context, currentTime time.Time, rayJobInstance *rayv1.RayJob) (time.Duration, time.Duration, error) {
+  logger := ctrl.LoggerFrom(ctx)
+  formatedCron := utils.FormatSchedule(rayJobInstance, r.Recorder)
+  cronSchedule, err := cron.ParseStandard(formatedCron)
+  if err != nil {
+    // This is likely a user error in defining the spec value.
+    // Log the error and do not reconcile this RayJob until the spec is updated.
+    r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", rayJobInstance.Spec.Schedule, err)
+    return 0, 0, fmt.Errorf("the cron schedule provided is unparseable: %w", err)
+  }
+
+  t1 := utils.NextScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)
+  t2 := utils.LastScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)
+
+  return t1, t2, nil
+}
+
 func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
   logger := ctrl.LoggerFrom(ctx)
   if !rayJob.Spec.Suspend {

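getNextAndPreviousScheduleDistance leans on utils.FormatSchedule, utils.NextScheduleTimeDuration, and utils.LastScheduleTimeDuration, none of which appear in this diff. As a rough sketch of the idea only — assuming those helpers measure the gap from the current time to the next and most recent cron ticks — the equivalent computation with robfig/cron could look like this; the function below is illustrative and is not the repository's implementation:

package main

import (
  "fmt"
  "time"

  "github.com/robfig/cron/v3"
)

// nextAndLastTickDistance returns the time until the next cron tick and the time since
// the most recent tick at or before `now`. `anchor` stands in for a known earlier point
// such as Status.LastScheduleTime. Illustrative only; it mirrors, but is not, the
// KubeRay utils helpers called by getNextAndPreviousScheduleDistance.
func nextAndLastTickDistance(sched cron.Schedule, anchor, now time.Time) (time.Duration, time.Duration) {
  prev := anchor
  // Walk forward from the anchor to find the last tick that is not after `now`.
  for t := sched.Next(anchor); !t.After(now); t = sched.Next(t) {
    prev = t
  }
  return sched.Next(now).Sub(now), now.Sub(prev)
}

func main() {
  sched, err := cron.ParseStandard("*/5 * * * *") // same format as the RayJob `schedule` field
  if err != nil {
    panic(err)
  }
  now := time.Now()
  lastScheduleTime := now.Add(-7 * time.Minute) // stand-in for Status.LastScheduleTime
  untilNext, sinceLast := nextAndLastTickDistance(sched, lastScheduleTime, now)
  fmt.Printf("next tick in %s, last tick %s ago\n", untilNext, sinceLast)
}

In the controller's Scheduled case above, the first of these durations drives the RequeueAfter, and the second is compared against ScheduleBuffer to decide whether a new run should start now.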