|
1 | 1 | package batchscheduler
|
2 | 2 |
|
3 | 3 | import (
|
4 |
| - "fmt" |
5 | 4 | "sync"
|
6 | 5 |
|
7 | 6 | "k8s.io/apimachinery/pkg/runtime"
|
8 |
| - "k8s.io/client-go/rest" |
9 | 7 | "sigs.k8s.io/controller-runtime/pkg/builder"
|
10 | 8 |
|
11 |
| - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" |
12 |
| - schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" |
| 9 | + configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" |
13 | 10 | "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
|
14 | 11 | "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
|
| 12 | + |
| 13 | + "k8s.io/client-go/rest" |
| 14 | + |
| 15 | + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" |
| 16 | + schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" |
15 | 17 | "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
|
16 | 18 | )
|
17 | 19 |
|
18 |
| -var schedulerContainers = map[string]schedulerinterface.BatchSchedulerFactory{ |
19 |
| - schedulerinterface.GetDefaultPluginName(): &schedulerinterface.DefaultBatchSchedulerFactory{}, |
20 |
| - volcano.GetPluginName(): &volcano.VolcanoBatchSchedulerFactory{}, |
21 |
| - yunikorn.GetPluginName(): &yunikorn.YuniKornSchedulerFactory{}, |
22 |
| -} |
23 |
| - |
24 |
| -func GetRegisteredNames() []string { |
25 |
| - var pluginNames []string |
26 |
| - for key := range schedulerContainers { |
27 |
| - pluginNames = append(pluginNames, key) |
28 |
| - } |
29 |
| - return pluginNames |
| 20 | +type SchedulerManager struct { |
| 21 | + config *rest.Config |
| 22 | + factory schedulerinterface.BatchSchedulerFactory |
| 23 | + scheduler schedulerinterface.BatchScheduler |
| 24 | + rayConfigs configapi.Configuration |
| 25 | + sync.Mutex |
30 | 26 | }
|
31 | 27 |
|
32 |
| -func ConfigureReconciler(b *builder.Builder) *builder.Builder { |
33 |
| - for _, factory := range schedulerContainers { |
34 |
| - b = factory.ConfigureReconciler(b) |
| 28 | +// NewSchedulerManager maintains a specific scheduler plugin based on config |
| 29 | +func NewSchedulerManager(rayConfigs configapi.Configuration, config *rest.Config) (*SchedulerManager, error) { |
| 30 | + // init the scheduler factory from config |
| 31 | + factory := getSchedulerFactory(rayConfigs) |
| 32 | + scheduler, err := factory.New(config) |
| 33 | + if err != nil { |
| 34 | + return nil, err |
35 | 35 | }
|
36 |
| - return b |
37 |
| -} |
38 | 36 |
|
39 |
| -func AddToScheme(scheme *runtime.Scheme) { |
40 |
| - for _, factory := range schedulerContainers { |
41 |
| - factory.AddToScheme(scheme) |
| 37 | + manager := SchedulerManager{ |
| 38 | + rayConfigs: rayConfigs, |
| 39 | + config: config, |
| 40 | + factory: factory, |
| 41 | + scheduler: scheduler, |
42 | 42 | }
|
43 |
| -} |
44 | 43 |
|
45 |
| -type SchedulerManager struct { |
46 |
| - config *rest.Config |
47 |
| - plugins map[string]schedulerinterface.BatchScheduler |
48 |
| - sync.Mutex |
| 44 | + return &manager, nil |
49 | 45 | }
|
50 | 46 |
|
51 |
| -func NewSchedulerManager(config *rest.Config) *SchedulerManager { |
52 |
| - manager := SchedulerManager{ |
53 |
| - config: config, |
54 |
| - plugins: make(map[string]schedulerinterface.BatchScheduler), |
| 47 | +func getSchedulerFactory(rayConfigs configapi.Configuration) schedulerinterface.BatchSchedulerFactory { |
| 48 | + var factory schedulerinterface.BatchSchedulerFactory |
| 49 | + // init with the default factory |
| 50 | + factory = &schedulerinterface.DefaultBatchSchedulerFactory{} |
| 51 | + // when a batch scheduler name is provided |
| 52 | + if len(rayConfigs.BatchScheduler) > 0 { |
| 53 | + switch rayConfigs.BatchScheduler { |
| 54 | + case volcano.GetPluginName(): |
| 55 | + factory = &volcano.VolcanoBatchSchedulerFactory{} |
| 56 | + case yunikorn.GetPluginName(): |
| 57 | + factory = &yunikorn.YuniKornSchedulerFactory{} |
| 58 | + default: |
| 59 | + factory = &schedulerinterface.DefaultBatchSchedulerFactory{} |
| 60 | + } |
55 | 61 | }
|
56 |
| - return &manager |
57 |
| -} |
58 | 62 |
|
59 |
| -func (batch *SchedulerManager) GetSchedulerForCluster(app *rayv1.RayCluster) (schedulerinterface.BatchScheduler, error) { |
60 |
| - if schedulerName, ok := app.ObjectMeta.Labels[utils.RaySchedulerName]; ok { |
61 |
| - return batch.GetScheduler(schedulerName) |
| 63 | + // legacy option, if this is enabled, register volcano |
| 64 | + // this is for backwards compatibility |
| 65 | + if rayConfigs.EnableBatchScheduler { |
| 66 | + factory = &volcano.VolcanoBatchSchedulerFactory{} |
62 | 67 | }
|
63 | 68 |
|
64 |
| - // no scheduler provided |
65 |
| - return &schedulerinterface.DefaultBatchScheduler{}, nil |
| 69 | + return factory |
66 | 70 | }
|
67 | 71 |
|
68 |
| -func (batch *SchedulerManager) GetScheduler(schedulerName string) (schedulerinterface.BatchScheduler, error) { |
69 |
| - factory, registered := schedulerContainers[schedulerName] |
70 |
| - if !registered { |
71 |
| - return nil, fmt.Errorf("unregistered scheduler plugin %s", schedulerName) |
| 72 | +func (batch *SchedulerManager) GetSchedulerForCluster(app *rayv1.RayCluster) (schedulerinterface.BatchScheduler, error) { |
| 73 | + // for backwards compatibility |
| 74 | + if batch.rayConfigs.EnableBatchScheduler { |
| 75 | + if schedulerName, ok := app.ObjectMeta.Labels[utils.RaySchedulerName]; ok { |
| 76 | + if schedulerName == volcano.GetPluginName() { |
| 77 | + return batch.scheduler, nil |
| 78 | + } |
| 79 | + } |
72 | 80 | }
|
73 | 81 |
|
74 |
| - batch.Lock() |
75 |
| - defer batch.Unlock() |
| 82 | + return batch.scheduler, nil |
| 83 | +} |
76 | 84 |
|
77 |
| - plugin, existed := batch.plugins[schedulerName] |
| 85 | +func (batch *SchedulerManager) ConfigureReconciler(b *builder.Builder) *builder.Builder { |
| 86 | + batch.factory.ConfigureReconciler(b) |
| 87 | + return b |
| 88 | +} |
78 | 89 |
|
79 |
| - if existed && plugin != nil { |
80 |
| - return plugin, nil |
81 |
| - } |
82 |
| - if existed && plugin == nil { |
83 |
| - return nil, fmt.Errorf( |
84 |
| - "failed to get scheduler plugin %s, previous initialization has failed", schedulerName) |
85 |
| - } |
86 |
| - plugin, err := factory.New(batch.config) |
87 |
| - if err != nil { |
88 |
| - batch.plugins[schedulerName] = nil |
89 |
| - return nil, err |
90 |
| - } |
91 |
| - batch.plugins[schedulerName] = plugin |
92 |
| - return plugin, nil |
| 90 | +func (batch *SchedulerManager) AddToScheme(scheme *runtime.Scheme) { |
| 91 | + batch.factory.AddToScheme(scheme) |
93 | 92 | }
|
0 commit comments