
Commit 8eb3237

dc-yingphantom5125 authored and committed
[Feat] Add Raycluster metrics cleanup
1 parent 50d02d9 commit 8eb3237

File tree

3 files changed: +185 -3

ray-operator/controllers/ray/metrics/ray_cluster_metrics.go

Lines changed: 77 additions & 3 deletions
@@ -2,6 +2,8 @@ package metrics
 
 import (
 	"context"
+	"sync"
+	"time"
 	"strconv"
 
 	"github.com/go-logr/logr"
@@ -19,16 +21,28 @@ type RayClusterMetricsObserver interface {
 	ObserveRayClusterProvisionedDuration(name, namespace string, duration float64)
 }
 
+// RayClusterMetricCleanupItem represents an item in the RayCluster metric cleanup queue
+type RayClusterMetricCleanupItem struct {
+	Name      string
+	Namespace string
+	DeleteAt  time.Time
+}
+
 // RayClusterMetricsManager implements the prometheus.Collector and RayClusterMetricsObserver interface to collect ray cluster metrics.
 type RayClusterMetricsManager struct {
 	rayClusterProvisionedDurationSeconds *prometheus.GaugeVec
 	rayClusterInfo                       *prometheus.Desc
 	rayClusterConditionProvisioned       *prometheus.Desc
 	client                               client.Client
 	log                                  logr.Logger
+
+	// Cleanup queue and related fields specific to RayCluster metrics
+	cleanupQueue []RayClusterMetricCleanupItem
+	queueMutex   sync.Mutex
+	metricTTL    time.Duration
 }
 
-// NewRayClusterMetricsManager creates a new RayClusterManager instance.
+// NewRayClusterMetricsManager creates a new RayClusterMetricsManager instance.
 func NewRayClusterMetricsManager(ctx context.Context, client client.Client) *RayClusterMetricsManager {
 	manager := &RayClusterMetricsManager{
 		rayClusterProvisionedDurationSeconds: prometheus.NewGaugeVec(
@@ -56,9 +70,14 @@ func NewRayClusterMetricsManager(ctx context.Context, client client.Client) *RayClusterMetricsManager {
 			[]string{"name", "namespace", "condition"},
 			nil,
 		),
-		client: client,
-		log:    ctrl.LoggerFrom(ctx),
+		client:       client,
+		log:          ctrl.LoggerFrom(ctx),
+		cleanupQueue: make([]RayClusterMetricCleanupItem, 0),
+		metricTTL:    5 * time.Minute, // Keep metrics for 5 minutes
 	}
+
+	// Start the cleanup goroutine
+	go manager.startRayClusterCleanupLoop(ctx)
 	return manager
 }
 
@@ -85,10 +104,65 @@ func (r *RayClusterMetricsManager) Collect(ch chan<- prometheus.Metric) {
 	}
 }
 
+// ObserveRayClusterProvisionedDuration records the provisioned duration of a RayCluster
 func (r *RayClusterMetricsManager) ObserveRayClusterProvisionedDuration(name, namespace string, duration float64) {
 	r.rayClusterProvisionedDurationSeconds.WithLabelValues(name, namespace).Set(duration)
 }
 
+// ScheduleRayClusterMetricForCleanup schedules a RayCluster metric for cleanup after the TTL
+func (r *RayClusterMetricsManager) ScheduleRayClusterMetricForCleanup(name, namespace string) {
+	r.queueMutex.Lock()
+	defer r.queueMutex.Unlock()
+
+	// Add to cleanup queue
+	item := RayClusterMetricCleanupItem{
+		Name:      name,
+		Namespace: namespace,
+		DeleteAt:  time.Now().Add(r.metricTTL),
+	}
+	r.cleanupQueue = append(r.cleanupQueue, item)
+	r.log.Info("Scheduled RayCluster metric for cleanup", "name", name, "namespace", namespace, "deleteAt", item.DeleteAt)
+}
+
+// startRayClusterCleanupLoop starts a loop to clean up expired RayCluster metrics
+func (r *RayClusterMetricsManager) startRayClusterCleanupLoop(ctx context.Context) {
+	// Check for expired metrics every minute
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			r.cleanupExpiredRayClusterMetrics()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// cleanupExpiredRayClusterMetrics removes RayCluster metrics that have expired
+func (r *RayClusterMetricsManager) cleanupExpiredRayClusterMetrics() {
+	r.queueMutex.Lock()
+	defer r.queueMutex.Unlock()
+
+	now := time.Now()
+	remainingItems := make([]RayClusterMetricCleanupItem, 0)
+
+	for _, item := range r.cleanupQueue {
+		if now.After(item.DeleteAt) {
+			// Remove expired metric
+			r.rayClusterProvisionedDurationSeconds.DeleteLabelValues(item.Name, item.Namespace)
+			r.log.Info("Cleaned up expired RayCluster metric", "name", item.Name, "namespace", item.Namespace)
+		} else {
+			// Keep non-expired items
+			remainingItems = append(remainingItems, item)
+		}
+	}
+
+	// Update queue
+	r.cleanupQueue = remainingItems
+}
+
 func (r *RayClusterMetricsManager) collectRayClusterInfo(cluster *rayv1.RayCluster, ch chan<- prometheus.Metric) {
 	ownerKind := "None"
 	if v, ok := cluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok {
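
The change above boils down to a TTL queue guarded by a mutex and swept by a ticker-driven goroutine. The following standalone sketch distills that pattern on its own, using a short ticker so it finishes quickly; the names (cleanupItem, ttlQueue, schedule, sweep) are illustrative only and are not part of the KubeRay operator's API.

// Minimal sketch of the TTL cleanup-queue pattern; names are illustrative, not KubeRay APIs.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type cleanupItem struct {
	key      string
	deleteAt time.Time
}

type ttlQueue struct {
	mu    sync.Mutex
	items []cleanupItem
	ttl   time.Duration
}

// schedule records that key's metric should be dropped once the TTL elapses.
func (q *ttlQueue) schedule(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, cleanupItem{key: key, deleteAt: time.Now().Add(q.ttl)})
}

// sweep removes expired entries and invokes onExpired for each, playing the
// role of the DeleteLabelValues call in cleanupExpiredRayClusterMetrics.
func (q *ttlQueue) sweep(onExpired func(key string)) {
	q.mu.Lock()
	defer q.mu.Unlock()
	now := time.Now()
	remaining := q.items[:0] // in-place filter, reuses the backing array
	for _, it := range q.items {
		if now.After(it.deleteAt) {
			onExpired(it.key)
		} else {
			remaining = append(remaining, it)
		}
	}
	q.items = remaining
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	q := &ttlQueue{ttl: time.Second}
	q.schedule("test-namespace/test-cluster")

	// Ticker-driven sweep loop, analogous to startRayClusterCleanupLoop
	// (the real code ticks every minute with a 5-minute TTL).
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			q.sweep(func(key string) { fmt.Println("deleting metric labels for", key) })
		case <-ctx.Done():
			return
		}
	}
}

Keeping the gauge around for a TTL after deletion, rather than deleting it immediately, gives Prometheus time to scrape the final value before the series disappears.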

ray-operator/controllers/ray/metrics/ray_cluster_metrics_test.go

Lines changed: 101 additions & 0 deletions
@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"testing"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -96,6 +97,106 @@ func TestRayClusterInfo(t *testing.T) {
 	}
 }
 
+// TestScheduleRayClusterMetricForCleanup tests scheduling a metric for cleanup
+func TestScheduleRayClusterMetricForCleanup(t *testing.T) {
+	ctx := context.Background()
+	k8sScheme := runtime.NewScheme()
+	require.NoError(t, rayv1.AddToScheme(k8sScheme))
+
+	client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
+	manager := NewRayClusterMetricsManager(ctx, client)
+
+	// Schedule a metric for cleanup
+	manager.ScheduleRayClusterMetricForCleanup("test-cluster", "test-namespace")
+
+	// Verify the cleanup queue has one item
+	assert.Len(t, manager.cleanupQueue, 1)
+	assert.Equal(t, "test-cluster", manager.cleanupQueue[0].Name)
+	assert.Equal(t, "test-namespace", manager.cleanupQueue[0].Namespace)
+	assert.WithinDuration(t, time.Now().Add(5*time.Minute), manager.cleanupQueue[0].DeleteAt, 1*time.Second)
+}
+
+// TestCleanupExpiredRayClusterMetrics tests cleaning up expired metrics
+func TestCleanupExpiredRayClusterMetrics(t *testing.T) {
+	ctx := context.Background()
+	k8sScheme := runtime.NewScheme()
+	require.NoError(t, rayv1.AddToScheme(k8sScheme))
+
+	client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
+	manager := NewRayClusterMetricsManager(ctx, client)
+
+	// Set up a metric
+	manager.ObserveRayClusterProvisionedDuration("expired-cluster", "test-namespace", 123.45)
+
+	// Add an expired item to the cleanup queue
+	manager.queueMutex.Lock()
+	manager.cleanupQueue = append(manager.cleanupQueue, RayClusterMetricCleanupItem{
+		Name:      "expired-cluster",
+		Namespace: "test-namespace",
+		DeleteAt:  time.Now().Add(-1 * time.Minute), // Expired 1 minute ago
+	})
+	manager.queueMutex.Unlock()
+
+	// Clean up expired metrics
+	manager.cleanupExpiredRayClusterMetrics()
+
+	// Verify the metric was deleted by checking the registry
+	reg := prometheus.NewRegistry()
+	reg.MustRegister(manager)
+	req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "/metrics", nil)
+	require.NoError(t, err)
+	recorder := httptest.NewRecorder()
+	handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
+	handler.ServeHTTP(recorder, req)
+	assert.Equal(t, http.StatusOK, recorder.Code)
+	body := recorder.Body.String()
+	assert.NotContains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="expired-cluster",namespace="test-namespace"}`)
+
+	// Verify the cleanup queue is empty
+	assert.Empty(t, manager.cleanupQueue)
+}
+
+// TestCleanupLoop tests the background cleanup loop
+func TestCleanupLoop(t *testing.T) {
+	// Create a context that we can cancel
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	k8sScheme := runtime.NewScheme()
+	require.NoError(t, rayv1.AddToScheme(k8sScheme))
+
+	client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
+	manager := NewRayClusterMetricsManager(ctx, client)
+
+	// Set up a metric
+	manager.ObserveRayClusterProvisionedDuration("test-cluster", "test-namespace", 123.45)
+
+	// Add an item to the cleanup queue with a very short TTL
+	manager.queueMutex.Lock()
+	manager.metricTTL = 1 * time.Second // Set TTL to 1 second for testing
+	manager.cleanupQueue = append(manager.cleanupQueue, RayClusterMetricCleanupItem{
+		Name:      "test-cluster",
+		Namespace: "test-namespace",
+		DeleteAt:  time.Now().Add(manager.metricTTL),
+	})
+	manager.queueMutex.Unlock()
+
+	// Wait for the cleanup loop to run and process the item
+	time.Sleep(2 * time.Second)
+
+	// Verify the metric was deleted by checking the registry
+	reg := prometheus.NewRegistry()
+	reg.MustRegister(manager)
+	req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "/metrics", nil)
+	require.NoError(t, err)
+	recorder := httptest.NewRecorder()
+	handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
+	handler.ServeHTTP(recorder, req)
+	assert.Equal(t, http.StatusOK, recorder.Code)
+	body := recorder.Body.String()
+	assert.NotContains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="test-cluster",namespace="test-namespace"}`)
+}
+
 func TestRayClusterConditionProvisioned(t *testing.T) {
 	tests := []struct {
 		name string

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 7 additions & 0 deletions
@@ -195,6 +195,13 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance
 	// Please do NOT modify `originalRayClusterInstance` in the following code.
 	originalRayClusterInstance := instance.DeepCopy()
 
+	if instance.DeletionTimestamp != nil && !instance.DeletionTimestamp.IsZero() {
+		logger.Info("RayCluster is being deleted, scheduling metric for cleanup")
+		if r.options.RayClusterMetricsManager != nil {
+			r.options.RayClusterMetricsManager.ScheduleRayClusterMetricForCleanup(instance.Name, instance.Namespace)
+		}
+	}
+
 	// The `enableGCSFTRedisCleanup` is a feature flag introduced in KubeRay v1.0.0. It determines whether
 	// the Redis cleanup job should be activated. Users can disable the feature by setting the environment
 	// variable `ENABLE_GCS_FT_REDIS_CLEANUP` to `false`, and undertake the Redis storage namespace cleanup
