Skip to content

Commit 73e6c5d

Browse files
authored
[Feature][RayJob]: Generate submitter and RayCluster creation/deletion events (#2389)
1 parent 8f06197 commit 73e6c5d

File tree

4 files changed

+238
-18
lines changed

4 files changed

+238
-18
lines changed

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
224224
// mode is not stuck in the `Running` status indefinitely.
225225
namespacedName := common.RayJobK8sJobNamespacedName(rayJobInstance)
226226
if err := r.Client.Get(ctx, namespacedName, job); err != nil {
227-
logger.Error(err, "Failed to get the submitter Kubernetes Job", "NamespacedName", namespacedName)
227+
logger.Error(err, "Failed to get the submitter Kubernetes Job for RayJob", "NamespacedName", namespacedName)
228228
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
229229
}
230230
if shouldUpdate := r.checkK8sJobAndUpdateStatusIfNeeded(ctx, rayJobInstance, job); shouldUpdate {
@@ -443,7 +443,7 @@ func (r *RayJobReconciler) createK8sJobIfNeed(ctx context.Context, rayJobInstanc
443443
return err
444444
}
445445

446-
logger.Info("Kubernetes Job already exists", "RayJob", rayJobInstance.Name, "Kubernetes Job", job.Name)
446+
logger.Info("The submitter Kubernetes Job for RayJob already exists", "RayJob", rayJobInstance.Name, "Kubernetes Job", job.Name)
447447
return nil
448448
}
449449

@@ -528,12 +528,12 @@ func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *
528528

529529
// Create the Kubernetes Job
530530
if err := r.Client.Create(ctx, job); err != nil {
531-
logger.Error(err, "Failed to create new Kubernetes Job")
532-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, "k8sJobCreationFailed", "Failed to create new Kubernetes Job %s: %v", job.Name, err)
531+
logger.Error(err, "Failed to create new submitter Kubernetes Job for RayJob")
532+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToCreateRayJobSubmitter), "Failed to create new Kubernetes Job %s/%s: %v", job.Namespace, job.Name, err)
533533
return err
534534
}
535-
logger.Info("Kubernetes Job created", "RayJob", rayJobInstance.Name, "Kubernetes Job", job.Name)
536-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "k8sJobCreationCreated", "Created Kubernetes Job %s", job.Name)
535+
logger.Info("Created submitter Kubernetes Job for RayJob", "RayJob", rayJobInstance.Name, "Kubernetes Job", job.Name)
536+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.CreatedRayJobSubmitter), "Created Kubernetes Job %s/%s", job.Namespace, job.Name)
537537
return nil
538538
}
539539

@@ -559,13 +559,14 @@ func (r *RayJobReconciler) deleteSubmitterJob(ctx context.Context, rayJobInstanc
559559
}
560560
} else {
561561
if !job.DeletionTimestamp.IsZero() {
562-
logger.Info("The Job deletion is ongoing.", "RayJob", rayJobInstance.Name, "Submitter K8s Job", job.Name)
562+
logger.Info("The deletion of submitter Kubernetes Job for RayJob is ongoing.", "RayJob", rayJobInstance.Name, "Submitter K8s Job", job.Name)
563563
} else {
564564
if err := r.Client.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
565+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteRayJobSubmitter), "Failed to delete submitter K8s Job %s/%s: %v", job.Namespace, job.Name, err)
565566
return false, err
566567
}
567-
logger.Info("The associated submitter Job is deleted", "RayJob", rayJobInstance.Name, "Submitter K8s Job", job.Name)
568-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted submitter K8s Job %s", job.Name)
568+
logger.Info("The associated submitter Kubernetes Job for RayJob is deleted", "RayJob", rayJobInstance.Name, "Submitter K8s Job", job.Name)
569+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.DeletedRayJobSubmitter), "Deleted submitter K8s Job %s/%s", job.Namespace, job.Name)
569570
}
570571
}
571572

@@ -585,19 +586,20 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
585586
// If the cluster is not found, it means the cluster has been already deleted.
586587
// Don't return error to make this function idempotent.
587588
isClusterDeleted = true
588-
logger.Info("The associated cluster has been already deleted and it can not be found", "RayCluster", clusterIdentifier)
589+
logger.Info("The associated RayCluster for RayJob has been already deleted and it can not be found", "RayCluster", clusterIdentifier, "RayJob", rayJobInstance.Name)
589590
} else {
590591
return false, err
591592
}
592593
} else {
593594
if !cluster.DeletionTimestamp.IsZero() {
594-
logger.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
595+
logger.Info("The deletion of the associated RayCluster for RayJob is ongoing.", "RayJob", rayJobInstance.Name, "RayCluster", cluster.Name)
595596
} else {
596597
if err := r.Delete(ctx, &cluster); err != nil {
598+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteRayCluster), "Failed to delete cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
597599
return false, err
598600
}
599-
logger.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
600-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
601+
logger.Info("The associated RayCluster for RayJob is deleted", "RayCluster", clusterIdentifier, "RayJob", rayJobInstance.Name)
602+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.DeletedRayCluster), "Deleted cluster %s/%s", cluster.Namespace, cluster.Name)
601603
}
602604
}
603605

@@ -712,14 +714,15 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
712714
return nil, err
713715
}
714716
if err := r.Create(ctx, rayClusterInstance); err != nil {
717+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToCreateRayCluster), "Failed to create RayCluster %s/%s: %v", rayClusterInstance.Namespace, rayClusterInstance.Name, err)
715718
return nil, err
716719
}
717-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Created", "Created RayCluster %s", rayJobInstance.Status.RayClusterName)
720+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.CreatedRayCluster), "Created RayCluster %s/%s", rayClusterInstance.Namespace, rayClusterInstance.Name)
718721
} else {
719722
return nil, err
720723
}
721724
}
722-
logger.Info("Found associated RayCluster for RayJob", "RayJob", rayJobInstance.Name, "RayCluster", rayClusterNamespacedName)
725+
logger.Info("Found the associated RayCluster for RayJob", "RayJob", rayJobInstance.Name, "RayCluster", rayClusterNamespacedName)
723726

724727
// Verify that RayJob is not in cluster selector mode first to avoid nil pointer dereference error during spec comparison.
725728
// This is checked by ensuring len(rayJobInstance.Spec.ClusterSelector) equals 0.

ray-operator/controllers/ray/rayjob_controller_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,67 @@ var _ = Context("RayJob with different submission modes", func() {
312312
})
313313
})
314314

315+
// Exercises a RayJob in K8sJobMode whose submitter Pod template is deliberately
// invalid, and checks that the resulting submitter creation failure is
// surfaced as a Kubernetes event.
Describe("Invalid RayJob in K8sJobMode", Ordered, func() {
	ctx := context.Background()
	namespace := "default"
	rayJob := rayJobTemplate("rayjob-invalid-test", namespace)
	rayCluster := &rayv1.RayCluster{Spec: *rayJob.Spec.RayClusterSpec}
	template := common.GetDefaultSubmitterTemplate(rayCluster)
	template.Spec.RestartPolicy = "" // Make it invalid to create a submitter. Ref: https://github.com/ray-project/kuberay/pull/2389#issuecomment-2359564334
	rayJob.Spec.SubmitterPodTemplate = &template

	It("Verify RayJob spec", func() {
		Expect(rayJob.Spec.SubmissionMode).To(Equal(rayv1.K8sJobMode))
	})

	It("Create a RayJob custom resource", func() {
		err := k8sClient.Create(ctx, rayJob)
		Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob")
		Eventually(
			getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob),
			time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name)
	})

	It("RayJob's JobDeploymentStatus transitions from New to Initializing.", func() {
		Eventually(
			getRayJobDeploymentStatus(ctx, rayJob),
			time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
	})

	It("In Initializing state, the RayCluster should eventually be created.", func() {
		Eventually(
			getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster),
			time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName)
	})

	It("Make RayCluster.Status.State to be rayv1.Ready", func() {
		// The RayCluster is not 'Ready' yet because Pods are not running and ready.
		Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288

		updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
		updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)

		// The RayCluster.Status.State should be Ready.
		Eventually(
			getClusterState(ctx, namespace, rayCluster.Name),
			time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
	})

	// This step does not check a status transition: it only asserts that an
	// event describing the failed submitter creation shows up in the namespace.
	It("A Kubernetes event is emitted when the submitter Kubernetes Job fails to be created.", func() {
		Eventually(
			func() ([]corev1.Event, error) {
				events := &corev1.EventList{}
				if err := k8sClient.List(ctx, events, client.InNamespace(rayJob.Namespace)); err != nil {
					return nil, err
				}
				return events.Items, nil
			},
			time.Second*3, time.Millisecond*500).Should(ContainElement(HaveField("Message", ContainSubstring("Failed to create new Kubernetes Job default/rayjob-invalid-test"))))

		// Best-effort cleanup; the deletion result is intentionally ignored.
		_ = k8sClient.Delete(ctx, rayJob)
	})
})
375+
315376
Describe("Successful RayJob in K8sjobMode with DELETE_RAYJOB_CR_AFTER_JOB_FINISHES", Ordered, func() {
316377
ctx := context.Background()
317378
namespace := "default"

ray-operator/controllers/ray/rayjob_controller_unit_test.go

Lines changed: 149 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ray
22

33
import (
44
"context"
5+
"errors"
56
"strings"
67
"testing"
78

@@ -22,7 +23,7 @@ import (
2223
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
2324
)
2425

25-
func TestCreateK8sJobIfNeed(t *testing.T) {
26+
func TestCreateRayJobSubmitterIfNeed(t *testing.T) {
2627
newScheme := runtime.NewScheme()
2728
_ = rayv1.AddToScheme(newScheme)
2829
_ = batchv1.AddToScheme(newScheme)
@@ -375,7 +376,7 @@ func TestValidateRayJobSpec(t *testing.T) {
375376
assert.Error(t, err, "The RayJob is invalid because the backoffLimit must be a positive integer.")
376377
}
377378

378-
func TestFailedCreatek8sJob(t *testing.T) {
379+
func TestFailedToCreateRayJobSubmitterEvent(t *testing.T) {
379380
rayJob := &rayv1.RayJob{
380381
ObjectMeta: metav1.ObjectMeta{
381382
Name: "test-rayjob",
@@ -400,7 +401,7 @@ func TestFailedCreatek8sJob(t *testing.T) {
400401

401402
fakeClient := clientFake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
402403
Create: func(_ context.Context, _ client.WithWatch, _ client.Object, _ ...client.CreateOption) error {
403-
return utils.ErrFailedCreateWorkerPod
404+
return errors.New("random")
404405
},
405406
}).WithScheme(scheme.Scheme).Build()
406407

@@ -429,3 +430,148 @@ func TestFailedCreatek8sJob(t *testing.T) {
429430

430431
assert.Truef(t, foundFailureEvent, "Expected event to be generated for job creation failure, got events: %s", strings.Join(events, "\n"))
431432
}
433+
434+
func TestFailedCreateRayClusterEvent(t *testing.T) {
435+
rayJob := &rayv1.RayJob{
436+
ObjectMeta: metav1.ObjectMeta{
437+
Name: "test-rayjob",
438+
Namespace: "default",
439+
},
440+
Spec: rayv1.RayJobSpec{
441+
RayClusterSpec: &rayv1.RayClusterSpec{},
442+
},
443+
}
444+
445+
fakeClient := clientFake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
446+
Create: func(_ context.Context, _ client.WithWatch, _ client.Object, _ ...client.CreateOption) error {
447+
return errors.New("random")
448+
},
449+
}).WithScheme(scheme.Scheme).Build()
450+
451+
recorder := record.NewFakeRecorder(100)
452+
453+
reconciler := &RayJobReconciler{
454+
Client: fakeClient,
455+
Recorder: recorder,
456+
Scheme: scheme.Scheme,
457+
}
458+
459+
_, err := reconciler.getOrCreateRayClusterInstance(context.Background(), rayJob)
460+
461+
assert.NotNil(t, err, "Expected error due to cluster creation failure")
462+
463+
var foundFailureEvent bool
464+
events := []string{}
465+
for len(recorder.Events) > 0 {
466+
event := <-recorder.Events
467+
if strings.Contains(event, "Failed to create RayCluster") {
468+
foundFailureEvent = true
469+
break
470+
}
471+
events = append(events, event)
472+
}
473+
474+
assert.Truef(t, foundFailureEvent, "Expected event to be generated for cluster creation failure, got events: %s", strings.Join(events, "\n"))
475+
}
476+
477+
func TestFailedDeleteRayJobSubmitterEvent(t *testing.T) {
478+
newScheme := runtime.NewScheme()
479+
_ = batchv1.AddToScheme(newScheme)
480+
481+
rayJob := &rayv1.RayJob{
482+
ObjectMeta: metav1.ObjectMeta{
483+
Name: "test-rayjob",
484+
Namespace: "default",
485+
},
486+
}
487+
submitter := &batchv1.Job{
488+
ObjectMeta: metav1.ObjectMeta{
489+
Name: "test-rayjob",
490+
Namespace: "default",
491+
},
492+
}
493+
494+
fakeClient := clientFake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
495+
Delete: func(_ context.Context, _ client.WithWatch, _ client.Object, _ ...client.DeleteOption) error {
496+
return errors.New("random")
497+
},
498+
}).WithScheme(newScheme).WithRuntimeObjects(submitter).Build()
499+
500+
recorder := record.NewFakeRecorder(100)
501+
502+
reconciler := &RayJobReconciler{
503+
Client: fakeClient,
504+
Recorder: recorder,
505+
Scheme: scheme.Scheme,
506+
}
507+
508+
_, err := reconciler.deleteSubmitterJob(context.Background(), rayJob)
509+
510+
assert.NotNil(t, err, "Expected error due to job deletion failure")
511+
512+
var foundFailureEvent bool
513+
events := []string{}
514+
for len(recorder.Events) > 0 {
515+
event := <-recorder.Events
516+
if strings.Contains(event, "Failed to delete submitter K8s Job") {
517+
foundFailureEvent = true
518+
break
519+
}
520+
events = append(events, event)
521+
}
522+
523+
assert.Truef(t, foundFailureEvent, "Expected event to be generated for cluster deletion failure, got events: %s", strings.Join(events, "\n"))
524+
}
525+
526+
func TestFailedDeleteRayClusterEvent(t *testing.T) {
527+
newScheme := runtime.NewScheme()
528+
_ = rayv1.AddToScheme(newScheme)
529+
530+
rayCluster := &rayv1.RayCluster{
531+
ObjectMeta: metav1.ObjectMeta{
532+
Name: "test-raycluster",
533+
Namespace: "default",
534+
},
535+
}
536+
537+
rayJob := &rayv1.RayJob{
538+
ObjectMeta: metav1.ObjectMeta{
539+
Name: "test-rayjob",
540+
Namespace: "default",
541+
},
542+
Status: rayv1.RayJobStatus{
543+
RayClusterName: "test-raycluster",
544+
},
545+
}
546+
547+
fakeClient := clientFake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
548+
Delete: func(_ context.Context, _ client.WithWatch, _ client.Object, _ ...client.DeleteOption) error {
549+
return errors.New("random")
550+
},
551+
}).WithScheme(newScheme).WithRuntimeObjects(rayCluster).Build()
552+
553+
recorder := record.NewFakeRecorder(100)
554+
555+
reconciler := &RayJobReconciler{
556+
Client: fakeClient,
557+
Recorder: recorder,
558+
Scheme: scheme.Scheme,
559+
}
560+
561+
_, err := reconciler.deleteClusterResources(context.Background(), rayJob)
562+
563+
assert.NotNil(t, err, "Expected error due to cluster deletion failure")
564+
565+
var foundFailureEvent bool
566+
events := []string{}
567+
for len(recorder.Events) > 0 {
568+
event := <-recorder.Events
569+
if strings.Contains(event, "Failed to delete cluster") {
570+
foundFailureEvent = true
571+
break
572+
}
573+
events = append(events, event)
574+
}
575+
576+
assert.Truef(t, foundFailureEvent, "Expected event to be generated for cluster deletion failure, got events: %s", strings.Join(events, "\n"))
577+
}

ray-operator/controllers/ray/utils/constant.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,16 @@ const (
248248
CreatedRedisCleanupJob K8sEventType = "CreatedRedisCleanupJob"
249249
FailedToCreateRedisCleanupJob K8sEventType = "FailedToCreateRedisCleanupJob"
250250

251+
// RayJob event list
252+
CreatedRayJobSubmitter K8sEventType = "CreatedRayJobSubmitter"
253+
DeletedRayJobSubmitter K8sEventType = "DeletedRayJobSubmitter"
254+
FailedToCreateRayJobSubmitter K8sEventType = "FailedToCreateRayJobSubmitter"
255+
FailedToDeleteRayJobSubmitter K8sEventType = "FailedToDeleteRayJobSubmitter"
256+
CreatedRayCluster K8sEventType = "CreatedRayCluster"
257+
DeletedRayCluster K8sEventType = "DeletedRayCluster"
258+
FailedToCreateRayCluster K8sEventType = "FailedToCreateRayCluster"
259+
FailedToDeleteRayCluster K8sEventType = "FailedToDeleteRayCluster"
260+
251261
// Generic Pod event list
252262
DeletedPod K8sEventType = "DeletedPod"
253263
FailedToDeletePod K8sEventType = "FailedToDeletePod"

0 commit comments

Comments
 (0)