Skip to content

Commit d56356b

Browse files
authored
[Feature] Add cleanup for terminated RayJob/RayCluster metrics (#3923)
* [Feature] Add cleanup for terminated RayJob/RayCluster metrics
* [Tests] Add test code
* Use Delete API for rayClusterProvisionedDurationSeconds
* Add helper function to avoid nil MetricsManager
* Fix wrong annotation
* test: add helper function for executing metrics requests
* Add comment for Delete API
* refactor the helper
* fix linting issue
* revert Delete API
1 parent 0b0573b commit d56356b

File tree

8 files changed

+204
-78
lines changed

8 files changed

+204
-78
lines changed

ray-operator/controllers/ray/metrics/ray_cluster_metrics.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ func (r *RayClusterMetricsManager) ObserveRayClusterProvisionedDuration(name, na
8989
r.rayClusterProvisionedDurationSeconds.WithLabelValues(name, namespace).Set(duration)
9090
}
9191

92+
// DeleteRayClusterMetrics removes the metrics that belong to the specified RayCluster.
//
// It drops every rayClusterProvisionedDurationSeconds series whose "name" and
// "namespace" labels equal the given values, then logs how many series were
// removed. The log line is emitted unconditionally, so a call that matches
// nothing still logs with numCleanedUpMetrics set to 0.
93+
func (r *RayClusterMetricsManager) DeleteRayClusterMetrics(name, namespace string) {
94+
numCleanedUpMetrics := r.rayClusterProvisionedDurationSeconds.DeletePartialMatch(prometheus.Labels{"name": name, "namespace": namespace})
95+
r.log.Info("Cleaned up expired RayCluster metric", "name", name, "namespace", namespace, "numCleanedUpMetrics", numCleanedUpMetrics)
96+
}
97+
9298
func (r *RayClusterMetricsManager) collectRayClusterInfo(cluster *rayv1.RayCluster, ch chan<- prometheus.Metric) {
9399
ownerKind := "None"
94100
if v, ok := cluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok {

ray-operator/controllers/ray/metrics/ray_cluster_metrics_test.go

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@ package metrics
33
import (
44
"context"
55
"net/http"
6-
"net/http/httptest"
76
"testing"
87

98
"github.com/prometheus/client_golang/prometheus"
10-
"github.com/prometheus/client_golang/prometheus/promhttp"
119
"github.com/stretchr/testify/assert"
1210
"github.com/stretchr/testify/require"
1311
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -16,6 +14,7 @@ import (
1614
"sigs.k8s.io/controller-runtime/pkg/client/fake"
1715

1816
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
17+
"github.com/ray-project/kuberay/ray-operator/test/support"
1918
)
2019

2120
func TestRayClusterInfo(t *testing.T) {
@@ -65,28 +64,21 @@ func TestRayClusterInfo(t *testing.T) {
6564
reg := prometheus.NewRegistry()
6665
reg.MustRegister(manager)
6766

68-
req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
69-
require.NoError(t, err)
70-
rr := httptest.NewRecorder()
71-
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
72-
handler.ServeHTTP(rr, req)
67+
body, statusCode := support.GetMetricsResponseAndCode(t, reg)
7368

74-
assert.Equal(t, http.StatusOK, rr.Code)
75-
body := rr.Body.String()
69+
assert.Equal(t, http.StatusOK, statusCode)
7670
for _, label := range tc.expectedMetrics {
7771
assert.Contains(t, body, label)
7872
}
7973

8074
if len(tc.clusters) > 0 {
81-
err = client.Delete(t.Context(), &tc.clusters[0])
75+
err := client.Delete(t.Context(), &tc.clusters[0])
8276
require.NoError(t, err)
8377
}
8478

85-
rr2 := httptest.NewRecorder()
86-
handler.ServeHTTP(rr2, req)
79+
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)
8780

88-
assert.Equal(t, http.StatusOK, rr2.Code)
89-
body2 := rr2.Body.String()
81+
assert.Equal(t, http.StatusOK, statusCode)
9082

9183
assert.NotContains(t, body2, tc.expectedMetrics[0])
9284
for _, label := range tc.expectedMetrics[1:] {
@@ -96,6 +88,65 @@ func TestRayClusterInfo(t *testing.T) {
9688
}
9789
}
9890

91+
// TestDeleteRayClusterMetrics checks that DeleteRayClusterMetrics removes only
// the series whose name and namespace labels both match the arguments. It seeds
// three series, deletes one, and then confirms that empty or swapped arguments
// leave the remaining two series untouched.
func TestDeleteRayClusterMetrics(t *testing.T) {
92+
k8sScheme := runtime.NewScheme()
93+
require.NoError(t, rayv1.AddToScheme(k8sScheme))
94+
client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
95+
manager := NewRayClusterMetricsManager(context.Background(), client)
96+
reg := prometheus.NewRegistry()
97+
reg.MustRegister(manager)
98+
99+
// Test case 1: Delete the metrics of one specific cluster.
100+
// Seed three series directly on the metric vector: two in ns1 and one in ns2.
101+
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster1", "namespace": "ns1"}).Set(10.5)
102+
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster2", "namespace": "ns2"}).Set(20.3)
103+
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster3", "namespace": "ns1"}).Set(5.7)
104+
105+
// Delete the metrics for cluster1 in ns1.
106+
manager.DeleteRayClusterMetrics("cluster1", "ns1")
107+
108+
// Only cluster1/ns1 must be gone; cluster3 shares the namespace but must remain.
109+
body, statusCode := support.GetMetricsResponseAndCode(t, reg)
110+
111+
assert.Equal(t, http.StatusOK, statusCode)
112+
assert.NotContains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
113+
assert.Contains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)
114+
assert.Contains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
115+
116+
// Test case 2: Delete with an empty name and an existing namespace.
117+
manager.DeleteRayClusterMetrics("", "ns1")
118+
119+
// An empty name must not act as a wildcard: cluster3 in ns1 is still exported.
120+
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)
121+
122+
assert.Equal(t, http.StatusOK, statusCode)
123+
assert.NotContains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
124+
assert.Contains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
125+
assert.Contains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)
126+
127+
// Test case 3: Delete with both name and namespace empty.
128+
manager.DeleteRayClusterMetrics("", "")
129+
130+
// Verify that no further metrics were deleted.
131+
body3, statusCode := support.GetMetricsResponseAndCode(t, reg)
132+
133+
assert.Equal(t, http.StatusOK, statusCode)
134+
assert.NotContains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
135+
assert.Contains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
136+
assert.Contains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)
137+
138+
// Test case 4: Delete with name and namespace swapped ("ns2" passed as the
// name, "cluster2" as the namespace), which matches no existing series.
139+
manager.DeleteRayClusterMetrics("ns2", "cluster2")
140+
141+
// Verify that no further metrics were deleted; cluster2/ns2 is still exported.
142+
body4, statusCode := support.GetMetricsResponseAndCode(t, reg)
143+
144+
assert.Equal(t, http.StatusOK, statusCode)
145+
assert.NotContains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
146+
assert.Contains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
147+
assert.Contains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)
148+
}
149+
99150
func TestRayClusterConditionProvisioned(t *testing.T) {
100151
tests := []struct {
101152
name string
@@ -155,28 +206,21 @@ func TestRayClusterConditionProvisioned(t *testing.T) {
155206
reg := prometheus.NewRegistry()
156207
reg.MustRegister(manager)
157208

158-
req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
159-
require.NoError(t, err)
160-
rr := httptest.NewRecorder()
161-
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
162-
handler.ServeHTTP(rr, req)
209+
body, statusCode := support.GetMetricsResponseAndCode(t, reg)
163210

164-
assert.Equal(t, http.StatusOK, rr.Code)
165-
body := rr.Body.String()
211+
assert.Equal(t, http.StatusOK, statusCode)
166212
for _, metric := range tc.expectedMetrics {
167213
assert.Contains(t, body, metric)
168214
}
169215

170216
if len(tc.clusters) > 0 {
171-
err = client.Delete(t.Context(), &tc.clusters[0])
217+
err := client.Delete(context.Background(), &tc.clusters[0])
172218
require.NoError(t, err)
173219
}
174220

175-
rr2 := httptest.NewRecorder()
176-
handler.ServeHTTP(rr2, req)
221+
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)
177222

178-
assert.Equal(t, http.StatusOK, rr2.Code)
179-
body2 := rr2.Body.String()
223+
assert.Equal(t, http.StatusOK, statusCode)
180224

181225
assert.NotContains(t, body2, tc.expectedMetrics[0])
182226
for _, metric := range tc.expectedMetrics[1:] {

ray-operator/controllers/ray/metrics/ray_job_metrics.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ func (r *RayJobMetricsManager) ObserveRayJobExecutionDuration(name, namespace st
8484
r.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, string(jobDeploymentStatus), strconv.Itoa(retryCount)).Set(duration)
8585
}
8686

87+
// DeleteRayJobMetrics removes the metrics that belong to the specified RayJob.
//
// It drops every rayJobExecutionDurationSeconds series whose "name" and
// "namespace" labels equal the given values. Only those two labels are passed
// to the partial match, so series for the same job are removed regardless of
// their other label values. The number of removed series is then logged; the
// log line is emitted unconditionally, even when nothing matched.
88+
func (r *RayJobMetricsManager) DeleteRayJobMetrics(name, namespace string) {
89+
numCleanedUpMetrics := r.rayJobExecutionDurationSeconds.DeletePartialMatch(prometheus.Labels{"name": name, "namespace": namespace})
90+
r.log.Info("Cleaned up expired rayJob metric", "name", name, "namespace", namespace, "numCleanedUpMetrics", numCleanedUpMetrics)
91+
}
92+
8793
func (r *RayJobMetricsManager) collectRayJobInfo(rayJob *rayv1.RayJob, ch chan<- prometheus.Metric) {
8894
ch <- prometheus.MustNewConstMetric(
8995
r.rayJobInfo,

ray-operator/controllers/ray/metrics/ray_job_metrics_test.go

Lines changed: 70 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@ package metrics
33
import (
44
"context"
55
"net/http"
6-
"net/http/httptest"
76
"testing"
87

98
"github.com/prometheus/client_golang/prometheus"
10-
"github.com/prometheus/client_golang/prometheus/promhttp"
119
"github.com/stretchr/testify/assert"
1210
"github.com/stretchr/testify/require"
1311
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -16,6 +14,7 @@ import (
1614
"sigs.k8s.io/controller-runtime/pkg/client/fake"
1715

1816
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
17+
"github.com/ray-project/kuberay/ray-operator/test/support"
1918
)
2019

2120
func TestMetricRayJobInfo(t *testing.T) {
@@ -61,29 +60,21 @@ func TestMetricRayJobInfo(t *testing.T) {
6160
reg := prometheus.NewRegistry()
6261
reg.MustRegister(manager)
6362

64-
req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
65-
require.NoError(t, err)
66-
rr := httptest.NewRecorder()
67-
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
68-
handler.ServeHTTP(rr, req)
63+
body, statusCode := support.GetMetricsResponseAndCode(t, reg)
6964

70-
assert.Equal(t, http.StatusOK, rr.Code)
71-
body := rr.Body.String()
65+
assert.Equal(t, http.StatusOK, statusCode)
7266
for _, label := range tc.expectedMetrics {
7367
assert.Contains(t, body, label)
7468
}
7569

7670
if len(tc.rayJobs) > 0 {
77-
err = client.Delete(t.Context(), &tc.rayJobs[0])
71+
err := client.Delete(t.Context(), &tc.rayJobs[0])
7872
require.NoError(t, err)
7973
}
8074

81-
rr2 := httptest.NewRecorder()
82-
handler.ServeHTTP(rr2, req)
83-
84-
assert.Equal(t, http.StatusOK, rr2.Code)
85-
body2 := rr2.Body.String()
75+
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)
8676

77+
assert.Equal(t, http.StatusOK, statusCode)
8778
assert.NotContains(t, body2, tc.expectedMetrics[0])
8879
for _, label := range tc.expectedMetrics[1:] {
8980
assert.Contains(t, body2, label)
@@ -92,6 +83,65 @@ func TestMetricRayJobInfo(t *testing.T) {
9283
}
9384
}
9485

86+
// TestDeleteRayJobMetrics checks that DeleteRayJobMetrics removes only the
// series whose name and namespace labels both match the arguments. It records
// three execution durations, deletes one job's metrics, and then confirms that
// empty or swapped arguments leave the remaining two series untouched.
func TestDeleteRayJobMetrics(t *testing.T) {
87+
k8sScheme := runtime.NewScheme()
88+
require.NoError(t, rayv1.AddToScheme(k8sScheme))
89+
client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
90+
manager := NewRayJobMetricsManager(context.Background(), client)
91+
reg := prometheus.NewRegistry()
92+
reg.MustRegister(manager)
93+
94+
// Test case 1: Delete the metrics of one specific job.
95+
// Seed three series through the public observe method: two in ns1 and one in ns2.
96+
manager.ObserveRayJobExecutionDuration("job1", "ns1", rayv1.JobDeploymentStatusComplete, 0, 10.5)
97+
manager.ObserveRayJobExecutionDuration("job2", "ns2", rayv1.JobDeploymentStatusFailed, 1, 20.3)
98+
manager.ObserveRayJobExecutionDuration("job3", "ns1", rayv1.JobDeploymentStatusRunning, 0, 5.7)
99+
100+
// Delete the metrics for job1 in ns1.
101+
manager.DeleteRayJobMetrics("job1", "ns1")
102+
103+
// Only job1/ns1 must be gone; job3 shares the namespace but must remain.
104+
body, statusCode := support.GetMetricsResponseAndCode(t, reg)
105+
106+
assert.Equal(t, http.StatusOK, statusCode)
107+
assert.NotContains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
108+
assert.Contains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
109+
assert.Contains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)
110+
111+
// Test case 2: Delete with an empty name and an existing namespace.
112+
manager.DeleteRayJobMetrics("", "ns1")
113+
114+
// An empty name must not act as a wildcard: job3 in ns1 is still exported.
115+
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)
116+
117+
assert.Equal(t, http.StatusOK, statusCode)
118+
assert.NotContains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
119+
assert.Contains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
120+
assert.Contains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)
121+
122+
// Test case 3: Delete with both name and namespace empty.
123+
manager.DeleteRayJobMetrics("", "")
124+
125+
// Verify that no further metrics were deleted.
126+
body3, statusCode := support.GetMetricsResponseAndCode(t, reg)
127+
128+
assert.Equal(t, http.StatusOK, statusCode)
129+
assert.NotContains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
130+
assert.Contains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
131+
assert.Contains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)
132+
133+
// Test case 4: Delete with name and namespace swapped ("ns2" passed as the
// name, "job2" as the namespace), which matches no existing series.
134+
manager.DeleteRayJobMetrics("ns2", "job2")
135+
136+
// Verify that no further metrics were deleted; job2/ns2 is still exported.
137+
body4, statusCode := support.GetMetricsResponseAndCode(t, reg)
138+
139+
assert.Equal(t, http.StatusOK, statusCode)
140+
assert.NotContains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
141+
assert.Contains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
142+
assert.Contains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)
143+
}
144+
95145
func TestMetricRayJobDeploymentStatus(t *testing.T) {
96146
tests := []struct {
97147
name string
@@ -141,28 +191,21 @@ func TestMetricRayJobDeploymentStatus(t *testing.T) {
141191
reg := prometheus.NewRegistry()
142192
reg.MustRegister(manager)
143193

144-
req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
145-
require.NoError(t, err)
146-
rr := httptest.NewRecorder()
147-
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
148-
handler.ServeHTTP(rr, req)
194+
body, statusCode := support.GetMetricsResponseAndCode(t, reg)
149195

150-
assert.Equal(t, http.StatusOK, rr.Code)
151-
body := rr.Body.String()
196+
assert.Equal(t, http.StatusOK, statusCode)
152197
for _, label := range tc.expectedMetrics {
153198
assert.Contains(t, body, label)
154199
}
155200

156201
if len(tc.rayJobs) > 0 {
157-
err = client.Delete(t.Context(), &tc.rayJobs[0])
202+
err := client.Delete(context.Background(), &tc.rayJobs[0])
158203
require.NoError(t, err)
159204
}
160205

161-
rr2 := httptest.NewRecorder()
162-
handler.ServeHTTP(rr2, req)
206+
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)
163207

164-
assert.Equal(t, http.StatusOK, rr2.Code)
165-
body2 := rr2.Body.String()
208+
assert.Equal(t, http.StatusOK, statusCode)
166209

167210
assert.NotContains(t, body2, tc.expectedMetrics[0])
168211
for _, label := range tc.expectedMetrics[1:] {

0 commit comments

Comments
 (0)