
Commit 4dcdc8e

[cherry-pick] Generate RayCluster Hash on KubeRay Version Change (#2320) (#2339)
* Re-generate hash when KubeRay version changes
* Change logic to DoNothing on KubeRay version mismatch
* Add KubeRay version annotation to test
* Move update logic
* Update rayservice_controller.go
* Add unit test
* Add period
* Go vet changes
* Update rayservice_controller_unit_test.go
* Address test comments

---------

Signed-off-by: Ryan O'Leary <[email protected]>
Signed-off-by: ryanaoleary <[email protected]>
Co-authored-by: ryanaoleary <[email protected]>
1 parent c63dddf commit 4dcdc8e

File tree: 3 files changed (+39, -4 lines)

ray-operator/controllers/ray/rayservice_controller.go

Lines changed: 13 additions & 3 deletions
@@ -531,7 +531,14 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
         return RolloutNew
     }
 
-    // Case 1: If everything is identical except for the Replicas and WorkersToDelete of
+    // Case 1: If the KubeRay version has changed, update the RayCluster to get the cluster hash and new KubeRay version.
+    activeKubeRayVersion := activeRayCluster.ObjectMeta.Annotations[utils.KubeRayVersion]
+    if activeKubeRayVersion != utils.KUBERAY_VERSION {
+        logger.Info("Active RayCluster config doesn't match goal config due to mismatched KubeRay versions. Updating RayCluster.")
+        return Update
+    }
+
+    // Case 2: If everything is identical except for the Replicas and WorkersToDelete of
     // each WorkerGroup, then do nothing.
     activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
     goalClusterHash, err := generateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
@@ -548,7 +555,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
         return DoNothing
     }
 
-    // Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
+    // Case 3: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
     // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster.
     activeClusterNumWorkerGroups, err := strconv.Atoi(activeRayCluster.ObjectMeta.Annotations[utils.NumWorkerGroupsKey])
     if err != nil {
@@ -576,7 +583,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
         }
     }
 
-    // Case 3: Otherwise, rollout a new cluster.
+    // Case 4: Otherwise, rollout a new cluster.
     logger.Info("Active RayCluster config doesn't match goal config. " +
         "RayService operator should prepare a new Ray cluster.\n" +
         "* Active RayCluster config hash: " + activeClusterHash + "\n" +
@@ -733,6 +740,9 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(ctx context.Cont
     }
     rayClusterAnnotations[utils.NumWorkerGroupsKey] = strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs))
 
+    // set the KubeRay version used to create the RayCluster
+    rayClusterAnnotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION
+
     rayCluster := &rayv1.RayCluster{
         ObjectMeta: metav1.ObjectMeta{
             Labels: rayClusterLabel,
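
Taken together, these hunks reorder shouldPrepareNewRayCluster into four cases, with the KubeRay version check evaluated first so that a hash produced by an older operator cannot, on its own, trigger a new cluster rollout. Below is a condensed, self-contained sketch of that ordering only; the action type, version strings, and helper inputs are placeholders rather than the controller's real signatures.

package main

import "fmt"

// clusterAction stands in for the controller's internal decision values
// (DoNothing / Update / RolloutNew); the real type is not shown in this diff.
type clusterAction string

const (
    doNothing  clusterAction = "DoNothing"
    update     clusterAction = "Update"
    rolloutNew clusterAction = "RolloutNew"
)

// Annotation keys mirror the utils constants touched by this commit; the
// operator version is a placeholder for utils.KUBERAY_VERSION.
const (
    kubeRayVersionKey = "ray.io/kuberay-version"
    hashKey           = "ray.io/hash-without-replicas-and-workers-to-delete"
    operatorVersion   = "<current operator version>"
)

// decideClusterAction condenses the ordered cases of shouldPrepareNewRayCluster
// after this change; hashing and worker-group comparison are abstracted into
// the goalHash and onlyNewGroupsAppended inputs.
func decideClusterAction(annotations map[string]string, goalHash string, onlyNewGroupsAppended bool) clusterAction {
    // Case 1: the cluster was stamped by a different KubeRay version. Update it so
    // its hash and version annotations are regenerated before any rollout decision.
    if annotations[kubeRayVersionKey] != operatorVersion {
        return update
    }
    // Case 2: hashes match once Replicas and WorkersToDelete are ignored: do nothing.
    if annotations[hashKey] == goalHash {
        return doNothing
    }
    // Case 3: only new worker groups were appended at the end: update in place.
    if onlyNewGroupsAppended {
        return update
    }
    // Case 4: any other difference rolls out a new cluster (zero-downtime upgrade).
    return rolloutNew
}

func main() {
    annotations := map[string]string{
        kubeRayVersionKey: "<older operator version>",
        hashKey:           "abc123",
    }
    // Version mismatch wins even though the hashes also differ: prints "Update".
    fmt.Println(decideClusterAction(annotations, "def456", false))
}

With a mismatched version annotation the sketch prints Update, mirroring the new Case 1 short-circuit, even though the hashes also differ.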

ray-operator/controllers/ray/rayservice_controller_unit_test.go

Lines changed: 25 additions & 1 deletion
@@ -734,15 +734,18 @@ func TestReconcileRayCluster(t *testing.T) {
             Annotations: map[string]string{
                 utils.HashWithoutReplicasAndWorkersToDeleteKey: hash,
                 utils.NumWorkerGroupsKey:                       strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs)),
+                utils.KubeRayVersion:                           utils.KUBERAY_VERSION,
             },
         },
     }
 
     tests := map[string]struct {
         activeCluster           *rayv1.RayCluster
+        kubeRayVersion          string
         updateRayClusterSpec    bool
         enableZeroDowntime      bool
         shouldPrepareNewCluster bool
+        updateKubeRayVersion    bool
     }{
         // Test 1: Neither active nor pending clusters exist. The `markRestart` function will be called, so the `PendingServiceStatus.RayClusterName` should be set.
         "Zero-downtime upgrade is enabled. Neither active nor pending clusters exist.": {
@@ -779,6 +782,17 @@ func TestReconcileRayCluster(t *testing.T) {
             enableZeroDowntime:      false,
             shouldPrepareNewCluster: true,
         },
+        // Test 6: If the active KubeRay version doesn't match the KubeRay version annotation on the RayCluster, update the RayCluster's hash and KubeRay version
+        // annotations first before checking whether to trigger a zero downtime upgrade. This behavior occurs because when we upgrade the KubeRay CRD, the hash
+        // generated by different KubeRay versions may differ, which can accidentally trigger a zero downtime upgrade.
+        "Active RayCluster exists. KubeRay version is mismatched. Update the RayCluster.": {
+            activeCluster:           activeCluster.DeepCopy(),
+            updateRayClusterSpec:    true,
+            enableZeroDowntime:      true,
+            shouldPrepareNewCluster: false,
+            updateKubeRayVersion:    true,
+            kubeRayVersion:          "new-version",
+        },
     }
 
     for name, tc := range tests {
@@ -790,11 +804,16 @@ func TestReconcileRayCluster(t *testing.T) {
         }
         runtimeObjects := []runtime.Object{}
         if tc.activeCluster != nil {
+            // Update 'ray.io/kuberay-version' to a new version if kubeRayVersion is set.
+            if tc.updateKubeRayVersion {
+                tc.activeCluster.Annotations[utils.KubeRayVersion] = tc.kubeRayVersion
+            }
             runtimeObjects = append(runtimeObjects, tc.activeCluster.DeepCopy())
         }
         fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
         r := RayServiceReconciler{
             Client: fakeClient,
+            Scheme: newScheme,
         }
         service := rayService.DeepCopy()
         if tc.updateRayClusterSpec {
@@ -804,9 +823,14 @@ func TestReconcileRayCluster(t *testing.T) {
             service.Status.ActiveServiceStatus.RayClusterName = tc.activeCluster.Name
         }
         assert.Equal(t, "", service.Status.PendingServiceStatus.RayClusterName)
-        _, _, err = r.reconcileRayCluster(ctx, service)
+        activeRayCluster, _, err := r.reconcileRayCluster(ctx, service)
         assert.Nil(t, err)
 
+        // If the KubeRay version has changed, check that the RayCluster annotations have been updated to the correct version.
+        if tc.updateKubeRayVersion && activeRayCluster != nil {
+            assert.Equal(t, utils.KUBERAY_VERSION, activeRayCluster.Annotations[utils.KubeRayVersion])
+        }
+
         // If KubeRay operator is preparing a new cluster, the `PendingServiceStatus.RayClusterName` should be set by calling the function `markRestart`.
         if tc.shouldPrepareNewCluster {
             assert.NotEqual(t, "", service.Status.PendingServiceStatus.RayClusterName)

ray-operator/controllers/ray/utils/constant.go

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ const (
     RayClusterHeadlessServiceLabelKey        = "ray.io/headless-worker-svc"
     HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete"
     NumWorkerGroupsKey                       = "ray.io/num-worker-groups"
+    KubeRayVersion                           = "ray.io/kuberay-version"
 
     // In KubeRay, the Ray container must be the first application container in a head or worker Pod.
     RayContainerIndex = 0
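
With the new KubeRayVersion key joining the existing hash and worker-group annotations, every RayCluster that the RayService controller creates records which operator version built it. A minimal, illustrative snippet of the resulting annotation set (values are placeholders, not real controller output):

package main

import "fmt"

func main() {
    // Illustrative only: the bookkeeping annotations a RayService-managed RayCluster
    // carries after this change. Values are placeholders, not real controller output.
    rayClusterAnnotations := map[string]string{
        "ray.io/hash-without-replicas-and-workers-to-delete": "<hash of the RayClusterSpec>",
        "ray.io/num-worker-groups":                           "2",
        "ray.io/kuberay-version":                             "<utils.KUBERAY_VERSION at creation time>",
    }
    for key, value := range rayClusterAnnotations {
        fmt.Printf("%s: %s\n", key, value)
    }
}

On a later reconcile, a mismatch between the stored version and the running operator's utils.KUBERAY_VERSION now resolves to an in-place Update of the cluster rather than a zero-downtime rollout.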
