Skip to content

Commit 52e027a

Browse files
owenowenismewin5923
authored andcommitted
Move BatchSchedulerManager into reconciler option (ray-project#3935)
Signed-off-by: You-Cheng Lin (Owen) <[email protected]> Signed-off-by: Owen Lin (You-Cheng Lin) <[email protected]> Co-authored-by: Jun-Hao Wan <[email protected]>
1 parent 775b111 commit 52e027a

File tree

3 files changed

+20
-26
lines changed

3 files changed

+20
-26
lines changed

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"sigs.k8s.io/controller-runtime/pkg/predicate"
3434
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3535

36-
configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
3736
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
3837
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
3938
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
@@ -53,29 +52,18 @@ var (
5352
)
5453

5554
// NewReconciler returns a new reconcile.Reconciler
56-
func NewReconciler(ctx context.Context, mgr manager.Manager, options RayClusterReconcilerOptions, rayConfigs configapi.Configuration) *RayClusterReconciler {
55+
func NewReconciler(ctx context.Context, mgr manager.Manager, options RayClusterReconcilerOptions) *RayClusterReconciler {
5756
if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, podUIDIndexField, func(rawObj client.Object) []string {
5857
pod := rawObj.(*corev1.Pod)
5958
return []string{string(pod.UID)}
6059
}); err != nil {
6160
panic(err)
6261
}
6362

64-
// init the batch scheduler manager
65-
schedulerMgr, err := batchscheduler.NewSchedulerManager(ctx, rayConfigs, mgr.GetConfig(), mgr.GetClient())
66-
if err != nil {
67-
// fail fast if the scheduler plugin fails to init
68-
// prevent running the controller in an undefined state
69-
panic(err)
70-
}
71-
72-
// add schema to runtime
73-
schedulerMgr.AddToScheme(mgr.GetScheme())
7463
return &RayClusterReconciler{
7564
Client: mgr.GetClient(),
7665
Scheme: mgr.GetScheme(),
7766
Recorder: mgr.GetEventRecorderFor("raycluster-controller"),
78-
BatchSchedulerMgr: schedulerMgr,
7967
rayClusterScaleExpectation: expectations.NewRayClusterScaleExpectation(mgr.GetClient()),
8068
options: options,
8169
}
@@ -86,13 +74,13 @@ type RayClusterReconciler struct {
8674
client.Client
8775
Scheme *k8sruntime.Scheme
8876
Recorder record.EventRecorder
89-
BatchSchedulerMgr *batchscheduler.SchedulerManager
9077
rayClusterScaleExpectation expectations.RayClusterScaleExpectation
9178
options RayClusterReconcilerOptions
9279
}
9380

9481
type RayClusterReconcilerOptions struct {
9582
RayClusterMetricsManager *metrics.RayClusterMetricsManager
83+
BatchSchedulerManager *batchscheduler.SchedulerManager
9684
HeadSidecarContainers []corev1.Container
9785
WorkerSidecarContainers []corev1.Container
9886
IsOpenShift bool
@@ -592,8 +580,8 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
592580
}
593581
// check if the batch scheduler integration is enabled
594582
// call the scheduler plugin if so
595-
if r.BatchSchedulerMgr != nil {
596-
if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(); err == nil {
583+
if r.options.BatchSchedulerManager != nil {
584+
if scheduler, err := r.options.BatchSchedulerManager.GetSchedulerForCluster(); err == nil {
597585
if err := scheduler.DoBatchSchedulingOnSubmission(ctx, instance); err != nil {
598586
return err
599587
}
@@ -935,8 +923,8 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1
935923
pod := r.buildHeadPod(ctx, instance)
936924
// check if the batch scheduler integration is enabled
937925
// call the scheduler plugin if so
938-
if r.BatchSchedulerMgr != nil {
939-
if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(); err == nil {
926+
if r.options.BatchSchedulerManager != nil {
927+
if scheduler, err := r.options.BatchSchedulerManager.GetSchedulerForCluster(); err == nil {
940928
scheduler.AddMetadataToPod(ctx, &instance, utils.RayNodeHeadGroupLabelValue, &pod)
941929
} else {
942930
return err
@@ -958,8 +946,8 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray
958946

959947
// build the pod then create it
960948
pod := r.buildWorkerPod(ctx, instance, worker)
961-
if r.BatchSchedulerMgr != nil {
962-
if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(); err == nil {
949+
if r.options.BatchSchedulerManager != nil {
950+
if scheduler, err := r.options.BatchSchedulerManager.GetSchedulerForCluster(); err == nil {
963951
scheduler.AddMetadataToPod(ctx, &instance, worker.GroupName, &pod)
964952
} else {
965953
return err
@@ -1123,8 +1111,8 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
11231111
))).
11241112
Owns(&corev1.Pod{}).
11251113
Owns(&corev1.Service{})
1126-
if r.BatchSchedulerMgr != nil {
1127-
r.BatchSchedulerMgr.ConfigureReconciler(b)
1114+
if r.options.BatchSchedulerManager != nil {
1115+
r.options.BatchSchedulerManager.ConfigureReconciler(b)
11281116
}
11291117

11301118
return b.

ray-operator/controllers/ray/suite_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"sigs.k8s.io/controller-runtime/pkg/manager"
3434
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3535

36-
configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
3736
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
3837
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
3938
)
@@ -117,8 +116,7 @@ var _ = BeforeSuite(func(ctx SpecContext) {
117116
},
118117
},
119118
}
120-
configs := configapi.Configuration{}
121-
err = NewReconciler(ctx, mgr, options, configs).SetupWithManager(mgr, 1)
119+
err = NewReconciler(ctx, mgr, options).SetupWithManager(mgr, 1)
122120
Expect(err).NotTo(HaveOccurred(), "failed to setup RayCluster controller")
123121

124122
testClientProvider := TestClientProvider{}

ray-operator/main.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
3232
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
3333
"github.com/ray-project/kuberay/ray-operator/controllers/ray"
34+
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
3435
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
3536
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
3637
"github.com/ray-project/kuberay/ray-operator/pkg/features"
@@ -235,6 +236,7 @@ func main() {
235236
var rayClusterMetricsManager *metrics.RayClusterMetricsManager
236237
var rayJobMetricsManager *metrics.RayJobMetricsManager
237238
var rayServiceMetricsManager *metrics.RayServiceMetricsManager
239+
var batchSchedulerManager *batchscheduler.SchedulerManager
238240
if config.EnableMetrics {
239241
mgrClient := mgr.GetClient()
240242
rayClusterMetricsManager = metrics.NewRayClusterMetricsManager(ctx, mgrClient)
@@ -246,13 +248,19 @@ func main() {
246248
rayServiceMetricsManager,
247249
)
248250
}
251+
252+
batchSchedulerManager, err = batchscheduler.NewSchedulerManager(ctx, config, restConfig, mgr.GetClient())
253+
exitOnError(err, "unable to create batch scheduler manager")
254+
batchSchedulerManager.AddToScheme(mgr.GetScheme())
255+
249256
rayClusterOptions := ray.RayClusterReconcilerOptions{
250257
HeadSidecarContainers: config.HeadSidecarContainers,
251258
WorkerSidecarContainers: config.WorkerSidecarContainers,
252259
IsOpenShift: utils.GetClusterType(),
253260
RayClusterMetricsManager: rayClusterMetricsManager,
261+
BatchSchedulerManager: batchSchedulerManager,
254262
}
255-
exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions, config).SetupWithManager(mgr, config.ReconcileConcurrency),
263+
exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(mgr, config.ReconcileConcurrency),
256264
"unable to create controller", "controller", "RayCluster")
257265

258266
exitOnError(ray.NewRayServiceReconciler(ctx, mgr, config).SetupWithManager(mgr, config.ReconcileConcurrency),

0 commit comments

Comments
 (0)