@@ -20,22 +20,42 @@ import (
20
20
"context"
21
21
"os"
22
22
23
+ "k8s.io/apimachinery/pkg/runtime"
24
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
23
25
"k8s.io/apimachinery/pkg/util/uuid"
24
26
"k8s.io/apiserver/pkg/server"
25
27
"k8s.io/client-go/informers"
26
28
"k8s.io/client-go/kubernetes"
29
+ clientgoscheme "k8s.io/client-go/kubernetes/scheme"
27
30
"k8s.io/client-go/rest"
28
31
restclient "k8s.io/client-go/rest"
29
32
"k8s.io/client-go/tools/clientcmd"
30
33
"k8s.io/client-go/tools/leaderelection"
31
34
"k8s.io/client-go/tools/leaderelection/resourcelock"
32
35
"k8s.io/klog/v2"
36
+ "k8s.io/klog/v2/klogr"
33
37
38
+ ctrl "sigs.k8s.io/controller-runtime"
39
+ "sigs.k8s.io/controller-runtime/pkg/healthz"
40
+
41
+ schedulingv1a1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
34
42
"sigs.k8s.io/scheduler-plugins/pkg/controller"
43
+ "sigs.k8s.io/scheduler-plugins/pkg/controllers"
35
44
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
36
45
schedformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
37
46
)
38
47
48
+ var (
49
+ scheme = runtime .NewScheme ()
50
+ setupLog = ctrl .Log .WithName ("setup" )
51
+ )
52
+
53
+ func init () {
54
+ utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
55
+
56
+ utilruntime .Must (schedulingv1a1 .AddToScheme (scheme ))
57
+ }
58
+
39
59
func newConfig (kubeconfig , master string , inCluster bool ) (* restclient.Config , error ) {
40
60
var (
41
61
config * rest.Config
@@ -66,17 +86,53 @@ func Run(s *ServerRunOptions) error {
66
86
kubeClient := kubernetes .NewForConfigOrDie (config )
67
87
68
88
schedInformerFactory := schedformers .NewSharedInformerFactory (schedClient , 0 )
69
- pgInformer := schedInformerFactory .Scheduling ().V1alpha1 ().PodGroups ()
70
89
eqInformer := schedInformerFactory .Scheduling ().V1alpha1 ().ElasticQuotas ()
71
90
72
91
coreInformerFactory := informers .NewSharedInformerFactory (kubeClient , 0 )
73
92
podInformer := coreInformerFactory .Core ().V1 ().Pods ()
74
- pgCtrl := controller .NewPodGroupController (kubeClient , pgInformer , podInformer , schedClient )
75
93
eqCtrl := controller .NewElasticQuotaController (kubeClient , eqInformer , podInformer , schedClient )
76
94
95
+ // Controller Runtime Controllers
96
+ ctrl .SetLogger (klogr .New ())
97
+ mgr , err := ctrl .NewManager (ctrl .GetConfigOrDie (), ctrl.Options {
98
+ Scheme : scheme ,
99
+ MetricsBindAddress : s .MetricsAddr ,
100
+ Port : 9443 ,
101
+ HealthProbeBindAddress : s .ProbeAddr ,
102
+ LeaderElection : s .EnableLeaderElection ,
103
+ LeaderElectionID : "sched-plugins-controllers" ,
104
+ LeaderElectionNamespace : "kube-system" ,
105
+ })
106
+ if err != nil {
107
+ setupLog .Error (err , "unable to start manager" )
108
+ return err
109
+ }
110
+
111
+ if err = (& controllers.PodGroupReconciler {
112
+ Client : mgr .GetClient (),
113
+ Scheme : mgr .GetScheme (),
114
+ }).SetupWithManager (mgr ); err != nil {
115
+ setupLog .Error (err , "unable to create controller" , "controller" , "PodGroup" )
116
+ return err
117
+ }
118
+
119
+ if err := mgr .AddHealthzCheck ("healthz" , healthz .Ping ); err != nil {
120
+ setupLog .Error (err , "unable to set up health check" )
121
+ return err
122
+ }
123
+ if err := mgr .AddReadyzCheck ("readyz" , healthz .Ping ); err != nil {
124
+ setupLog .Error (err , "unable to set up ready check" )
125
+ return err
126
+ }
127
+
77
128
run := func (ctx context.Context ) {
78
- go pgCtrl .Run (s .Workers , ctx .Done ())
79
129
go eqCtrl .Run (s .Workers , ctx .Done ())
130
+ setupLog .Info ("starting manager" )
131
+ if err := mgr .Start (ctrl .SetupSignalHandler ()); err != nil {
132
+ setupLog .Error (err , "unable to start manager" )
133
+ panic (err )
134
+ }
135
+
80
136
select {}
81
137
}
82
138
schedInformerFactory .Start (stopCh )
0 commit comments