@@ -26,7 +26,7 @@ import (
2626 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2727 _ "k8s.io/client-go/plugin/pkg/client/auth"
2828 "k8s.io/client-go/util/retry"
29- "k8s.io/utils/pointer "
29+ "k8s.io/utils/ptr "
3030 ctrl "sigs.k8s.io/controller-runtime"
3131 "sigs.k8s.io/controller-runtime/pkg/builder"
3232 "sigs.k8s.io/controller-runtime/pkg/cache"
@@ -35,7 +35,6 @@ import (
3535 "sigs.k8s.io/controller-runtime/pkg/event"
3636 "sigs.k8s.io/controller-runtime/pkg/healthz"
3737 "sigs.k8s.io/controller-runtime/pkg/manager"
38- "sigs.k8s.io/controller-runtime/pkg/metrics"
3938 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
4039 "sigs.k8s.io/controller-runtime/pkg/predicate"
4140 "sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -68,15 +67,6 @@ var runCmd = &cobra.Command{
6867 Run : func (cmd * cobra.Command , args []string ) {
6968 ctrl .SetLogger (logrusr .New (log .Log ))
7069
71- kClient , err := client .New (ctrl .GetConfigOrDie (), client.Options {})
72- if err != nil {
73- log .WithError (err ).Fatal ("unable to create client" )
74- }
75-
76- if err := initializeLabels (context .Background (), kClient ); err != nil {
77- log .WithError (err ).Fatal ("failed to initialize labels" )
78- }
79-
8070 mgr , err := ctrl .NewManager (ctrl .GetConfigOrDie (), ctrl.Options {
8171 Scheme : scheme ,
8272 HealthProbeBindAddress : ":8086" ,
@@ -88,7 +78,7 @@ var runCmd = &cobra.Command{
8878 // default sync period is 10h.
8979 // in case node-labeler is restarted and not change happens, we could waste (at least) 20m in a node
9080 // that never will run workspaces and the additional nodes cluster-autoscaler adds to compensate
91- SyncPeriod : pointer . Duration (2 * time .Minute ),
81+ SyncPeriod : ptr . To ( time . Duration (2 * time .Minute ) ),
9282 },
9383 WebhookServer : webhook .NewServer (webhook.Options {
9484 Port : 9443 ,
@@ -101,7 +91,7 @@ var runCmd = &cobra.Command{
10191 }
10292
10393 r := & PodReconciler {
104- kClient ,
94+ mgr . GetClient () ,
10595 }
10696
10797 componentPredicate , err := predicate .LabelSelectorPredicate (metav1.LabelSelector {
@@ -123,6 +113,25 @@ var runCmd = &cobra.Command{
123113 if err != nil {
124114 log .WithError (err ).Fatal ("unable to bind controller watch event handler" )
125115 }
116+ nr := & NodeReconciler {
117+ mgr .GetClient (),
118+ }
119+
120+ err = ctrl .NewControllerManagedBy (mgr ).
121+ Named ("node-watcher" ).
122+ For (& corev1.Node {}, builder .WithPredicates (predicate .Or (nr .nodeFilter ()))).
123+ WithOptions (controller.Options {MaxConcurrentReconciles : 1 }).
124+ Complete (nr )
125+ if err != nil {
126+ log .WithError (err ).Fatal ("unable to bind controller watch event handler" )
127+ }
128+
129+ go func () {
130+ <- mgr .Elected ()
131+ if err := nr .reconcileAll (context .Background ()); err != nil {
132+ log .WithError (err ).Fatal ("failed to reconcile all nodes" )
133+ }
134+ }()
126135
127136 if err := mgr .GetFieldIndexer ().IndexField (context .Background (), & workspacev1.Workspace {}, "status.runtime.nodeName" , func (o client.Object ) []string {
128137 ws := o .(* workspacev1.Workspace )
@@ -135,6 +144,17 @@ var runCmd = &cobra.Command{
135144 return
136145 }
137146
147+ if err := mgr .GetFieldIndexer ().IndexField (context .Background (), & corev1.Pod {}, "spec.nodeName" , func (o client.Object ) []string {
148+ pod := o .(* corev1.Pod )
149+ if pod .Spec .NodeName == "" {
150+ return nil
151+ }
152+ return []string {pod .Spec .NodeName }
153+ }); err != nil {
154+ log .WithError (err ).Fatal ("unable to create pod indexer" )
155+ return
156+ }
157+
138158 nsac , err := NewNodeScaledownAnnotationController (mgr .GetClient ())
139159 if err != nil {
140160 log .WithError (err ).Fatal ("unable to create node scaledown annotation controller" )
@@ -153,10 +173,6 @@ var runCmd = &cobra.Command{
153173 if err != nil {
154174 log .WithError (err ).Fatal ("couldn't properly clean up node scaledown annotation controller" )
155175 }
156-
157- metrics .Registry .MustRegister (NodeLabelerCounterVec )
158- metrics .Registry .MustRegister (NodeLabelerTimeHistVec )
159-
160176 err = mgr .AddHealthzCheck ("healthz" , healthz .Ping )
161177 if err != nil {
162178 log .WithError (err ).Fatal ("unable to set up health check" )
@@ -208,96 +224,170 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
208224 return reconcile.Result {RequeueAfter : defaultRequeueTime }, nil
209225 }
210226
227+ var taintKey string
228+ switch {
229+ case strings .HasPrefix (pod .Name , registryFacade ):
230+ taintKey = registryFacadeTaintKey
231+ case strings .HasPrefix (pod .Name , wsDaemon ):
232+ taintKey = wsDaemonTaintKey
233+ default :
234+ // nothing to do
235+ return reconcile.Result {}, nil
236+ }
237+
238+ healthy , err := checkPodHealth (pod )
239+ if err != nil {
240+ log .WithError (err ).Error ("checking pod health" )
241+ return reconcile.Result {RequeueAfter : defaultRequeueTime }, nil
242+ }
243+
244+ var node corev1.Node
245+ err = r .Get (ctx , types.NamespacedName {Name : nodeName }, & node )
246+ if err != nil {
247+ return reconcile.Result {}, fmt .Errorf ("obtaining node %s: %w" , nodeName , err )
248+ }
249+
250+ if isNodeTaintExists (taintKey , node ) != healthy {
251+ // nothing to do, the taint already exists and is in the desired state.
252+ return reconcile.Result {}, nil
253+ }
254+
255+ err = updateNodeTaint (taintKey , ! healthy , nodeName , r )
256+ if err != nil {
257+ log .WithError (err ).Error ("updating node taint" )
258+ return reconcile.Result {RequeueAfter : defaultRequeueTime }, nil
259+ }
260+
261+ return reconcile.Result {}, nil
262+ }
263+
264+ func checkPodHealth (pod corev1.Pod ) (bool , error ) {
211265 var (
212266 ipAddress string
213267 port string
214- taintKey string
215268 )
216-
217269 switch {
218270 case strings .HasPrefix (pod .Name , registryFacade ):
219- taintKey = registryFacadeTaintKey
220271 ipAddress = pod .Status .HostIP
221272 port = strconv .Itoa (registryFacadePort )
222273 case strings .HasPrefix (pod .Name , wsDaemon ):
223- taintKey = wsDaemonTaintKey
224274 ipAddress = pod .Status .PodIP
225275 port = strconv .Itoa (wsdaemonPort )
226276 default :
227277 // nothing to do
228- return reconcile. Result {} , nil
278+ return true , nil
229279 }
230280
231281 if ! pod .ObjectMeta .DeletionTimestamp .IsZero () {
232282 // the pod is being removed.
233283 // add the taint to the node
234- time .Sleep (1 * time .Second )
235- err := updateNodeTaint (taintKey , true , nodeName , r )
236- if err != nil {
237- // this is a edge case when cluster-autoscaler removes a node
238- // (all the running pods will be removed after that)
239- if errors .IsNotFound (err ) {
240- return reconcile.Result {}, nil
241- }
242-
243- log .WithError (err ).Error ("adding node taint" )
244- return reconcile.Result {RequeueAfter : defaultRequeueTime }, err
245- }
246-
247- return reconcile.Result {}, err
284+ return false , nil
248285 }
249286
250287 if ! IsPodReady (pod ) {
251288 // not ready. Wait until the next update.
252- return reconcile. Result {} , nil
289+ return false , nil
253290 }
254291
255- var node corev1.Node
256- err = r .Get (ctx , types.NamespacedName {Name : nodeName }, & node )
292+ err := checkTCPPortIsReachable (ipAddress , port )
257293 if err != nil {
258- return reconcile.Result {}, fmt .Errorf ("obtaining node %s: %w" , nodeName , err )
294+ log .WithField ("host" , ipAddress ).WithField ("port" , port ).WithField ("pod" , pod .Name ).WithError (err ).Error ("checking if TCP port is open" )
295+ return false , nil
259296 }
260297
261- // Check if taint exists
262- taintExists := false
263- for _ , taint := range node .Spec .Taints {
264- if taint .Key == taintKey {
265- taintExists = true
266- break
298+ if strings .HasPrefix (pod .Name , registryFacade ) {
299+ err = checkRegistryFacade (ipAddress , port )
300+ if err != nil {
301+ log .WithError (err ).Error ("checking registry-facade" )
302+ return false , nil
267303 }
268304 }
269305
270- if ! taintExists {
271- // nothing to do, the taint doesn't exist.
272- return reconcile.Result {}, nil
306+ return true , nil
307+ }
308+
309+ type NodeReconciler struct {
310+ client.Client
311+ }
312+
313+ func (r * NodeReconciler ) nodeFilter () predicate.Predicate {
314+ return predicate.Funcs {
315+ CreateFunc : func (e event.CreateEvent ) bool {
316+ node , ok := e .Object .(* corev1.Node )
317+ if ! ok {
318+ return false
319+ }
320+ return isWorkspaceNode (* node )
321+ },
322+ UpdateFunc : func (e event.UpdateEvent ) bool {
323+ return false
324+ },
325+ DeleteFunc : func (e event.DeleteEvent ) bool {
326+ return false
327+ },
273328 }
329+ }
274330
275- err = checkTCPPortIsReachable (ipAddress , port )
276- if err != nil {
277- log .WithField ("host" , ipAddress ).WithField ("port" , port ).WithField ("pod" , pod .Name ).WithError (err ).Error ("checking if TCP port is open" )
278- return reconcile.Result {RequeueAfter : defaultRequeueTime }, nil
331+ func (r * NodeReconciler ) reconcileAll (ctx context.Context ) error {
332+ log .Info ("start reconciling all nodes" )
333+
334+ var nodes corev1.NodeList
335+ if err := r .List (ctx , & nodes ); err != nil {
336+ return fmt .Errorf ("failed to list nodes: %w" , err )
279337 }
280338
281- if strings .HasPrefix (pod .Name , registryFacade ) {
282- err = checkRegistryFacade (ipAddress , port )
283- if err != nil {
284- log .WithError (err ).Error ("checking registry-facade" )
285- return reconcile.Result {RequeueAfter : defaultRequeueTime }, nil
339+ for _ , node := range nodes .Items {
340+ if node .Labels == nil {
341+ continue
342+ }
343+ if ! isWorkspaceNode (node ) {
344+ continue
286345 }
287346
288- time .Sleep (1 * time .Second )
347+ err := updateNodeLabel (node .Name , r .Client )
348+ if err != nil {
349+ log .WithError (err ).WithField ("node" , node .Name ).Error ("failed to initialize labels on node" )
350+ }
351+ r .Reconcile (ctx , reconcile.Request {NamespacedName : types.NamespacedName {Name : node .Name }})
289352 }
290353
291- err = updateNodeTaint (taintKey , false , nodeName , r )
354+ log .Info ("finished reconciling all nodes" )
355+ return nil
356+ }
357+
358+ func (r * NodeReconciler ) Reconcile (ctx context.Context , req reconcile.Request ) (reconcile.Result , error ) {
359+ var node corev1.Node
360+ err := r .Get (ctx , req .NamespacedName , & node )
292361 if err != nil {
293- log .WithError (err ).Error ("removing node taint" )
294- return reconcile.Result {}, fmt .Errorf ("trying to remove the taint: %v" , err )
362+ if ! errors .IsNotFound (err ) {
363+ log .WithError (err ).Error ("unable to fetch node" )
364+ }
365+ return ctrl.Result {}, client .IgnoreNotFound (err )
366+ }
367+ var podList corev1.PodList
368+ err = r .List (ctx , & podList , client.MatchingFields {
369+ "spec.nodeName" : node .Name ,
370+ })
371+ if err != nil {
372+ return reconcile.Result {}, fmt .Errorf ("listing pods: %w" , err )
373+ }
374+ isWsdaemonTaintExists := isNodeTaintExists (wsDaemonTaintKey , node )
375+ isRegistryFacadeTaintExists := isNodeTaintExists (registryFacadeTaintKey , node )
376+ isWsDaemonReady , isRegistryFacadeReady := false , false
377+ for _ , pod := range podList .Items {
378+ if strings .HasPrefix (pod .Name , wsDaemon ) {
379+ isWsDaemonReady = IsPodReady (pod )
380+ }
381+ if strings .HasPrefix (pod .Name , registryFacade ) {
382+ isRegistryFacadeReady = IsPodReady (pod )
383+ }
384+ }
385+ if isWsDaemonReady == isWsdaemonTaintExists {
386+ updateNodeTaint (wsDaemonTaintKey , isWsDaemonReady , node .Name , r )
387+ }
388+ if isRegistryFacadeReady == isRegistryFacadeTaintExists {
389+ updateNodeTaint (registryFacadeTaintKey , isRegistryFacadeReady , node .Name , r )
295390 }
296-
297- readyIn := time .Since (pod .Status .StartTime .Time )
298- NodeLabelerTimeHistVec .WithLabelValues (strings .Split (pod .Name , "-" )[0 ]).Observe (readyIn .Seconds ())
299- NodeLabelerCounterVec .WithLabelValues (strings .Split (pod .Name , "-" )[0 ]).Inc ()
300-
301391 return reconcile.Result {}, nil
302392}
303393
@@ -510,7 +600,10 @@ func updateNodeTaint(taintKey string, add bool, nodeName string, client client.C
510600 var node corev1.Node
511601 err := client .Get (ctx , types.NamespacedName {Name : nodeName }, & node )
512602 if err != nil {
513- return err
603+ if ! errors .IsNotFound (err ) {
604+ return err
605+ }
606+ return nil
514607 }
515608
516609 // Create or remove taint
@@ -554,6 +647,15 @@ func updateNodeTaint(taintKey string, add bool, nodeName string, client client.C
554647 })
555648}
556649
650+ func isNodeTaintExists (taintKey string , node corev1.Node ) bool {
651+ for _ , taint := range node .Spec .Taints {
652+ if taint .Key == taintKey {
653+ return true
654+ }
655+ }
656+ return false
657+ }
658+
557659func checkTCPPortIsReachable (host string , port string ) error {
558660 conn , err := net .DialTimeout ("tcp" , net .JoinHostPort (host , port ), 1 * time .Second )
559661 if err != nil {
@@ -611,31 +713,10 @@ func newDefaultTransport() *http.Transport {
611713 }
612714}
613715
614- func initializeLabels (ctx context.Context , kClient client.Client ) error {
615- log .Info ("initializing labels on nodes" )
616-
617- var nodes corev1.NodeList
618- if err := kClient .List (ctx , & nodes ); err != nil {
619- return fmt .Errorf ("failed to list nodes: %w" , err )
620- }
621-
622- for _ , node := range nodes .Items {
623- if node .Labels == nil {
624- continue
625- }
626- _ , isRegularWorkspaceNode := node .Labels [workspacesRegularLabel ]
627- _ , isHeadlessWorkspaceNode := node .Labels [workspacesHeadlessLabel ]
628-
629- if isRegularWorkspaceNode || isHeadlessWorkspaceNode {
630- err := updateNodeLabel (node .Name , kClient )
631- if err != nil {
632- log .WithError (err ).WithField ("node" , node .Name ).Error ("failed to initialize labels on node" )
633- }
634- }
635- }
636-
637- log .Info ("finished initializing labels on nodes" )
638- return nil
716+ func isWorkspaceNode (node corev1.Node ) bool {
717+ _ , isRegularWorkspaceNode := node .Labels [workspacesRegularLabel ]
718+ _ , isHeadlessWorkspaceNode := node .Labels [workspacesHeadlessLabel ]
719+ return isRegularWorkspaceNode || isHeadlessWorkspaceNode
639720}
640721
641722func updateNodeLabel (nodeName string , client client.Client ) error {
0 commit comments