Skip to content

Commit cf4a877

Browse files
[Feat][RayJob] UserMode SubmissionMode (#2364)
1 parent 39d42fb commit cf4a877

File tree

7 files changed

+605
-479
lines changed

7 files changed

+605
-479
lines changed

helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ray-operator/apis/ray/v1/rayjob_types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
JobDeploymentStatusSuspending JobDeploymentStatus = "Suspending"
4343
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
4444
JobDeploymentStatusRetrying JobDeploymentStatus = "Retrying"
45+
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
4546
)
4647

4748
// JobFailedReason indicates the reason the RayJob changes its JobDeploymentStatus to 'Failed'
@@ -58,6 +59,7 @@ type JobSubmissionMode string
5859
const (
5960
K8sJobMode JobSubmissionMode = "K8sJobMode" // Submit job via Kubernetes Job
6061
HTTPMode JobSubmissionMode = "HTTPMode" // Submit job via HTTP request
62+
UserMode JobSubmissionMode = "UserMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID
6163
)
6264

6365
type SubmitterConfig struct {
@@ -86,7 +88,7 @@ type RayJobSpec struct {
8688
SubmitterConfig *SubmitterConfig `json:"submitterConfig,omitempty"`
8789
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
8890
// Important: Run "make" to regenerate code after modifying this file
89-
Entrypoint string `json:"entrypoint"`
91+
Entrypoint string `json:"entrypoint,omitempty"`
9092
// RuntimeEnvYAML represents the runtime environment configuration
9193
// provided as a multi-line YAML string.
9294
RuntimeEnvYAML string `json:"runtimeEnvYAML,omitempty"`

ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
129129
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
130130
}
131131

132+
if err := validateRayJobStatus(rayJobInstance); err != nil {
133+
logger.Error(err, "The RayJob status is invalid")
134+
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
135+
}
136+
132137
// Please do NOT modify `originalRayJobInstance` in the following code.
133138
originalRayJobInstance := rayJobInstance.DeepCopy()
134139

@@ -177,15 +182,30 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
177182
rayJobInstance.Status.DashboardURL = clientURL
178183
}
179184

185+
if rayJobInstance.Spec.SubmissionMode == rayv1.UserMode {
186+
logger.Info("SubmissionMode is UserMode and the RayCluster is created. Transition the status from `Initializing` to `Waiting`.")
187+
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusWaiting
188+
break
189+
}
190+
180191
if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
181192
if err := r.createK8sJobIfNeed(ctx, rayJobInstance, rayClusterInstance); err != nil {
182193
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
183194
}
184195
}
185196

186-
logger.Info("Both RayCluster and the submitter K8s Job are created. Transition the status from `Initializing` to `Running`.",
197+
logger.Info("Both RayCluster and the submitter K8s Job are created. Transition the status from `Initializing` to `Running`.", "SubmissionMode", rayJobInstance.Spec.SubmissionMode,
187198
"RayJob", rayJobInstance.Name, "RayCluster", rayJobInstance.Status.RayClusterName)
188199
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning
200+
case rayv1.JobDeploymentStatusWaiting:
201+
// Try to get the Ray job id from the Ray job annotations.
202+
rayJobId, found := rayJobInstance.ObjectMeta.Annotations[utils.RayJobSubmissionIdLabelKey]
203+
logger.Info("Get Ray job id from the Ray job annotations", "RayJobId", rayJobId, "Found", found)
204+
if !found {
205+
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
206+
}
207+
rayJobInstance.Status.JobId = rayJobId
208+
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning
189209
case rayv1.JobDeploymentStatusRunning:
190210
if shouldUpdate := r.updateStatusToSuspendingIfNeeded(ctx, rayJobInstance); shouldUpdate {
191211
break
@@ -606,6 +626,7 @@ func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurren
606626
// This function is the sole place where `JobDeploymentStatusInitializing` is defined. It initializes `Status.JobId` and `Status.RayClusterName`
607627
// prior to job submissions and RayCluster creations. This is used to avoid duplicate job submissions and cluster creations. In addition, this
608628
// function also sets `Status.StartTime` to support `ActiveDeadlineSeconds`.
629+
// This function will set or generate JobId if SubmissionMode is not UserMode.
609630
func (r *RayJobReconciler) initRayJobStatusIfNeed(ctx context.Context, rayJob *rayv1.RayJob) error {
610631
logger := ctrl.LoggerFrom(ctx)
611632
shouldUpdateStatus := rayJob.Status.JobId == "" || rayJob.Status.RayClusterName == "" || rayJob.Status.JobStatus == ""
@@ -615,7 +636,7 @@ func (r *RayJobReconciler) initRayJobStatusIfNeed(ctx context.Context, rayJob *r
615636
return nil
616637
}
617638

618-
if rayJob.Status.JobId == "" {
639+
if rayJob.Spec.SubmissionMode != rayv1.UserMode && rayJob.Status.JobId == "" {
619640
if rayJob.Spec.JobId != "" {
620641
rayJob.Status.JobId = rayJob.Spec.JobId
621642
} else {
@@ -811,3 +832,11 @@ func validateRayJobSpec(rayJob *rayv1.RayJob) error {
811832
}
812833
return nil
813834
}
835+
836+
func validateRayJobStatus(rayJob *rayv1.RayJob) error {
837+
if rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusWaiting && rayJob.Spec.SubmissionMode != rayv1.UserMode {
838+
return fmt.Errorf("invalid RayJob State: JobDeploymentStatus cannot be `Waiting` when SubmissionMode is not UserMode")
839+
}
840+
841+
return nil
842+
}

0 commit comments

Comments
 (0)