Skip to content

Commit b61b28d

Browse files
committed
Add metrics TTL config in main() & fix the trigger point for scheduling metrics cleanup
1 parent 8ac1cc8 commit b61b28d

File tree

5 files changed

+17
-19
lines changed

5 files changed

+17
-19
lines changed

ray-operator/controllers/ray/metrics/ray_cluster_metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type RayClusterMetricsManager struct {
4343
}
4444

4545
// NewRayClusterMetricsManager creates a new RayClusterMetricsManager instance.
46-
func NewRayClusterMetricsManager(ctx context.Context, client client.Client) *RayClusterMetricsManager {
46+
func NewRayClusterMetricsManager(ctx context.Context, client client.Client, metricsTTLSeconds int) *RayClusterMetricsManager {
4747
manager := &RayClusterMetricsManager{
4848
rayClusterProvisionedDurationSeconds: prometheus.NewGaugeVec(
4949
prometheus.GaugeOpts{
@@ -73,7 +73,7 @@ func NewRayClusterMetricsManager(ctx context.Context, client client.Client) *Ray
7373
client: client,
7474
log: ctrl.LoggerFrom(ctx),
7575
cleanupQueue: make([]RayClusterMetricCleanupItem, 0),
76-
metricTTL: 5 * time.Minute, // Keep metrics for 5 minutes
76+
metricTTL: time.Duration(metricsTTLSeconds) * time.Second,
7777
}
7878

7979
// Start the cleanup goroutine

ray-operator/controllers/ray/metrics/ray_job_metrics.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type RayJobMetricsObserver interface {
2020
}
2121

2222
// RayJobMetricCleanupItem represents an item in the RayJob metric cleanup queue
23-
type RayJobMetricCleanupItem struct {
23+
type RayJobMetricCleanupItem struct {
2424
Name string
2525
Namespace string
2626
DeleteAt time.Time
@@ -41,7 +41,7 @@ type RayJobMetricsManager struct {
4141
}
4242

4343
// NewRayJobMetricsManager creates a new RayJobMetricsManager instance.
44-
func NewRayJobMetricsManager(ctx context.Context, client client.Client) *RayJobMetricsManager {
44+
func NewRayJobMetricsManager(ctx context.Context, client client.Client, metricsTTLSeconds int) *RayJobMetricsManager {
4545
manager := &RayJobMetricsManager{
4646
rayJobExecutionDurationSeconds: prometheus.NewGaugeVec(
4747
prometheus.GaugeOpts{
@@ -67,9 +67,9 @@ func NewRayJobMetricsManager(ctx context.Context, client client.Client) *RayJobM
6767
client: client,
6868
log: ctrl.LoggerFrom(ctx),
6969
cleanupQueue: make([]RayJobMetricCleanupItem, 0),
70-
metricTTL: 5 * time.Minute, // Keep metrics for 5 minutes
70+
metricTTL: time.Duration(metricsTTLSeconds) * time.Second,
7171
}
72-
72+
7373
// Start the cleanup goroutine
7474
go manager.startRayJobCleanupLoop(ctx)
7575
return manager

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Reque
135135
if errors.IsNotFound(err) {
136136
// Clear all related expectations
137137
r.rayClusterScaleExpectation.Delete(instance.Name, instance.Namespace)
138+
if r.options.RayClusterMetricsManager != nil {
139+
r.options.RayClusterMetricsManager.ScheduleRayClusterMetricForCleanup(instance.Name, instance.Namespace)
140+
}
138141
} else {
139142
logger.Error(err, "Read request instance error!")
140143
}
@@ -195,13 +198,6 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance
195198
// Please do NOT modify `originalRayClusterInstance` in the following code.
196199
originalRayClusterInstance := instance.DeepCopy()
197200

198-
if instance.DeletionTimestamp != nil && !instance.DeletionTimestamp.IsZero() {
199-
logger.Info("RayCluster is being deleted, scheduling metric for cleanup")
200-
if r.options.RayClusterMetricsManager != nil {
201-
r.options.RayClusterMetricsManager.ScheduleRayClusterMetricForCleanup(instance.Name, instance.Namespace)
202-
}
203-
}
204-
205201
// The `enableGCSFTRedisCleanup` is a feature flag introduced in KubeRay v1.0.0. It determines whether
206202
// the Redis cleanup job should be activated. Users can disable the feature by setting the environment
207203
// variable `ENABLE_GCS_FT_REDIS_CLEANUP` to `false`, and undertake the Redis storage namespace cleanup

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
9292
if errors.IsNotFound(err) {
9393
// Request object not found, could have been deleted after reconcile request. Stop reconciliation.
9494
logger.Info("RayJob resource not found. Ignoring since object must be deleted")
95+
// Schedule metrics cleanup since the RayJob resource has been deleted
96+
if r.options.RayJobMetricsManager != nil {
97+
r.options.RayJobMetricsManager.ScheduleRayJobMetricForCleanup(rayJobInstance.Name, rayJobInstance.Namespace)
98+
}
9599
return ctrl.Result{}, nil
96100
}
97101
// Error reading the object - requeue the request.
@@ -465,10 +469,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
465469
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
466470
}
467471
emitRayJobMetrics(r.options.RayJobMetricsManager, rayJobInstance.Name, rayJobInstance.Namespace, originalRayJobInstance.Status, rayJobInstance.Status)
468-
// Schedule metrics cleanup when job reaches terminal status
469-
if !rayv1.IsJobDeploymentTerminal(originalRayJobInstance.Status.JobDeploymentStatus) && rayv1.IsJobDeploymentTerminal(rayJobInstance.Status.JobDeploymentStatus) {
470-
r.options.RayJobMetricsManager.ScheduleRayJobMetricForCleanup(rayJobInstance.Name, rayJobInstance.Namespace)
471-
}
472472
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
473473
}
474474

ray-operator/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func main() {
6969
var enableBatchScheduler bool
7070
var batchScheduler string
7171
var enableMetrics bool
72+
var metricsTTLSeconds int
7273

7374
// TODO: remove flag-based config once Configuration API graduates to v1.
7475
flag.StringVar(&metricsAddr, "metrics-addr", configapi.DefaultMetricsAddr, "The address the metric endpoint binds to.")
@@ -100,6 +101,7 @@ func main() {
100101
"Use Kubernetes proxy subresource when connecting to the Ray Head node.")
101102
flag.StringVar(&featureGates, "feature-gates", "", "A set of key=value pairs that describe feature gates. E.g. FeatureOne=true,FeatureTwo=false,...")
102103
flag.BoolVar(&enableMetrics, "enable-metrics", false, "Enable the emission of control plane metrics.")
104+
flag.IntVar(&metricsTTLSeconds, "metrics-ttl-seconds", 300, "The time to live for metrics cleanup in seconds. Default is 300 seconds (5 minutes).")
103105

104106
opts := k8szap.Options{
105107
TimeEncoder: zapcore.ISO8601TimeEncoder,
@@ -237,8 +239,8 @@ func main() {
237239
var rayServiceMetricsManager *metrics.RayServiceMetricsManager
238240
if config.EnableMetrics {
239241
mgrClient := mgr.GetClient()
240-
rayClusterMetricsManager = metrics.NewRayClusterMetricsManager(ctx, mgrClient)
241-
rayJobMetricsManager = metrics.NewRayJobMetricsManager(ctx, mgrClient)
242+
rayClusterMetricsManager = metrics.NewRayClusterMetricsManager(ctx, mgrClient, metricsTTLSeconds)
243+
rayJobMetricsManager = metrics.NewRayJobMetricsManager(ctx, mgrClient, metricsTTLSeconds)
242244
rayServiceMetricsManager = metrics.NewRayServiceMetricsManager(ctx, mgrClient)
243245
ctrlmetrics.Registry.MustRegister(
244246
rayClusterMetricsManager,

0 commit comments

Comments
 (0)