Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ray-operator/controllers/ray/metrics/ray_cluster_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ func (r *RayClusterMetricsManager) ObserveRayClusterProvisionedDuration(name, na
r.rayClusterProvisionedDurationSeconds.WithLabelValues(name, namespace).Set(duration)
}

// DeleteRayClusterMetrics removes metrics that belongs to the specified RayCluster.
// NOTE: Uses Delete() as metric has only "name" and "namespace" labels.
// If more labels are added, switch to DeletePartialMatch(), otherwise it may not clean up all metrics correctly.
func (r *RayClusterMetricsManager) DeleteRayClusterMetrics(name, namespace string) {
numCleanedUpMetrics := r.rayClusterProvisionedDurationSeconds.Delete(prometheus.Labels{"name": name, "namespace": namespace})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @phantom5125, I think we should use your original proposal, DeletePartialMatch, now. Having a comment here doesn't help us switch to DeletePartialMatch in the future, I believe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rueian Thank you for the review! Fixed in 40e1831

r.log.Info("Cleaned up expired RayCluster metric", "name", name, "namespace", namespace, "numCleanedUpMetrics", numCleanedUpMetrics)
}

func (r *RayClusterMetricsManager) collectRayClusterInfo(cluster *rayv1.RayCluster, ch chan<- prometheus.Metric) {
ownerKind := "None"
if v, ok := cluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok {
Expand Down
96 changes: 70 additions & 26 deletions ray-operator/controllers/ray/metrics/ray_cluster_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package metrics
import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestRayClusterInfo(t *testing.T) {
Expand Down Expand Up @@ -65,28 +64,21 @@ func TestRayClusterInfo(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
body, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
assert.Equal(t, http.StatusOK, statusCode)
for _, label := range tc.expectedMetrics {
assert.Contains(t, body, label)
}

if len(tc.clusters) > 0 {
err = client.Delete(t.Context(), &tc.clusters[0])
err := client.Delete(t.Context(), &tc.clusters[0])
require.NoError(t, err)
}

rr2 := httptest.NewRecorder()
handler.ServeHTTP(rr2, req)
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, rr2.Code)
body2 := rr2.Body.String()
assert.Equal(t, http.StatusOK, statusCode)

assert.NotContains(t, body2, tc.expectedMetrics[0])
for _, label := range tc.expectedMetrics[1:] {
Expand All @@ -96,6 +88,65 @@ func TestRayClusterInfo(t *testing.T) {
}
}

func TestDeleteRayClusterMetrics(t *testing.T) {
k8sScheme := runtime.NewScheme()
require.NoError(t, rayv1.AddToScheme(k8sScheme))
client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
manager := NewRayClusterMetricsManager(context.Background(), client)
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

// Test case 1: Delete specific cluster metrics
// Manually add some metrics
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster1", "namespace": "ns1"}).Set(10.5)
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster2", "namespace": "ns2"}).Set(20.3)
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster3", "namespace": "ns1"}).Set(5.7)

// Test deleting metrics for cluster1 in ns1
manager.DeleteRayClusterMetrics("cluster1", "ns1")

// Verify metrics
body, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
assert.Contains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)
assert.Contains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)

// Test case 2: Delete with empty name
manager.DeleteRayClusterMetrics("", "ns1")

// Verify metrics again
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
assert.Contains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
assert.Contains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)

// Test case 3: Delete with empty name and namespace
manager.DeleteRayClusterMetrics("", "")

// Verify no metrics were deleted
body3, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
assert.Contains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
assert.Contains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)

// Test case 4: Delete with false name and namespace
manager.DeleteRayClusterMetrics("ns2", "cluster2")

// Verify no metrics were deleted
body4, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
assert.Contains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
assert.Contains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)
}

func TestRayClusterConditionProvisioned(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -155,28 +206,21 @@ func TestRayClusterConditionProvisioned(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
body, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
assert.Equal(t, http.StatusOK, statusCode)
for _, metric := range tc.expectedMetrics {
assert.Contains(t, body, metric)
}

if len(tc.clusters) > 0 {
err = client.Delete(t.Context(), &tc.clusters[0])
err := client.Delete(context.Background(), &tc.clusters[0])
require.NoError(t, err)
}

rr2 := httptest.NewRecorder()
handler.ServeHTTP(rr2, req)
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, rr2.Code)
body2 := rr2.Body.String()
assert.Equal(t, http.StatusOK, statusCode)

assert.NotContains(t, body2, tc.expectedMetrics[0])
for _, metric := range tc.expectedMetrics[1:] {
Expand Down
6 changes: 6 additions & 0 deletions ray-operator/controllers/ray/metrics/ray_job_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (r *RayJobMetricsManager) ObserveRayJobExecutionDuration(name, namespace st
r.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, string(jobDeploymentStatus), strconv.Itoa(retryCount)).Set(duration)
}

// DeleteRayJobMetrics removes metrics that belongs to the specified RayJob.
func (r *RayJobMetricsManager) DeleteRayJobMetrics(name, namespace string) {
numCleanedUpMetrics := r.rayJobExecutionDurationSeconds.DeletePartialMatch(prometheus.Labels{"name": name, "namespace": namespace})
r.log.Info("Cleaned up expired rayJob metric", "name", name, "namespace", namespace, "numCleanedUpMetrics", numCleanedUpMetrics)
}

func (r *RayJobMetricsManager) collectRayJobInfo(rayJob *rayv1.RayJob, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
r.rayJobInfo,
Expand Down
98 changes: 71 additions & 27 deletions ray-operator/controllers/ray/metrics/ray_job_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package metrics
import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestMetricRayJobInfo(t *testing.T) {
Expand Down Expand Up @@ -61,29 +60,21 @@ func TestMetricRayJobInfo(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
body, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
assert.Equal(t, http.StatusOK, statusCode)
for _, label := range tc.expectedMetrics {
assert.Contains(t, body, label)
}

if len(tc.rayJobs) > 0 {
err = client.Delete(t.Context(), &tc.rayJobs[0])
err := client.Delete(t.Context(), &tc.rayJobs[0])
require.NoError(t, err)
}

rr2 := httptest.NewRecorder()
handler.ServeHTTP(rr2, req)

assert.Equal(t, http.StatusOK, rr2.Code)
body2 := rr2.Body.String()
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body2, tc.expectedMetrics[0])
for _, label := range tc.expectedMetrics[1:] {
assert.Contains(t, body2, label)
Expand All @@ -92,6 +83,66 @@ func TestMetricRayJobInfo(t *testing.T) {
}
}

func TestDeleteRayJobMetrics(t *testing.T) {
k8sScheme := runtime.NewScheme()
require.NoError(t, rayv1.AddToScheme(k8sScheme))
client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
manager := NewRayJobMetricsManager(context.Background(), client)
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

// Test case 1: Delete specific job metrics
// Manually add some metrics
manager.ObserveRayJobExecutionDuration("job1", "ns1", rayv1.JobDeploymentStatusComplete, 0, 10.5)
manager.ObserveRayJobExecutionDuration("job2", "ns2", rayv1.JobDeploymentStatusFailed, 1, 20.3)
manager.ObserveRayJobExecutionDuration("job3", "ns1", rayv1.JobDeploymentStatusRunning, 0, 5.7)

// Test deleting metrics for job1 in ns1
manager.DeleteRayJobMetrics("job1", "ns1")

// Verify metrics
body, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
assert.Contains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
assert.Contains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)

// Test case 2: Delete with empty name
manager.DeleteRayJobMetrics("", "ns1")

// Verify metrics again
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
assert.Contains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
assert.Contains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)

// Test case 3: Delete with empty name and namespace
manager.DeleteRayJobMetrics("", "")

// Verify no metrics were deleted
body3, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
assert.Contains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
assert.Contains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)

// Test case 4: Delete with false name and namespace
manager.DeleteRayJobMetrics("ns2", "job2")

// Verify no metrics were deleted
body4, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, statusCode)
assert.NotContains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
assert.Contains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
assert.Contains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)

}

func TestMetricRayJobDeploymentStatus(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -141,28 +192,21 @@ func TestMetricRayJobDeploymentStatus(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
body, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
assert.Equal(t, http.StatusOK, statusCode)
for _, label := range tc.expectedMetrics {
assert.Contains(t, body, label)
}

if len(tc.rayJobs) > 0 {
err = client.Delete(t.Context(), &tc.rayJobs[0])
err := client.Delete(context.Background(), &tc.rayJobs[0])
require.NoError(t, err)
}

rr2 := httptest.NewRecorder()
handler.ServeHTTP(rr2, req)
body2, statusCode := support.GetMetricsResponseAndCode(t, reg)

assert.Equal(t, http.StatusOK, rr2.Code)
body2 := rr2.Body.String()
assert.Equal(t, http.StatusOK, statusCode)

assert.NotContains(t, body2, tc.expectedMetrics[0])
for _, label := range tc.expectedMetrics[1:] {
Expand Down
Loading
Loading