Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ray-operator/controllers/ray/metrics/ray_cluster_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func (r *RayClusterMetricsManager) ObserveRayClusterProvisionedDuration(name, na
r.rayClusterProvisionedDurationSeconds.WithLabelValues(name, namespace).Set(duration)
}

// DeleteRayClusterMetrics removes metrics that belong to the specified RayCluster.
func (r *RayClusterMetricsManager) DeleteRayClusterMetrics(name, namespace string) {
numCleanedUpMetrics := r.rayClusterProvisionedDurationSeconds.Delete(prometheus.Labels{"name": name, "namespace": namespace})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @phantom5125, I think we should use your original proposal, DeletePartialMatch, now. Having a comment here doesn't help us switch to DeletePartialMatch in the future, I believe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rueian Thank you for the review! Fixed in 40e1831

r.log.Info("Cleaned up expired RayCluster metric", "name", name, "namespace", namespace, "numCleanedUpMetrics", numCleanedUpMetrics)
}

func (r *RayClusterMetricsManager) collectRayClusterInfo(cluster *rayv1.RayCluster, ch chan<- prometheus.Metric) {
ownerKind := "None"
if v, ok := cluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok {
Expand Down
84 changes: 71 additions & 13 deletions ray-operator/controllers/ray/metrics/ray_cluster_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestRayClusterInfo(t *testing.T) {
Expand Down Expand Up @@ -65,11 +65,7 @@ func TestRayClusterInfo(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
Expand All @@ -78,7 +74,7 @@ func TestRayClusterInfo(t *testing.T) {
}

if len(tc.clusters) > 0 {
err = client.Delete(t.Context(), &tc.clusters[0])
err := client.Delete(t.Context(), &tc.clusters[0])
require.NoError(t, err)
}

Expand All @@ -96,6 +92,72 @@ func TestRayClusterInfo(t *testing.T) {
}
}

// TestDeleteRayClusterMetrics seeds three provisioned-duration series, deletes
// one of them through DeleteRayClusterMetrics, and then checks that further
// deletes with non-matching name/namespace pairs leave the other two in place.
func TestDeleteRayClusterMetrics(t *testing.T) {
	scheme := runtime.NewScheme()
	require.NoError(t, rayv1.AddToScheme(scheme))
	fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
	manager := NewRayClusterMetricsManager(context.Background(), fakeClient)
	registry := prometheus.NewRegistry()
	registry.MustRegister(manager)

	// Series as they appear in the scraped /metrics body.
	const (
		cluster1Series = `kuberay_cluster_provisioned_duration_seconds{name="cluster1",namespace="ns1"}`
		cluster2Series = `kuberay_cluster_provisioned_duration_seconds{name="cluster2",namespace="ns2"}`
		cluster3Series = `kuberay_cluster_provisioned_duration_seconds{name="cluster3",namespace="ns1"}`
	)

	// After the first delete, every scrape must show cluster1 gone and the
	// other two series still exported.
	assertOnlyCluster1Removed := func(body string) {
		assert.NotContains(t, body, cluster1Series)
		assert.Contains(t, body, cluster2Series)
		assert.Contains(t, body, cluster3Series)
	}

	// Populate the gauge directly so the test does not depend on a reconcile loop.
	gauge := manager.rayClusterProvisionedDurationSeconds
	gauge.With(prometheus.Labels{"name": "cluster1", "namespace": "ns1"}).Set(10.5)
	gauge.With(prometheus.Labels{"name": "cluster2", "namespace": "ns2"}).Set(20.3)
	gauge.With(prometheus.Labels{"name": "cluster3", "namespace": "ns1"}).Set(5.7)

	// Case 1: an exact name/namespace pair removes just that cluster's series.
	manager.DeleteRayClusterMetrics("cluster1", "ns1")

	req, firstScrape, handler := support.CreateAndExecuteMetricsRequest(t, registry)
	assert.Equal(t, http.StatusOK, firstScrape.Code)
	assertOnlyCluster1Removed(firstScrape.Body.String())

	// Cases 2-4: deletes that are expected to leave the remaining series untouched.
	nonMatchingDeletes := []struct {
		name      string
		namespace string
	}{
		{name: "", namespace: "ns1"},         // empty name, existing namespace
		{name: "", namespace: ""},            // empty name and namespace
		{name: "ns2", namespace: "cluster2"}, // name and namespace swapped
	}
	for _, del := range nonMatchingDeletes {
		manager.DeleteRayClusterMetrics(del.name, del.namespace)

		// Re-run the same request against the same handler for a fresh scrape.
		scrape := httptest.NewRecorder()
		handler.ServeHTTP(scrape, req)

		assert.Equal(t, http.StatusOK, scrape.Code)
		assertOnlyCluster1Removed(scrape.Body.String())
	}
}

func TestRayClusterConditionProvisioned(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -155,11 +217,7 @@ func TestRayClusterConditionProvisioned(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
Expand All @@ -168,7 +226,7 @@ func TestRayClusterConditionProvisioned(t *testing.T) {
}

if len(tc.clusters) > 0 {
err = client.Delete(t.Context(), &tc.clusters[0])
err := client.Delete(context.Background(), &tc.clusters[0])
require.NoError(t, err)
}

Expand Down
6 changes: 6 additions & 0 deletions ray-operator/controllers/ray/metrics/ray_job_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (r *RayJobMetricsManager) ObserveRayJobExecutionDuration(name, namespace st
r.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, string(jobDeploymentStatus), strconv.Itoa(retryCount)).Set(duration)
}

// DeleteRayJobMetrics removes the execution-duration metrics that belong to the
// specified RayJob. Matching is done on the name and namespace labels only, and
// the number of removed series is logged.
func (r *RayJobMetricsManager) DeleteRayJobMetrics(name, namespace string) {
	target := prometheus.Labels{
		"name":      name,
		"namespace": namespace,
	}
	removed := r.rayJobExecutionDurationSeconds.DeletePartialMatch(target)
	r.log.Info("Cleaned up expired rayJob metric", "name", name, "namespace", namespace, "numCleanedUpMetrics", removed)
}

func (r *RayJobMetricsManager) collectRayJobInfo(rayJob *rayv1.RayJob, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
r.rayJobInfo,
Expand Down
85 changes: 72 additions & 13 deletions ray-operator/controllers/ray/metrics/ray_job_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestMetricRayJobInfo(t *testing.T) {
Expand Down Expand Up @@ -61,11 +61,7 @@ func TestMetricRayJobInfo(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
Expand All @@ -74,7 +70,7 @@ func TestMetricRayJobInfo(t *testing.T) {
}

if len(tc.rayJobs) > 0 {
err = client.Delete(t.Context(), &tc.rayJobs[0])
err := client.Delete(t.Context(), &tc.rayJobs[0])
require.NoError(t, err)
}

Expand All @@ -92,6 +88,73 @@ func TestMetricRayJobInfo(t *testing.T) {
}
}

// TestDeleteRayJobMetrics records three execution-duration series, deletes one
// of them through DeleteRayJobMetrics, and then checks that further deletes
// with non-matching name/namespace pairs leave the other two in place.
func TestDeleteRayJobMetrics(t *testing.T) {
	scheme := runtime.NewScheme()
	require.NoError(t, rayv1.AddToScheme(scheme))
	fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
	manager := NewRayJobMetricsManager(context.Background(), fakeClient)
	registry := prometheus.NewRegistry()
	registry.MustRegister(manager)

	// Series as they appear in the scraped /metrics body.
	const (
		job1Series = `kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="job1",namespace="ns1",retry_count="0"}`
		job2Series = `kuberay_job_execution_duration_seconds{job_deployment_status="Failed",name="job2",namespace="ns2",retry_count="1"}`
		job3Series = `kuberay_job_execution_duration_seconds{job_deployment_status="Running",name="job3",namespace="ns1",retry_count="0"}`
	)

	// After the first delete, every scrape must show job1 gone and the other
	// two series still exported.
	assertOnlyJob1Removed := func(body string) {
		assert.NotContains(t, body, job1Series)
		assert.Contains(t, body, job2Series)
		assert.Contains(t, body, job3Series)
	}

	// Record one series per job through the public observer method.
	manager.ObserveRayJobExecutionDuration("job1", "ns1", rayv1.JobDeploymentStatusComplete, 0, 10.5)
	manager.ObserveRayJobExecutionDuration("job2", "ns2", rayv1.JobDeploymentStatusFailed, 1, 20.3)
	manager.ObserveRayJobExecutionDuration("job3", "ns1", rayv1.JobDeploymentStatusRunning, 0, 5.7)

	// Case 1: an exact name/namespace pair removes just that job's series.
	manager.DeleteRayJobMetrics("job1", "ns1")

	req, firstScrape, handler := support.CreateAndExecuteMetricsRequest(t, registry)
	assert.Equal(t, http.StatusOK, firstScrape.Code)
	assertOnlyJob1Removed(firstScrape.Body.String())

	// Cases 2-4: deletes that are expected to leave the remaining series untouched.
	nonMatchingDeletes := []struct {
		name      string
		namespace string
	}{
		{name: "", namespace: "ns1"},     // empty name, existing namespace
		{name: "", namespace: ""},        // empty name and namespace
		{name: "ns2", namespace: "job2"}, // name and namespace swapped
	}
	for _, del := range nonMatchingDeletes {
		manager.DeleteRayJobMetrics(del.name, del.namespace)

		// Re-run the same request against the same handler for a fresh scrape.
		scrape := httptest.NewRecorder()
		handler.ServeHTTP(scrape, req)

		assert.Equal(t, http.StatusOK, scrape.Code)
		assertOnlyJob1Removed(scrape.Body.String())
	}
}

func TestMetricRayJobDeploymentStatus(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -141,11 +204,7 @@ func TestMetricRayJobDeploymentStatus(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
Expand All @@ -154,7 +213,7 @@ func TestMetricRayJobDeploymentStatus(t *testing.T) {
}

if len(tc.rayJobs) > 0 {
err = client.Delete(t.Context(), &tc.rayJobs[0])
err := client.Delete(context.Background(), &tc.rayJobs[0])
require.NoError(t, err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestRayServiceInfo(t *testing.T) {
Expand Down Expand Up @@ -60,11 +60,7 @@ func TestRayServiceInfo(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
Expand Down Expand Up @@ -186,11 +182,7 @@ func TestRayServiceCondition(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(manager)

req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/metrics", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
handler.ServeHTTP(rr, req)
req, rr, handler := support.CreateAndExecuteMetricsRequest(t, reg)

assert.Equal(t, http.StatusOK, rr.Code)
body := rr.Body.String()
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Reque
if errors.IsNotFound(err) {
// Clear all related expectations
r.rayClusterScaleExpectation.Delete(instance.Name, instance.Namespace)
cleanUpRayClusterMetrics(r.options.RayClusterMetricsManager, request.Name, request.Namespace)
} else {
logger.Error(err, "Read request instance error!")
}
Expand Down Expand Up @@ -1571,6 +1572,13 @@ func emitRayClusterProvisionedDuration(RayClusterMetricsObserver metrics.RayClus
}
}

// cleanUpRayClusterMetrics deletes the metrics recorded for the RayCluster
// identified by clusterName and namespace. It does nothing when
// rayClusterMetricsManager is nil.
func cleanUpRayClusterMetrics(rayClusterMetricsManager *metrics.RayClusterMetricsManager, clusterName, namespace string) {
	if rayClusterMetricsManager != nil {
		rayClusterMetricsManager.DeleteRayClusterMetrics(clusterName, namespace)
	}
}

// sumGPUs sums the GPUs in the given resource list.
func sumGPUs(resources map[corev1.ResourceName]resource.Quantity) resource.Quantity {
totalGPUs := resource.Quantity{}
Expand Down
10 changes: 9 additions & 1 deletion ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if err := r.Get(ctx, request.NamespacedName, rayJobInstance); err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request. Stop reconciliation.
logger.Info("RayJob resource not found. Ignoring since object must be deleted")
logger.Info("RayJob resource not found.")
cleanUpRayJobMetrics(r.options.RayJobMetricsManager, request.Name, request.Namespace)
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
Expand Down Expand Up @@ -492,6 +493,13 @@ func emitRayJobExecutionDuration(rayJobMetricsObserver metrics.RayJobMetricsObse
}
}

// cleanUpRayJobMetrics deletes the metrics recorded for the RayJob identified
// by rayJobName and rayJobNamespace. It does nothing when rayJobMetricsManager
// is nil.
func cleanUpRayJobMetrics(rayJobMetricsManager *metrics.RayJobMetricsManager, rayJobName, rayJobNamespace string) {
	if rayJobMetricsManager != nil {
		rayJobMetricsManager.DeleteRayJobMetrics(rayJobName, rayJobNamespace)
	}
}

// checkBackoffLimitAndUpdateStatusIfNeeded determines if a RayJob is eligible for retry based on the configured backoff limit,
// the job's success status, and its failure status. If eligible, sets the JobDeploymentStatus to Retrying.
func checkBackoffLimitAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) {
Expand Down
Loading