Skip to content

Commit 2285202

Browse files
dushulindushulinkevin85421
authored
[ray-operator][Bug] Rayjob is Failed or Succeed, but Raycluster status(jobDeploymentStatus) is still Running(#3553) (#3642)
* fix: Fix raycluster grace period exec(#3553) * fix: Add event to notice ray bug(#3553) * fix: Change get SUBMITTER_DEFAULT_GRACE_PERIOD_TIME env(#3553) * feat: Add backup gitlab yml(#0) * fix: Clean log and fix transition logix(#0) * fix: Add EndTime when jobStatus is terminal(#0) * Update ray-operator/controllers/ray/rayjob_controller.go Co-authored-by: Kai-Hsun Chen <[email protected]> Signed-off-by: dsl <[email protected]> * Update ray-operator/controllers/ray/utils/constant.go Co-authored-by: Kai-Hsun Chen <[email protected]> Signed-off-by: dsl <[email protected]> * Update ray-operator/controllers/ray/utils/constant.go Co-authored-by: Kai-Hsun Chen <[email protected]> Signed-off-by: dsl <[email protected]> * Update ray-operator/apis/ray/v1/rayjob_types.go Co-authored-by: Kai-Hsun Chen <[email protected]> Signed-off-by: dsl <[email protected]> * Update ray-operator/controllers/ray/utils/constant.go Co-authored-by: Kai-Hsun Chen <[email protected]> Signed-off-by: dsl <[email protected]> * Update ray-operator/controllers/ray/utils/constant.go Co-authored-by: Kai-Hsun Chen <[email protected]> Signed-off-by: dsl <[email protected]> * Update ray-operator/controllers/ray/utils/constant.go Co-authored-by: Kai-Hsun Chen <[email protected]> Signed-off-by: dsl <[email protected]> * fix: Use RayJobStatusInfo.EndTime and optimization of some var name(#0) * fix: Use RayJobStatusInfo.Endtime(#0) * fix: Unify the logic of JobDeploymentStatus from JobStatus(#0) * fix: Unify the logic of JobDeploymentStatus from JobStatus(#0) * fix: Delete the logic of JobDeploymentStatus from JobStatus(#0) * fix envtest Signed-off-by: kaihsun <[email protected]> --------- Signed-off-by: dsl <[email protected]> Signed-off-by: kaihsun <[email protected]> Co-authored-by: dushulin <[email protected]> Co-authored-by: Kai-Hsun Chen <[email protected]> Co-authored-by: kaihsun <[email protected]>
1 parent 846416e commit 2285202

File tree

4 files changed

+50
-11
lines changed

4 files changed

+50
-11
lines changed

ray-operator/apis/ray/v1/rayjob_types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,10 @@ func IsJobDeploymentTerminal(status JobDeploymentStatus) bool {
7070
type JobFailedReason string
7171

7272
const (
73-
SubmissionFailed JobFailedReason = "SubmissionFailed"
74-
DeadlineExceeded JobFailedReason = "DeadlineExceeded"
75-
AppFailed JobFailedReason = "AppFailed"
73+
SubmissionFailed JobFailedReason = "SubmissionFailed"
74+
DeadlineExceeded JobFailedReason = "DeadlineExceeded"
75+
AppFailed JobFailedReason = "AppFailed"
76+
JobDeploymentStatusTransitionGracePeriodExceeded JobFailedReason = "JobDeploymentStatusTransitionGracePeriodExceeded"
7677
)
7778

7879
type JobSubmissionMode string

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"os"
7+
"strconv"
78
"strings"
89
"time"
910

@@ -233,6 +234,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
233234
break
234235
}
235236

237+
if shouldUpdate := checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate {
238+
break
239+
}
240+
236241
job := &batchv1.Job{}
237242
if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
238243
// If the submitting Kubernetes Job reaches the backoff limit, transition the status to `Complete` or `Failed`.
@@ -921,3 +926,26 @@ func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *ray
921926
rayJob.Status.Message = fmt.Sprintf("The RayJob has passed the activeDeadlineSeconds. StartTime: %v. ActiveDeadlineSeconds: %d", rayJob.Status.StartTime, *rayJob.Spec.ActiveDeadlineSeconds)
922927
return true
923928
}
929+
930+
func checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
931+
logger := ctrl.LoggerFrom(ctx)
932+
if rayv1.IsJobTerminal(rayJob.Status.JobStatus) && rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusRunning {
933+
rayJobDeploymentGracePeriodTime, err := strconv.Atoi(os.Getenv(utils.RAYJOB_DEPLOYMENT_STATUS_TRANSITION_GRACE_PERIOD_SECONDS))
934+
if err != nil {
935+
rayJobDeploymentGracePeriodTime = utils.DEFAULT_RAYJOB_DEPLOYMENT_STATUS_TRANSITION_GRACE_PERIOD_SECONDS
936+
}
937+
938+
if time.Now().Before(rayJob.Status.RayJobStatusInfo.EndTime.Add(time.Duration(rayJobDeploymentGracePeriodTime) * time.Second)) {
939+
return false
940+
}
941+
logger.Info("JobDeploymentStatus does not transition to Complete or Failed within the grace period after JobStatus reaches a terminal state.", "EndTime", rayJob.Status.RayJobStatusInfo.EndTime, "rayJobDeploymentGracePeriodTime", rayJobDeploymentGracePeriodTime)
942+
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusComplete
943+
if rayJob.Status.JobStatus == rayv1.JobStatusFailed {
944+
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
945+
}
946+
rayJob.Status.Reason = rayv1.JobDeploymentStatusTransitionGracePeriodExceeded
947+
rayJob.Status.Message = "JobDeploymentStatus does not transition to Complete or Failed within the grace period after JobStatus reaches a terminal state."
948+
return true
949+
}
950+
return false
951+
}

ray-operator/controllers/ray/rayjob_controller_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ var _ = Context("RayJob with different submission modes", func() {
271271
It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() {
272272
// Update fake dashboard client to return job info with "Succeeded" status.
273273
getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
274-
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
274+
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded, EndTime: uint64(time.Now().UnixMilli())}, nil
275275
}
276276
fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
277277
defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
@@ -496,7 +496,7 @@ var _ = Context("RayJob with different submission modes", func() {
496496
It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() {
497497
// Update fake dashboard client to return job info with "Succeeded" status.
498498
getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
499-
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
499+
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded, EndTime: uint64(time.Now().UnixMilli())}, nil
500500
}
501501
fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
502502
defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
@@ -661,7 +661,7 @@ var _ = Context("RayJob with different submission modes", func() {
661661
// Update fake dashboard client to return job info with "Failed" status.
662662
//nolint:unparam // this is a mock and the function signature cannot change
663663
getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) {
664-
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusFailed}, nil
664+
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusFailed, EndTime: uint64(time.Now().UnixMilli())}, nil
665665
}
666666
fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
667667
defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
@@ -756,7 +756,7 @@ var _ = Context("RayJob with different submission modes", func() {
756756
// Update fake dashboard client to return job info with "Failed" status.
757757
//nolint:unparam // this is a mock and the function signature cannot change
758758
getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) {
759-
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
759+
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded, EndTime: uint64(time.Now().UnixMilli())}, nil
760760
}
761761
fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
762762
defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
@@ -966,7 +966,7 @@ var _ = Context("RayJob with different submission modes", func() {
966966
By("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() {
967967
// Update fake dashboard client to return job info with "Succeeded" status.
968968
getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
969-
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
969+
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded, EndTime: uint64(time.Now().UnixMilli())}, nil
970970
}
971971
fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
972972
defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
@@ -1088,7 +1088,7 @@ var _ = Context("RayJob with different submission modes", func() {
10881088
By("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() {
10891089
// Update fake dashboard client to return job info with "Succeeded" status.
10901090
getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
1091-
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
1091+
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded, EndTime: uint64(time.Now().UnixMilli())}, nil
10921092
}
10931093
fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
10941094
defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
@@ -1223,7 +1223,7 @@ var _ = Context("RayJob with different submission modes", func() {
12231223
By("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() {
12241224
// Update fake dashboard client to return job info with "Succeeded" status.
12251225
getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
1226-
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
1226+
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded, EndTime: uint64(time.Now().UnixMilli())}, nil
12271227
}
12281228
fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
12291229

@@ -1333,7 +1333,7 @@ var _ = Context("RayJob with different submission modes", func() {
13331333
By("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() {
13341334
// Update fake dashboard client to return job info with "Succeeded" status.
13351335
getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
1336-
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
1336+
return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded, EndTime: uint64(time.Now().UnixMilli())}, nil
13371337
}
13381338
fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
13391339
defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)

ray-operator/controllers/ray/utils/constant.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,16 @@ const (
148148
// If set to true, the RayJob CR itself will be deleted if shutdownAfterJobFinishes is set to true. Note that all resources created by the RayJob CR will be deleted, including the K8s Job.
149149
DELETE_RAYJOB_CR_AFTER_JOB_FINISHES = "DELETE_RAYJOB_CR_AFTER_JOB_FINISHES"
150150

151+
// If `JobDeploymentStatus` does not transition to `Complete` or `Failed` within
152+
// `RAYJOB_DEPLOYMENT_STATUS_TRANSITION_GRACE_PERIOD_SECONDS` seconds after `JobStatus`
153+
// reaches a terminal state, KubeRay will update `JobDeploymentStatus` to either
154+
// `Complete` or `Failed` directly.
155+
156+
// If this occurs, it is likely due to a system-level issue (e.g., a Ray bug) that prevents the
157+
// `ray job submit` process in the Kubernetes Job submitter from exiting.
158+
RAYJOB_DEPLOYMENT_STATUS_TRANSITION_GRACE_PERIOD_SECONDS = "RAYJOB_DEPLOYMENT_STATUS_TRANSITION_GRACE_PERIOD_SECONDS"
159+
DEFAULT_RAYJOB_DEPLOYMENT_STATUS_TRANSITION_GRACE_PERIOD_SECONDS = 300
160+
151161
// Ray core default configurations
152162
DefaultWorkerRayGcsReconnectTimeoutS = "600"
153163

0 commit comments

Comments
 (0)