Skip to content

Commit d025792

Browse files
authored
[Feature][RayCluster]: Generate GCS FT Redis Cleanup Job creation events (#2382)
Signed-off-by: Rueian <[email protected]>
1 parent b08a5ae commit d025792

File tree

3 files changed

+95
-0
lines changed

3 files changed

+95
-0
lines changed

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,13 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
302302
logger.Info("Redis cleanup Job already exists. Requeue the RayCluster CR.")
303303
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, nil
304304
}
305+
r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToCreateRedisCleanupJob),
306+
"Failed to create Redis cleanup Job %s/%s, %v", redisCleanupJob.Namespace, redisCleanupJob.Name, err)
305307
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
306308
}
307309
logger.Info("Created Redis cleanup Job", "name", redisCleanupJob.Name)
310+
r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.CreatedRedisCleanupJob),
311+
"Created Redis cleanup Job %s/%s", redisCleanupJob.Namespace, redisCleanupJob.Name)
308312
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, nil
309313
}
310314
}

ray-operator/controllers/ray/raycluster_controller_unit_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2428,6 +2428,93 @@ func Test_RedisCleanupFeatureFlag(t *testing.T) {
24282428
}
24292429
}
24302430

2431+
func TestEvents_RedisCleanup(t *testing.T) {
2432+
setupTest(t)
2433+
newScheme := runtime.NewScheme()
2434+
_ = rayv1.AddToScheme(newScheme)
2435+
_ = corev1.AddToScheme(newScheme)
2436+
_ = batchv1.AddToScheme(newScheme)
2437+
2438+
// Prepare a RayCluster with the GCS FT enabled and Autoscaling disabled.
2439+
gcsFTEnabledCluster := testRayCluster.DeepCopy()
2440+
if gcsFTEnabledCluster.Annotations == nil {
2441+
gcsFTEnabledCluster.Annotations = make(map[string]string)
2442+
}
2443+
gcsFTEnabledCluster.Annotations[utils.RayFTEnabledAnnotationKey] = "true"
2444+
gcsFTEnabledCluster.Spec.EnableInTreeAutoscaling = nil
2445+
2446+
// Add the Redis cleanup finalizer to the RayCluster and modify the RayCluster's DeleteTimestamp to trigger the Redis cleanup.
2447+
controllerutil.AddFinalizer(gcsFTEnabledCluster, utils.GCSFaultToleranceRedisCleanupFinalizer)
2448+
now := metav1.Now()
2449+
gcsFTEnabledCluster.DeletionTimestamp = &now
2450+
errInjected := errors.New("random error")
2451+
2452+
tests := map[string]struct {
2453+
fakeClientFn func(client.Object) client.Client
2454+
errInjected error
2455+
}{
2456+
"Created Redis cleanup Job": {
2457+
fakeClientFn: func(obj client.Object) client.Client {
2458+
return clientFake.NewClientBuilder().
2459+
WithScheme(newScheme).
2460+
WithRuntimeObjects([]runtime.Object{obj}...).
2461+
Build()
2462+
},
2463+
errInjected: nil,
2464+
},
2465+
"Failed to create Redis cleanup Job": {
2466+
fakeClientFn: func(obj client.Object) client.Client {
2467+
return clientFake.NewClientBuilder().
2468+
WithScheme(newScheme).
2469+
WithRuntimeObjects([]runtime.Object{obj}...).
2470+
WithInterceptorFuncs(interceptor.Funcs{
2471+
Create: func(_ context.Context, _ client.WithWatch, _ client.Object, _ ...client.CreateOption) error {
2472+
return errInjected
2473+
},
2474+
}).
2475+
Build()
2476+
},
2477+
errInjected: errInjected,
2478+
},
2479+
}
2480+
2481+
for message, tc := range tests {
2482+
t.Run(message, func(t *testing.T) {
2483+
cluster := gcsFTEnabledCluster.DeepCopy()
2484+
ctx := context.Background()
2485+
2486+
fakeClient := tc.fakeClientFn(cluster)
2487+
2488+
// Buffer length of 100 is arbitrary here. We should have only 1 event generated, but we keep 100
2489+
// if that isn't the case in the future. If this test starts timing out because of a full
2490+
// channel, this is probably the reason, and we should change our approach or increase buffer length.
2491+
recorder := record.NewFakeRecorder(100)
2492+
2493+
testRayClusterReconciler := &RayClusterReconciler{
2494+
Client: fakeClient,
2495+
Recorder: recorder,
2496+
Scheme: newScheme,
2497+
}
2498+
2499+
request := ctrl.Request{NamespacedName: types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}}
2500+
_, err := testRayClusterReconciler.rayClusterReconcile(ctx, request, cluster)
2501+
assert.ErrorIs(t, err, tc.errInjected)
2502+
2503+
var foundEvent bool
2504+
var events []string
2505+
for len(recorder.Events) > 0 {
2506+
event := <-recorder.Events
2507+
if strings.Contains(event, message) {
2508+
foundEvent = true
2509+
break
2510+
}
2511+
events = append(events, event)
2512+
}
2513+
assert.Truef(t, foundEvent, "Expected event to be generated for redis cleanup job creation, got events: %s", strings.Join(events, "\n"))
2514+
})
2515+
}
2516+
}
2517+
24312518
func Test_RedisCleanup(t *testing.T) {
24322519
setupTest(t)
24332520
newScheme := runtime.NewScheme()

ray-operator/controllers/ray/utils/constant.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,10 @@ const (
243243
DeletedWorkerPod K8sEventType = "DeletedWorkerPod"
244244
FailedToDeleteWorkerPod K8sEventType = "FailedToDeleteWorkerPod"
245245

246+
// Redis Cleanup Job event list
247+
CreatedRedisCleanupJob K8sEventType = "CreatedRedisCleanupJob"
248+
FailedToCreateRedisCleanupJob K8sEventType = "FailedToCreateRedisCleanupJob"
249+
246250
// Generic Pod event list
247251
DeletedPod K8sEventType = "DeletedPod"
248252
FailedToDeletePod K8sEventType = "FailedToDeletePod"

0 commit comments

Comments
 (0)