@@ -135,10 +135,10 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
135
135
// Check both active and pending Ray clusters to see if the head Pod is ready to serve requests.
136
136
// This is important to ensure the reliability of the serve service because the head Pod cannot
137
137
// rely on readiness probes to determine serve readiness.
138
- if err := r .updateHeadPodServeLabel (ctx , activeRayClusterInstance , rayServiceInstance .Spec .ExcludeHeadPodFromServeSvc ); err != nil {
138
+ if err := r .updateHeadPodServeLabel (ctx , rayServiceInstance , activeRayClusterInstance , rayServiceInstance .Spec .ExcludeHeadPodFromServeSvc ); err != nil {
139
139
return ctrl.Result {RequeueAfter : ServiceDefaultRequeueDuration }, err
140
140
}
141
- if err := r .updateHeadPodServeLabel (ctx , pendingRayClusterInstance , rayServiceInstance .Spec .ExcludeHeadPodFromServeSvc ); err != nil {
141
+ if err := r .updateHeadPodServeLabel (ctx , rayServiceInstance , pendingRayClusterInstance , rayServiceInstance .Spec .ExcludeHeadPodFromServeSvc ); err != nil {
142
142
return ctrl.Result {RequeueAfter : ServiceDefaultRequeueDuration }, err
143
143
}
144
144
@@ -517,7 +517,7 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi
517
517
if activeRayCluster , err = constructRayClusterForRayService (rayServiceInstance , activeRayCluster .Name , r .Scheme ); err != nil {
518
518
return nil , nil , err
519
519
}
520
- err = r .updateRayClusterInstance (ctx , activeRayCluster )
520
+ err = r .updateRayClusterInstance (ctx , rayServiceInstance , activeRayCluster )
521
521
return activeRayCluster , pendingRayCluster , err
522
522
}
523
523
@@ -527,7 +527,7 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi
527
527
if pendingRayCluster , err = constructRayClusterForRayService (rayServiceInstance , pendingRayCluster .Name , r .Scheme ); err != nil {
528
528
return nil , nil , err
529
529
}
530
- err = r .updateRayClusterInstance (ctx , pendingRayCluster )
530
+ err = r .updateRayClusterInstance (ctx , rayServiceInstance , pendingRayCluster )
531
531
return activeRayCluster , pendingRayCluster , err
532
532
}
533
533
@@ -567,8 +567,10 @@ func (r *RayServiceReconciler) cleanUpRayClusterInstance(ctx context.Context, ra
567
567
if reasonForDeletion != "" {
568
568
logger .Info ("reconcileRayCluster" , "delete Ray cluster" , rayClusterInstance .Name , "reason" , reasonForDeletion )
569
569
if err := r .Delete (ctx , & rayClusterInstance , client .PropagationPolicy (metav1 .DeletePropagationBackground )); err != nil {
570
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeWarning , string (utils .FailedToDeleteRayCluster ), "Failed to delete the RayCluster %s/%s: %v" , rayClusterInstance .Namespace , rayClusterInstance .Name , err )
570
571
return err
571
572
}
573
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeNormal , string (utils .DeletedRayCluster ), "Deleted the RayCluster %s/%s" , rayClusterInstance .Namespace , rayClusterInstance .Name )
572
574
}
573
575
}
574
576
}
@@ -694,7 +696,7 @@ func shouldPrepareNewCluster(ctx context.Context, rayServiceInstance *rayv1.RayS
694
696
}
695
697
696
698
// updateRayClusterInstance updates the RayCluster instance.
697
- func (r * RayServiceReconciler ) updateRayClusterInstance (ctx context.Context , rayClusterInstance * rayv1.RayCluster ) error {
699
+ func (r * RayServiceReconciler ) updateRayClusterInstance (ctx context.Context , rayServiceInstance * rayv1. RayService , rayClusterInstance * rayv1.RayCluster ) error {
698
700
logger := ctrl .LoggerFrom (ctx )
699
701
logger .Info ("updateRayClusterInstance" , "Name" , rayClusterInstance .Name , "Namespace" , rayClusterInstance .Namespace )
700
702
@@ -721,7 +723,13 @@ func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, ray
721
723
currentRayCluster .Annotations = rayClusterInstance .Annotations
722
724
723
725
// Update the RayCluster
724
- return r .Update (ctx , currentRayCluster )
726
+ err = r .Update (ctx , currentRayCluster )
727
+ if err != nil {
728
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeWarning , string (utils .FailedToUpdateRayCluster ), "Failed to update the RayCluster %s/%s: %v" , currentRayCluster .Namespace , currentRayCluster .Name , err )
729
+ } else {
730
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeNormal , string (utils .UpdatedRayCluster ), "Updated the RayCluster %s/%s" , currentRayCluster .Namespace , currentRayCluster .Name )
731
+ }
732
+ return err
725
733
}
726
734
727
735
// createRayClusterInstance deletes the old RayCluster instance if it exists. It creates a new RayCluster instance only when no RayCluster exists.
@@ -743,9 +751,11 @@ func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, ray
743
751
logger .Info ("Ray cluster already exists, config changes. Need to recreate. Delete the pending one now." , "key" , rayClusterKey .String (), "rayClusterInstance.Spec" , rayClusterInstance .Spec , "rayServiceInstance.Spec.RayClusterSpec" , rayServiceInstance .Spec .RayClusterSpec )
744
752
delErr := r .Delete (ctx , rayClusterInstance , client .PropagationPolicy (metav1 .DeletePropagationBackground ))
745
753
if delErr == nil {
754
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeNormal , string (utils .DeletedRayCluster ), "Deleted the RayCluster %s/%s" , rayClusterInstance .Namespace , rayClusterInstance .Name )
746
755
// Go to next loop and check if the ray cluster is deleted.
747
756
return nil , nil
748
757
} else if ! errors .IsNotFound (delErr ) {
758
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeWarning , string (utils .FailedToDeleteRayCluster ), "Failed to delete the RayCluster %s/%s: %v" , rayClusterInstance .Namespace , rayClusterInstance .Name , delErr )
749
759
return nil , delErr
750
760
}
751
761
// if error is `not found`, then continue.
@@ -760,10 +770,11 @@ func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, ray
760
770
return nil , err
761
771
}
762
772
if err = r .Create (ctx , rayClusterInstance ); err != nil {
773
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeWarning , string (utils .FailedToCreateRayCluster ), "Failed to create the RayCluster %s/%s: %v" , rayClusterInstance .Namespace , rayClusterInstance .Name , err )
763
774
return nil , err
764
775
}
765
776
logger .Info ("created rayCluster for rayService" , "rayCluster" , rayClusterInstance )
766
-
777
+ r . Recorder . Eventf ( rayServiceInstance , corev1 . EventTypeNormal , string ( utils . CreatedRayCluster ), "Created the RayCluster %s/%s" , rayClusterInstance . Namespace , rayClusterInstance . Name )
767
778
return rayClusterInstance , nil
768
779
}
769
780
@@ -980,8 +991,10 @@ func (r *RayServiceReconciler) reconcileServices(ctx context.Context, rayService
980
991
oldSvc .Spec = * newSvc .Spec .DeepCopy ()
981
992
logger .Info ("Update Kubernetes Service" , "serviceType" , serviceType )
982
993
if updateErr := r .Update (ctx , oldSvc ); updateErr != nil {
994
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeWarning , string (utils .FailedToUpdateService ), "Failed to update the service %s/%s, %v" , oldSvc .Namespace , oldSvc .Name , updateErr )
983
995
return nil , updateErr
984
996
}
997
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeNormal , string (utils .UpdatedService ), "Updated the service %s/%s" , oldSvc .Namespace , oldSvc .Name )
985
998
// Return the updated service.
986
999
return oldSvc , nil
987
1000
} else if errors .IsNotFound (err ) {
@@ -990,8 +1003,10 @@ func (r *RayServiceReconciler) reconcileServices(ctx context.Context, rayService
990
1003
return nil , err
991
1004
}
992
1005
if err := r .Create (ctx , newSvc ); err != nil {
1006
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeWarning , string (utils .FailedToCreateService ), "Failed to create the service %s/%s, %v" , newSvc .Namespace , newSvc .Name , err )
993
1007
return nil , err
994
1008
}
1009
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeNormal , string (utils .CreatedService ), "Created the service %s/%s" , newSvc .Namespace , newSvc .Name )
995
1010
return newSvc , nil
996
1011
}
997
1012
return nil , err
@@ -1045,13 +1060,15 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
1045
1060
1046
1061
if shouldUpdate {
1047
1062
if err = r .updateServeDeployment (ctx , rayServiceInstance , rayDashboardClient , rayClusterInstance .Name ); err != nil {
1063
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeWarning , string (utils .FailedToUpdateServeApplications ), "Failed to update serve applications to the RayCluster %s/%s: %v" , rayClusterInstance .Namespace , rayClusterInstance .Name , err )
1048
1064
return false , serveApplications , err
1049
1065
}
1066
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeNormal , string (utils .UpdatedServeApplications ), "Updated serve applications to the RayCluster %s/%s" , rayClusterInstance .Namespace , rayClusterInstance .Name )
1050
1067
}
1051
1068
return isReady , serveApplications , nil
1052
1069
}
1053
1070
1054
- func (r * RayServiceReconciler ) updateHeadPodServeLabel (ctx context.Context , rayClusterInstance * rayv1.RayCluster , excludeHeadPodFromServeSvc bool ) error {
1071
+ func (r * RayServiceReconciler ) updateHeadPodServeLabel (ctx context.Context , rayServiceInstance * rayv1. RayService , rayClusterInstance * rayv1.RayCluster , excludeHeadPodFromServeSvc bool ) error {
1055
1072
// `updateHeadPodServeLabel` updates the head Pod's serve label based on the health status of the proxy actor.
1056
1073
// If `excludeHeadPodFromServeSvc` is true, the head Pod will not be used to serve requests, regardless of proxy actor health.
1057
1074
// If `excludeHeadPodFromServeSvc` is false, the head Pod's serve label will be set based on the health check result.
@@ -1092,8 +1109,10 @@ func (r *RayServiceReconciler) updateHeadPodServeLabel(ctx context.Context, rayC
1092
1109
if oldLabel != newLabel {
1093
1110
headPod .Labels [utils .RayClusterServingServiceLabelKey ] = newLabel
1094
1111
if updateErr := r .Update (ctx , headPod ); updateErr != nil {
1112
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeWarning , string (utils .FailedToUpdateHeadPodServeLabel ), "Failed to update the serve label to %q for the Head Pod %s/%s: %v" , newLabel , headPod .Namespace , headPod .Name , updateErr )
1095
1113
return updateErr
1096
1114
}
1115
+ r .Recorder .Eventf (rayServiceInstance , corev1 .EventTypeNormal , string (utils .UpdatedHeadPodServeLabel ), "Updated the serve label to %q for the Head Pod %s/%s" , newLabel , headPod .Namespace , headPod .Name )
1097
1116
}
1098
1117
1099
1118
return nil
0 commit comments