@@ -222,8 +222,7 @@ type Controller struct {
222
222
// 'MemoryPressure', 'OutOfDisk' and 'DiskPressure'.
223
223
taintNodeByCondition bool
224
224
225
- nodeUpdateChannels []chan * v1.Node
226
- nodeUpdateQueue workqueue.Interface
225
+ nodeUpdateQueue workqueue.Interface
227
226
}
228
227
229
228
// NewNodeLifecycleController returns a new taint controller.
@@ -350,11 +349,11 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer,
350
349
glog .Infof ("Controller will taint node by condition." )
351
350
nodeInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
352
351
AddFunc : nodeutil .CreateAddNodeHandler (func (node * v1.Node ) error {
353
- nc .nodeUpdateQueue .Add (node )
352
+ nc .nodeUpdateQueue .Add (node . Name )
354
353
return nil
355
354
}),
356
355
UpdateFunc : nodeutil .CreateUpdateNodeHandler (func (_ , newNode * v1.Node ) error {
357
- nc .nodeUpdateQueue .Add (newNode )
356
+ nc .nodeUpdateQueue .Add (newNode . Name )
358
357
return nil
359
358
}),
360
359
})
@@ -396,36 +395,16 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
396
395
}
397
396
398
397
if nc .taintNodeByCondition {
399
- for i := 0 ; i < scheduler .UpdateWorkerSize ; i ++ {
400
- nc .nodeUpdateChannels = append (nc .nodeUpdateChannels , make (chan * v1.Node , scheduler .NodeUpdateChannelSize ))
401
- }
402
-
403
- // Dispatcher
404
- go func (stopCh <- chan struct {}) {
405
- for {
406
- obj , shutdown := nc .nodeUpdateQueue .Get ()
407
- if shutdown {
408
- break
409
- }
410
-
411
- node := obj .(* v1.Node )
412
- hash := hash (node .Name , scheduler .UpdateWorkerSize )
413
-
414
- select {
415
- case <- stopCh :
416
- nc .nodeUpdateQueue .Done (node )
417
- return
418
- case nc .nodeUpdateChannels [hash ] <- node :
419
- }
420
- nc .nodeUpdateQueue .Done (node )
421
- }
422
- }(stopCh )
423
398
// Close node update queue to cleanup go routine.
424
399
defer nc .nodeUpdateQueue .ShutDown ()
425
400
426
401
// Start workers to update NoSchedule taint for nodes.
427
402
for i := 0 ; i < scheduler .UpdateWorkerSize ; i ++ {
428
- go nc .doNoScheduleTaintingPassWorker (i , stopCh )
403
+ // Thanks to "workqueue", each worker just need to get item from queue, because
404
+ // the item is flagged when got from queue: if new event come, the new item will
405
+ // be re-queued until "Done", so no more than one worker handle the same item and
406
+ // no event missed.
407
+ go wait .Until (nc .doNoScheduleTaintingPassWorker , time .Second , stopCh )
429
408
}
430
409
}
431
410
@@ -488,20 +467,34 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error {
488
467
return nil
489
468
}
490
469
491
- func (nc * Controller ) doNoScheduleTaintingPassWorker (i int , stopCh <- chan struct {} ) {
470
+ func (nc * Controller ) doNoScheduleTaintingPassWorker () {
492
471
for {
493
- select {
494
- case <- stopCh :
472
+ obj , shutdown := nc .nodeUpdateQueue .Get ()
473
+ // "nodeUpdateQueue" will be shutdown when "stopCh" closed;
474
+ // we do not need to re-check "stopCh" again.
475
+ if shutdown {
495
476
return
496
- case node := <- nc .nodeUpdateChannels [i ]:
497
- if err := nc .doNoScheduleTaintingPass (node ); err != nil {
498
- glog .Errorf ("Failed to taint NoSchedule on node <%s>: %v" , node .Name , err )
499
- }
500
477
}
478
+ nodeName := obj .(string )
479
+
480
+ if err := nc .doNoScheduleTaintingPass (nodeName ); err != nil {
481
+ // TODO (k82cn): Add nodeName back to the queue.
482
+ glog .Errorf ("Failed to taint NoSchedule on node <%s>, requeue it: %v" , nodeName , err )
483
+ }
484
+ nc .nodeUpdateQueue .Done (nodeName )
501
485
}
502
486
}
503
487
504
- func (nc * Controller ) doNoScheduleTaintingPass (node * v1.Node ) error {
488
+ func (nc * Controller ) doNoScheduleTaintingPass (nodeName string ) error {
489
+ node , err := nc .nodeLister .Get (nodeName )
490
+ if err != nil {
491
+ // If node not found, just ignore it.
492
+ if apierrors .IsNotFound (err ) {
493
+ return nil
494
+ }
495
+ return err
496
+ }
497
+
505
498
// Map node's condition to Taints.
506
499
var taints []v1.Taint
507
500
for _ , condition := range node .Status .Conditions {
0 commit comments