Skip to content
This repository was archived by the owner on Apr 17, 2019. It is now read-only.

Commit 80c9257

Browse files
authored
Merge pull request #1253 from aledbf/use-delayed-queue
[nginx-ingress-controller]: Use delayed queue
2 parents 45bdc24 + 556d300 commit 80c9257

File tree

3 files changed

+33
-26
lines changed

3 files changed

+33
-26
lines changed

ingress/controllers/nginx/controller.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -414,11 +414,10 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) (map[stri
414414
return namedPorts, nil
415415
}
416416

417-
func (lbc *loadBalancerController) sync(key string) {
417+
func (lbc *loadBalancerController) sync(key string) error {
418418
if !lbc.controllersInSync() {
419419
time.Sleep(podStoreSyncedPollPeriod)
420-
lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
421-
return
420+
return fmt.Errorf("deferring sync till endpoints controller has synced")
422421
}
423422

424423
var cfg *api.ConfigMap
@@ -435,29 +434,28 @@ func (lbc *loadBalancerController) sync(key string) {
435434
ings := lbc.ingLister.Store.List()
436435
upstreams, servers := lbc.getUpstreamServers(ngxConfig, ings)
437436

438-
lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
437+
return lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
439438
Upstreams: upstreams,
440439
Servers: servers,
441440
TCPUpstreams: lbc.getTCPServices(),
442441
UDPUpstreams: lbc.getUDPServices(),
443442
})
444443
}
445444

446-
func (lbc *loadBalancerController) updateIngressStatus(key string) {
445+
func (lbc *loadBalancerController) updateIngressStatus(key string) error {
447446
if !lbc.controllersInSync() {
448447
time.Sleep(podStoreSyncedPollPeriod)
449-
lbc.ingQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
450-
return
448+
return fmt.Errorf("deferring sync till endpoints controller has synced")
451449
}
452450

453451
obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
454452
if err != nil {
455-
lbc.ingQueue.requeue(key, err)
456-
return
453+
return err
457454
}
458455

459456
if !ingExists {
460-
return
457+
// TODO: what's the correct behavior here?
458+
return nil
461459
}
462460

463461
ing := obj.(*extensions.Ingress)
@@ -466,8 +464,7 @@ func (lbc *loadBalancerController) updateIngressStatus(key string) {
466464

467465
currIng, err := ingClient.Get(ing.Name)
468466
if err != nil {
469-
glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
470-
return
467+
return fmt.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
471468
}
472469

473470
lbIPs := ing.Status.LoadBalancer.Ingress
@@ -478,11 +475,13 @@ func (lbc *loadBalancerController) updateIngressStatus(key string) {
478475
})
479476
if _, err := ingClient.UpdateStatus(currIng); err != nil {
480477
lbc.recorder.Eventf(currIng, api.EventTypeWarning, "UPDATE", "error: %v", err)
481-
return
478+
return err
482479
}
483480

484481
lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", lbc.podInfo.NodeIP)
485482
}
483+
484+
return nil
486485
}
487486

488487
func (lbc *loadBalancerController) isStatusIPDefined(lbings []api.LoadBalancerIngress) bool {

ingress/controllers/nginx/nginx/command.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (ngx *Manager) Start() {
5656
// shut down, stop accepting new connections and continue to service current requests
5757
// until all such requests are serviced. After that, the old worker processes exit.
5858
// http://nginx.org/en/docs/beginners_guide.html#control
59-
func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressConfig) {
59+
func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressConfig) error {
6060
ngx.reloadRateLimiter.Accept()
6161

6262
ngx.reloadLock.Lock()
@@ -65,15 +65,18 @@ func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressC
6565
newCfg, err := ngx.writeCfg(cfg, ingressCfg)
6666

6767
if err != nil {
68-
glog.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
69-
return
68+
return fmt.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
7069
}
7170

7271
if newCfg {
73-
if err := ngx.shellOut("nginx -s reload"); err == nil {
74-
glog.Info("change in configuration detected. Reloading...")
72+
if err := ngx.shellOut("nginx -s reload"); err != nil {
73+
return fmt.Errorf("error reloading nginx: %v", err)
7574
}
75+
76+
glog.Info("change in configuration detected. Reloading...")
7677
}
78+
79+
return nil
7780
}
7881

7982
// shellOut executes a command and returns its combined standard output and standard

ingress/controllers/nginx/utils.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ type StoreToConfigmapLister struct {
5151
// invokes the given sync function for every work item inserted.
5252
type taskQueue struct {
5353
// queue is the work queue the worker polls
54-
queue *workqueue.Type
54+
queue workqueue.RateLimitingInterface
5555
// sync is called for each item in the queue
56-
sync func(string)
56+
sync func(string) error
5757
// workerDone is closed when the worker exits
5858
workerDone chan struct{}
5959
}
@@ -72,9 +72,8 @@ func (t *taskQueue) enqueue(obj interface{}) {
7272
t.queue.Add(key)
7373
}
7474

75-
func (t *taskQueue) requeue(key string, err error) {
76-
glog.V(3).Infof("requeuing %v, err %v", key, err)
77-
t.queue.Add(key)
75+
func (t *taskQueue) requeue(key string) {
76+
t.queue.AddRateLimited(key)
7877
}
7978

8079
// worker processes work in the queue through sync.
@@ -86,7 +85,13 @@ func (t *taskQueue) worker() {
8685
return
8786
}
8887
glog.V(3).Infof("syncing %v", key)
89-
t.sync(key.(string))
88+
if err := t.sync(key.(string)); err != nil {
89+
glog.V(3).Infof("requeuing %v, err %v", key, err)
90+
t.requeue(key.(string))
91+
} else {
92+
t.queue.Forget(key)
93+
}
94+
9095
t.queue.Done(key)
9196
}
9297
}
@@ -99,9 +104,9 @@ func (t *taskQueue) shutdown() {
99104

100105
// NewTaskQueue creates a new task queue with the given sync function.
101106
// The sync function is called for every element inserted into the queue.
102-
func NewTaskQueue(syncFn func(string)) *taskQueue {
107+
func NewTaskQueue(syncFn func(string) error) *taskQueue {
103108
return &taskQueue{
104-
queue: workqueue.New(),
109+
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
105110
sync: syncFn,
106111
workerDone: make(chan struct{}),
107112
}

0 commit comments

Comments
 (0)