Commit 6432601

dc-yingphantom5125 authored and committed
[Feat] Add RayJob metrics cleanup
1 parent 9c10224 · commit 6432601

File tree: 4 files changed, +276 −6 lines

  ray-operator/controllers/ray/metrics/ray_cluster_metrics_test.go
  ray-operator/controllers/ray/metrics/ray_job_metrics.go
  ray-operator/controllers/ray/metrics/ray_job_metrics_test.go
  ray-operator/controllers/ray/rayjob_controller.go

ray-operator/controllers/ray/metrics/ray_cluster_metrics_test.go (2 additions, 2 deletions)

@@ -156,8 +156,8 @@ func TestCleanupExpiredRayClusterMetrics(t *testing.T) {
 	assert.Empty(t, manager.cleanupQueue)
 }

-// TestCleanupLoop tests the background cleanup loop
-func TestCleanupLoop(t *testing.T) {
+// TestRayClusterCleanupLoop tests the background cleanup loop
+func TestRayClusterCleanupLoop(t *testing.T) {
 	// Create a context that we can cancel
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

ray-operator/controllers/ray/metrics/ray_job_metrics.go (80 additions, 4 deletions)

@@ -3,6 +3,8 @@ package metrics
 import (
 	"context"
 	"strconv"
+	"sync"
+	"time"

 	"github.com/go-logr/logr"
 	"github.com/prometheus/client_golang/prometheus"
@@ -17,18 +19,30 @@ type RayJobMetricsObserver interface {
 	ObserveRayJobExecutionDuration(name, namespace string, jobDeploymentStatus rayv1.JobDeploymentStatus, retryCount int, duration float64)
 }

+// RayJobMetricCleanupItem represents an item in the RayJob metric cleanup queue
+type RayJobMetricCleanupItem struct {
+	Name      string
+	Namespace string
+	DeleteAt  time.Time
+}
+
 // RayJobMetricsManager implements the prometheus.Collector and RayJobMetricsObserver interface to collect ray job metrics.
 type RayJobMetricsManager struct {
 	rayJobExecutionDurationSeconds *prometheus.GaugeVec
 	rayJobInfo                     *prometheus.Desc
 	rayJobDeploymentStatus         *prometheus.Desc
 	client                         client.Client
 	log                            logr.Logger
+
+	// Cleanup queue and related fields specific to RayJob metrics
+	cleanupQueue []RayJobMetricCleanupItem
+	queueMutex   sync.Mutex
+	metricTTL    time.Duration
 }

 // NewRayJobMetricsManager creates a new RayJobMetricsManager instance.
 func NewRayJobMetricsManager(ctx context.Context, client client.Client) *RayJobMetricsManager {
-	collector := &RayJobMetricsManager{
+	manager := &RayJobMetricsManager{
 		rayJobExecutionDurationSeconds: prometheus.NewGaugeVec(
 			prometheus.GaugeOpts{
 				Name: "kuberay_job_execution_duration_seconds",
@@ -50,10 +64,15 @@ func NewRayJobMetricsManager(ctx context.Context, client client.Client) *RayJobM
 			[]string{"name", "namespace", "deployment_status"},
 			nil,
 		),
-		client: client,
-		log:    ctrl.LoggerFrom(ctx),
+		client:       client,
+		log:          ctrl.LoggerFrom(ctx),
+		cleanupQueue: make([]RayJobMetricCleanupItem, 0),
+		metricTTL:    5 * time.Minute, // Keep metrics for 5 minutes
 	}
-	return collector
+
+	// Start the cleanup goroutine
+	go manager.startRayJobCleanupLoop(ctx)
+	return manager
 }

 // Describe implements prometheus.Collector interface Describe method.
@@ -84,6 +103,63 @@ func (r *RayJobMetricsManager) ObserveRayJobExecutionDuration(name, namespace st
 	r.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, string(jobDeploymentStatus), strconv.Itoa(retryCount)).Set(duration)
 }

+// ScheduleRayJobMetricForCleanup schedules a RayJob metric for cleanup after the TTL
+func (r *RayJobMetricsManager) ScheduleRayJobMetricForCleanup(name, namespace string) {
+	r.queueMutex.Lock()
+	defer r.queueMutex.Unlock()
+
+	// Add to cleanup queue
+	item := RayJobMetricCleanupItem{
+		Name:      name,
+		Namespace: namespace,
+		DeleteAt:  time.Now().Add(r.metricTTL),
+	}
+	r.cleanupQueue = append(r.cleanupQueue, item)
+	r.log.Info("Scheduled RayJob metric for cleanup", "name", name, "namespace", namespace, "deleteAt", item.DeleteAt)
+}
+
+// startRayJobCleanupLoop starts a loop to clean up expired RayJob metrics
+func (r *RayJobMetricsManager) startRayJobCleanupLoop(ctx context.Context) {
+	// Check for expired metrics every minute
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			r.cleanupExpiredRayJobMetrics()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// cleanupExpiredRayJobMetrics removes RayJob metrics that have expired
+func (r *RayJobMetricsManager) cleanupExpiredRayJobMetrics() {
+	r.queueMutex.Lock()
+	defer r.queueMutex.Unlock()
+
+	now := time.Now()
+	remainingItems := make([]RayJobMetricCleanupItem, 0)
+
+	for _, item := range r.cleanupQueue {
+		if now.After(item.DeleteAt) {
+			// Remove all metrics associated with this job
+			r.rayJobExecutionDurationSeconds.DeletePartialMatch(prometheus.Labels{
+				"name":      item.Name,
+				"namespace": item.Namespace,
+			})
+			r.log.Info("Cleaned up expired RayJob metrics", "name", item.Name, "namespace", item.Namespace)
+		} else {
+			// Keep non-expired items
+			remainingItems = append(remainingItems, item)
+		}
+	}
+
+	// Update queue
+	r.cleanupQueue = remainingItems
+}
+
 func (r *RayJobMetricsManager) collectRayJobInfo(rayJob *rayv1.RayJob, ch chan<- prometheus.Metric) {
 	ch <- prometheus.MustNewConstMetric(
 		r.rayJobInfo,
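
Taken together, the new surface on RayJobMetricsManager is small: record a duration, then schedule that job's series for deletion once it no longer needs to be scraped. A minimal usage sketch follows; the wiring below (fake client, scheme setup, the sample-job names, and the import paths inferred from the module layout) is illustrative and not part of the commit.

package main

import (
	"context"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func main() {
	// Throwaway scheme and fake client, just for the sketch.
	scheme := runtime.NewScheme()
	_ = rayv1.AddToScheme(scheme)
	k8sClient := fake.NewClientBuilder().WithScheme(scheme).Build()

	// The constructor now also starts the background cleanup goroutine,
	// which exits when this context is cancelled.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	manager := metrics.NewRayJobMetricsManager(ctx, k8sClient)

	// Record an execution duration for a finished job...
	manager.ObserveRayJobExecutionDuration("sample-job", "default", rayv1.JobDeploymentStatusComplete, 0, 12.3)

	// ...and schedule its series for deletion. After the 5-minute TTL, the
	// next cleanup tick removes every sample carrying this name/namespace.
	manager.ScheduleRayJobMetricForCleanup("sample-job", "default")
}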

ray-operator/controllers/ray/metrics/ray_job_metrics_test.go (190 additions, 0 deletions)

@@ -4,7 +4,9 @@ import (
 	"context"
 	"net/http"
 	"net/http/httptest"
+	"sync"
 	"testing"
+	"time"

 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -171,3 +173,191 @@ func TestMetricRayJobDeploymentStatus(t *testing.T) {
 		})
 	}
 }
+
+// TestRayJobMetricCleanupItem tests the structure of RayJobMetricCleanupItem
+func TestRayJobMetricCleanupItem(t *testing.T) {
+	// Create a test cleanup item
+	item := RayJobMetricCleanupItem{
+		Name:      "test-job",
+		Namespace: "default",
+		DeleteAt:  time.Now().Add(5 * time.Minute),
+	}
+
+	// Verify the fields are correctly set
+	assert.Equal(t, "test-job", item.Name)
+	assert.Equal(t, "default", item.Namespace)
+	assert.WithinDuration(t, time.Now().Add(5*time.Minute), item.DeleteAt, time.Second)
+}
+
+// TestScheduleRayJobMetricForCleanup tests adding items to the cleanup queue
+func TestScheduleRayJobMetricForCleanup(t *testing.T) {
+	// Create a fake client
+	k8sScheme := runtime.NewScheme()
+	require.NoError(t, rayv1.AddToScheme(k8sScheme))
+	client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
+	// Create a metrics manager
+	manager := NewRayJobMetricsManager(context.Background(), client)
+
+	// Schedule a cleanup for a job
+	manager.ScheduleRayJobMetricForCleanup("test-job", "default")
+
+	// Verify the item was added to the queue
+	manager.queueMutex.Lock()
+	defer manager.queueMutex.Unlock()
+
+	assert.Len(t, manager.cleanupQueue, 1)
+	assert.Equal(t, "test-job", manager.cleanupQueue[0].Name)
+	assert.Equal(t, "default", manager.cleanupQueue[0].Namespace)
+	assert.WithinDuration(t, time.Now().Add(5*time.Minute), manager.cleanupQueue[0].DeleteAt, time.Second)
+}
+
+// TestCleanupExpiredRayJobMetrics tests the cleanup of expired metrics
+func TestCleanupExpiredRayJobMetrics(t *testing.T) {
+	// Create a registry, fake client and metrics manager
+	registry := prometheus.NewRegistry()
+	k8sScheme := runtime.NewScheme()
+	require.NoError(t, rayv1.AddToScheme(k8sScheme))
+	client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
+	manager := NewRayJobMetricsManager(context.Background(), client)
+
+	// Register the manager with the registry
+	registry.MustRegister(manager)
+
+	// Record a metric for a job
+	manager.ObserveRayJobExecutionDuration("test-job", "default", rayv1.JobDeploymentStatusComplete, 0, 10.5)
+
+	// Verify the metric exists
+	metrics, err := registry.Gather()
+	require.NoError(t, err)
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "kuberay_job_execution_duration_seconds", metrics[0].GetName())
+
+	// Add the job to cleanup queue with a past delete time
+	manager.queueMutex.Lock()
+	manager.cleanupQueue = append(manager.cleanupQueue, RayJobMetricCleanupItem{
+		Name:      "test-job",
+		Namespace: "default",
+		DeleteAt:  time.Now().Add(-1 * time.Minute), // Expired
+	})
+	manager.queueMutex.Unlock()
+
+	// Run cleanup
+	manager.cleanupExpiredRayJobMetrics()
+
+	// Verify the metric was deleted: with its only sample gone, the gauge
+	// family is no longer reported by Gather
+	metrics, err = registry.Gather()
+	require.NoError(t, err)
+	assert.Empty(t, metrics)
+}
+
+// TestRayJobCleanupLoop tests the background cleanup loop
+func TestRayJobCleanupLoop(t *testing.T) {
+	// Create a fake client and metrics manager
+	k8sScheme := runtime.NewScheme()
+	require.NoError(t, rayv1.AddToScheme(k8sScheme))
+	client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
+	manager := NewRayJobMetricsManager(context.Background(), client)
+
+	// Start an additional cleanup loop with a context we control
+	ctx, cancel := context.WithCancel(context.Background())
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		manager.startRayJobCleanupLoop(ctx)
+	}()
+
+	// Schedule a cleanup for a job and force it to expire
+	manager.ScheduleRayJobMetricForCleanup("test-job", "default")
+	manager.queueMutex.Lock()
+	for i := range manager.cleanupQueue {
+		manager.cleanupQueue[i].DeleteAt = time.Now().Add(-1 * time.Minute)
+	}
+	manager.queueMutex.Unlock()
+
+	// The loop only ticks once per minute, so trigger a cleanup pass directly
+	// instead of sleeping until the ticker fires
+	manager.cleanupExpiredRayJobMetrics()
+
+	// Verify the cleanup queue is empty
+	manager.queueMutex.Lock()
+	assert.Empty(t, manager.cleanupQueue)
+	manager.queueMutex.Unlock()
+
+	// Stop the cleanup loop and wait for the goroutine to exit
+	cancel()
+	wg.Wait()
+}
+
+// TestRayJobConditionProvisioned tests metrics when a running job completes and is cleaned up
+func TestRayJobConditionProvisioned(t *testing.T) {
+	// Create a registry, fake client and metrics manager
+	registry := prometheus.NewRegistry()
+	k8sScheme := runtime.NewScheme()
+	require.NoError(t, rayv1.AddToScheme(k8sScheme))
+	client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
+	manager := NewRayJobMetricsManager(context.Background(), client)
+	registry.MustRegister(manager)
+
+	// Simulate a job that is currently running
+	job := &rayv1.RayJob{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-job",
+			Namespace: "default",
+		},
+		Status: rayv1.RayJobStatus{
+			JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
+			JobStatus:           rayv1.JobStatusRunning,
+			StartTime:           &metav1.Time{Time: time.Now().Add(-10 * time.Second)},
+		},
+	}
+
+	// Simulate job completion
+	oldStatus := job.Status
+	job.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusComplete
+	job.Status.JobStatus = rayv1.JobStatusSucceeded
+
+	// Emit metrics and schedule cleanup, mirroring the controller behavior
+	emitRayJobExecutionDuration(manager, job.Name, job.Namespace, oldStatus, job.Status)
+	manager.ScheduleRayJobMetricForCleanup(job.Name, job.Namespace)
+
+	// Verify the metric was recorded
+	metrics, err := registry.Gather()
+	require.NoError(t, err)
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "kuberay_job_execution_duration_seconds", metrics[0].GetName())
+
+	// Fast-forward time and run cleanup
+	manager.queueMutex.Lock()
+	for i := range manager.cleanupQueue {
+		manager.cleanupQueue[i].DeleteAt = time.Now().Add(-1 * time.Minute) // Force expire
+	}
+	manager.queueMutex.Unlock()
+
+	manager.cleanupExpiredRayJobMetrics()
+
+	// Verify the metric was cleaned up: the gauge family has no samples left
+	metrics, err = registry.Gather()
+	require.NoError(t, err)
+	assert.Empty(t, metrics)
+}
+
+// Helper function to match the one in controller
+func emitRayJobExecutionDuration(rayJobMetricsObserver RayJobMetricsObserver, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
+	if rayJobStatus.StartTime == nil {
+		// Set a default start time if not provided
+		now := time.Now()
+		rayJobStatus.StartTime = &metav1.Time{Time: now.Add(-10 * time.Second)}
+	}
+	if !rayv1.IsJobDeploymentTerminal(originalRayJobStatus.JobDeploymentStatus) && (rayv1.IsJobDeploymentTerminal(rayJobStatus.JobDeploymentStatus) ||
+		rayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusRetrying) {
+
+		retryCount := 0
+		if originalRayJobStatus.Failed != nil {
+			retryCount += int(*originalRayJobStatus.Failed)
+		}
+
+		rayJobMetricsObserver.ObserveRayJobExecutionDuration(
+			rayJobName,
+			rayJobNamespace,
+			rayJobStatus.JobDeploymentStatus,
+			retryCount,
+			time.Since(rayJobStatus.StartTime.Time).Seconds(),
+		)
+	}
+}

ray-operator/controllers/ray/rayjob_controller.go (4 additions, 0 deletions)

@@ -465,6 +465,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 		return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
 	}
 	emitRayJobMetrics(r.options.RayJobMetricsManager, rayJobInstance.Name, rayJobInstance.Namespace, originalRayJobInstance.Status, rayJobInstance.Status)
+	// Schedule metrics cleanup when job reaches terminal status
+	if !rayv1.IsJobDeploymentTerminal(originalRayJobInstance.Status.JobDeploymentStatus) && rayv1.IsJobDeploymentTerminal(rayJobInstance.Status.JobDeploymentStatus) {
+		r.options.RayJobMetricsManager.ScheduleRayJobMetricForCleanup(rayJobInstance.Name, rayJobInstance.Namespace)
+	}
 	return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
 }

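
The hook above calls ScheduleRayJobMetricForCleanup unconditionally on the manager held in the reconciler options. If a deployment can run with metrics collection disabled (so that RayJobMetricsManager is nil), a guarded variant would be needed; the sketch below is an assumption along those lines, and the helper name is hypothetical, not part of the commit.

package sketch

import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
)

// scheduleRayJobMetricsCleanupIfTerminal mirrors the condition added to Reconcile,
// with an extra nil check as an assumption for configurations that run the
// operator without a RayJob metrics manager.
func scheduleRayJobMetricsCleanupIfTerminal(mgr *metrics.RayJobMetricsManager, name, namespace string, oldStatus, newStatus rayv1.JobDeploymentStatus) {
	if mgr == nil {
		return
	}
	// Only the transition into a terminal deployment status schedules cleanup,
	// so repeated reconciles of an already-terminal RayJob do not re-enqueue it.
	if !rayv1.IsJobDeploymentTerminal(oldStatus) && rayv1.IsJobDeploymentTerminal(newStatus) {
		mgr.ScheduleRayJobMetricForCleanup(name, namespace)
	}
}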
