[Feature] Add cleanup for terminated RayJob/RayCluster metrics #3923

Open · wants to merge 6 commits into master
6 changes: 6 additions & 0 deletions ray-operator/controllers/ray/metrics/ray_cluster_metrics.go
@@ -89,6 +89,12 @@ func (r *RayClusterMetricsManager) ObserveRayClusterProvisionedDuration(name, na
r.rayClusterProvisionedDurationSeconds.WithLabelValues(name, namespace).Set(duration)
}

// DeleteRayClusterMetrics removes metrics that belong to the specified RayCluster.
func (r *RayClusterMetricsManager) DeleteRayClusterMetrics(name, namespace string) {
numCleanedUpMetrics := r.rayClusterProvisionedDurationSeconds.Delete(prometheus.Labels{"name": name, "namespace": namespace})
r.log.Info("Cleaned up expired RayCluster metric", "name", name, "namespace", namespace, "numCleanedUpMetrics", numCleanedUpMetrics)
}

func (r *RayClusterMetricsManager) collectRayClusterInfo(cluster *rayv1.RayCluster, ch chan<- prometheus.Metric) {
ownerKind := "None"
if v, ok := cluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok {
84 changes: 71 additions & 13 deletions ray-operator/controllers/ray/metrics/ray_cluster_metrics_test.go
@@ -7,7 +7,6 @@ import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -16,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestRayClusterInfo(t *testing.T) {
@@ -65,11 +65,7 @@ func TestRayClusterInfo(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
@@ -78,7 +74,7 @@ }
}

if len(tc.clusters) > 0 {
err = client.Delete(t.Context(), &tc.clusters[0])
err := client.Delete(t.Context(), &tc.clusters[0])
require.NoError(t, err)
}

@@ -96,6 +92,72 @@ }
}
}

func TestDeleteRayClusterMetrics(t *testing.T) {
k8sScheme := runtime.NewScheme()
require.NoError(t, rayv1.AddToScheme(k8sScheme))
client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
manager := NewRayClusterMetricsManager(context.Background(), client)
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

// Test case 1: Delete specific cluster metrics
// Manually add some metrics
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster1", "namespace": "ns1"}).Set(10.5)
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster2", "namespace": "ns2"}).Set(20.3)
manager.rayClusterProvisionedDurationSeconds.With(prometheus.Labels{"name": "cluster3", "namespace": "ns1"}).Set(5.7)

// Test deleting metrics for cluster1 in ns1
manager.DeleteRayClusterMetrics("cluster1", "ns1")

// Verify metrics
req, recorder, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, recorder.Code)
body := recorder.Body.String()
assert.NotContains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
assert.Contains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)
assert.Contains(t, body, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)

// Test case 2: Delete with empty name
manager.DeleteRayClusterMetrics("", "ns1")

// Verify metrics again
recorder2 := httptest.NewRecorder()
handler.ServeHTTP(recorder2, req)

assert.Equal(t, http.StatusOK, recorder2.Code)
body2 := recorder2.Body.String()
assert.NotContains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
assert.Contains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
assert.Contains(t, body2, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)

// Test case 3: Delete with empty name and namespace
manager.DeleteRayClusterMetrics("", "")

// Verify no metrics were deleted
recorder3 := httptest.NewRecorder()
handler.ServeHTTP(recorder3, req)

assert.Equal(t, http.StatusOK, recorder3.Code)
body3 := recorder3.Body.String()
assert.NotContains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
assert.Contains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
assert.Contains(t, body3, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)

// Test case 4: Delete with swapped name and namespace (should not match any series)
manager.DeleteRayClusterMetrics("ns2", "cluster2")

// Verify no metrics were deleted
recorder4 := httptest.NewRecorder()
handler.ServeHTTP(recorder4, req)

assert.Equal(t, http.StatusOK, recorder4.Code)
body4 := recorder4.Body.String()
assert.NotContains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`)
assert.Contains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`)
assert.Contains(t, body4, `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`)
}

func TestRayClusterConditionProvisioned(t *testing.T) {
tests := []struct {
name string
@@ -155,11 +217,7 @@ func TestRayClusterConditionProvisioned(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
@@ -168,7 +226,7 @@ }
}

if len(tc.clusters) > 0 {
err = client.Delete(t.Context(), &tc.clusters[0])
err := client.Delete(context.Background(), &tc.clusters[0])
require.NoError(t, err)
}

6 changes: 6 additions & 0 deletions ray-operator/controllers/ray/metrics/ray_job_metrics.go
@@ -84,6 +84,12 @@ func (r *RayJobMetricsManager) ObserveRayJobExecutionDuration(name, namespace st
r.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, string(jobDeploymentStatus), strconv.Itoa(retryCount)).Set(duration)
}

// DeleteRayJobMetrics removes metrics that belong to the specified RayJob.
func (r *RayJobMetricsManager) DeleteRayJobMetrics(name, namespace string) {
numCleanedUpMetrics := r.rayJobExecutionDurationSeconds.DeletePartialMatch(prometheus.Labels{"name": name, "namespace": namespace})
r.log.Info("Cleaned up expired rayJob metric", "name", name, "namespace", namespace, "numCleanedUpMetrics", numCleanedUpMetrics)
}

func (r *RayJobMetricsManager) collectRayJobInfo(rayJob *rayv1.RayJob, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
r.rayJobInfo,
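Note on the two cleanup methods above: the RayCluster manager calls GaugeVec.Delete, which removes only the series whose label set matches exactly and returns a bool, while the RayJob manager calls DeletePartialMatch, which removes every series containing the given label pairs (all job_deployment_status and retry_count combinations for the job) and returns the number of series removed. A minimal, self-contained sketch of the difference, not part of this PR:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A GaugeVec shaped like kuberay_job_execution_duration_seconds.
	g := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "example_duration_seconds"},
		[]string{"name", "namespace", "job_deployment_status"},
	)
	g.WithLabelValues("job1", "ns1", "Complete").Set(10.5)
	g.WithLabelValues("job1", "ns1", "Failed").Set(20.3)
	g.WithLabelValues("job2", "ns1", "Running").Set(5.7)

	// Delete requires an exact label match, so it removes at most one series
	// and reports whether anything was removed.
	ok := g.Delete(prometheus.Labels{"name": "job2", "namespace": "ns1", "job_deployment_status": "Running"})
	fmt.Println(ok) // true

	// DeletePartialMatch removes every series whose labels contain the given
	// pairs, regardless of other labels, and reports how many were removed.
	n := g.DeletePartialMatch(prometheus.Labels{"name": "job1", "namespace": "ns1"})
	fmt.Println(n) // 2
}
```

Because a RayJob can emit several series per job (one per status and retry combination), DeletePartialMatch is the natural fit there, whereas the provisioned-duration gauge has exactly one series per cluster, so an exact Delete suffices.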
85 changes: 72 additions & 13 deletions ray-operator/controllers/ray/metrics/ray_job_metrics_test.go
@@ -7,7 +7,6 @@ import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -16,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestMetricRayJobInfo(t *testing.T) {
@@ -61,11 +61,7 @@ func TestMetricRayJobInfo(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
@@ -74,7 +70,7 @@ }
}

if len(tc.rayJobs) > 0 {
err = client.Delete(t.Context(), &tc.rayJobs[0])
err := client.Delete(t.Context(), &tc.rayJobs[0])
require.NoError(t, err)
}

@@ -92,6 +88,73 @@ }
}
}

func TestDeleteRayJobMetrics(t *testing.T) {
k8sScheme := runtime.NewScheme()
require.NoError(t, rayv1.AddToScheme(k8sScheme))
client := fake.NewClientBuilder().WithScheme(k8sScheme).Build()
manager := NewRayJobMetricsManager(context.Background(), client)
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

// Test case 1: Delete specific job metrics
// Manually add some metrics
manager.ObserveRayJobExecutionDuration("job1", "ns1", rayv1.JobDeploymentStatusComplete, 0, 10.5)
manager.ObserveRayJobExecutionDuration("job2", "ns2", rayv1.JobDeploymentStatusFailed, 1, 20.3)
manager.ObserveRayJobExecutionDuration("job3", "ns1", rayv1.JobDeploymentStatusRunning, 0, 5.7)

// Test deleting metrics for job1 in ns1
manager.DeleteRayJobMetrics("job1", "ns1")

// Verify metrics
req, recorder, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, recorder.Code)
body := recorder.Body.String()
assert.NotContains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
assert.Contains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
assert.Contains(t, body, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)

// Test case 2: Delete with empty name
manager.DeleteRayJobMetrics("", "ns1")

// Verify metrics again
recorder2 := httptest.NewRecorder()
handler.ServeHTTP(recorder2, req)

assert.Equal(t, http.StatusOK, recorder2.Code)
body2 := recorder2.Body.String()
assert.NotContains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
assert.Contains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
assert.Contains(t, body2, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)

// Test case 3: Delete with empty name and namespace
manager.DeleteRayJobMetrics("", "")

// Verify no metrics were deleted
recorder3 := httptest.NewRecorder()
handler.ServeHTTP(recorder3, req)

assert.Equal(t, http.StatusOK, recorder3.Code)
body3 := recorder3.Body.String()
assert.NotContains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
assert.Contains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
assert.Contains(t, body3, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)

// Test case 4: Delete with swapped name and namespace (should not match any series)
manager.DeleteRayJobMetrics("ns2", "job2")

// Verify no metrics were deleted
recorder4 := httptest.NewRecorder()
handler.ServeHTTP(recorder4, req)

assert.Equal(t, http.StatusOK, recorder4.Code)
body4 := recorder4.Body.String()
assert.NotContains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`)
assert.Contains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`)
assert.Contains(t, body4, `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`)
}

func TestMetricRayJobDeploymentStatus(t *testing.T) {
tests := []struct {
name string
@@ -141,11 +204,7 @@ func TestMetricRayJobDeploymentStatus(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
@@ -154,7 +213,7 @@ }
}

if len(tc.rayJobs) > 0 {
err = client.Delete(t.Context(), &tc.rayJobs[0])
err := client.Delete(context.Background(), &tc.rayJobs[0])
require.NoError(t, err)
}

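As a side note, the assertions in the tests above verify deletion by scraping the registry through an httptest recorder; an equivalent check can be made directly against the collector with prometheus/testutil. A small sketch of that approach, with a made-up helper name that is not part of this PR:

```go
package metrics

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
)

// assertJobSeriesCount is a hypothetical helper, not part of this PR.
// CollectAndCount reports how many series the collector currently exposes
// under the given metric name, without going through an HTTP handler.
func assertJobSeriesCount(t *testing.T, manager *RayJobMetricsManager, want int) {
	got := testutil.CollectAndCount(manager, "kuberay_job_execution_duration_seconds")
	assert.Equal(t, want, got)
}
```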
14 changes: 3 additions & 11 deletions ray-operator/controllers/ray/metrics/ray_service_metrics_test.go
@@ -7,7 +7,6 @@ import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -16,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestRayServiceInfo(t *testing.T) {
@@ -60,11 +60,7 @@ func TestRayServiceInfo(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
@@ -186,11 +182,7 @@ func TestRayServiceCondition(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
8 changes: 8 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -135,6 +135,7 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Reque
if errors.IsNotFound(err) {
// Clear all related expectations
r.rayClusterScaleExpectation.Delete(instance.Name, instance.Namespace)
cleanUpRayClusterMetrics(r.options.RayClusterMetricsManager, request.Name, request.Namespace)
} else {
logger.Error(err, "Read request instance error!")
}
@@ -1571,6 +1572,13 @@ func emitRayClusterProvisionedDuration(RayClusterMetricsObserver metrics.RayClus
}
}

func cleanUpRayClusterMetrics(rayClusterMetricsManager *metrics.RayClusterMetricsManager, clusterName, namespace string) {
if rayClusterMetricsManager == nil {
return
}
rayClusterMetricsManager.DeleteRayClusterMetrics(clusterName, namespace)
}

// sumGPUs sums the GPUs in the given resource list.
func sumGPUs(resources map[corev1.ResourceName]resource.Quantity) resource.Quantity {
totalGPUs := resource.Quantity{}
10 changes: 9 additions & 1 deletion ray-operator/controllers/ray/rayjob_controller.go
@@ -91,7 +91,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if err := r.Get(ctx, request.NamespacedName, rayJobInstance); err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request. Stop reconciliation.
logger.Info("RayJob resource not found. Ignoring since object must be deleted")
logger.Info("RayJob resource not found.")
cleanUpRayJobMetrics(r.options.RayJobMetricsManager, request.Name, request.Namespace)
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
@@ -492,6 +493,13 @@ func emitRayJobExecutionDuration(rayJobMetricsObserver metrics.RayJobMetricsObse
}
}

func cleanUpRayJobMetrics(rayJobMetricsManager *metrics.RayJobMetricsManager, rayJobName, rayJobNamespace string) {
if rayJobMetricsManager == nil {
return
}
rayJobMetricsManager.DeleteRayJobMetrics(rayJobName, rayJobNamespace)
}

// checkBackoffLimitAndUpdateStatusIfNeeded determines if a RayJob is eligible for retry based on the configured backoff limit,
// the job's success status, and its failure status. If eligible, sets the JobDeploymentStatus to Retrying.
func checkBackoffLimitAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) {