Commit 2114f13

Merge pull request #424 from wking/gracefully-release-leader-lease
Bug 1843505: pkg/start: Release leader lease on graceful shutdown
2 parents 00aa243 + 22f3553 commit 2114f13

File tree

7 files changed: +141 -122 lines


bootstrap/bootstrap-pod.yaml

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@ spec:
         fieldRef:
           fieldPath: spec.nodeName
   hostNetwork: true
+  terminationGracePeriodSeconds: 130
   volumes:
   - name: kubeconfig
     hostPath:

cmd/start.go

Lines changed: 5 additions & 2 deletions
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"context"
+
 	"github.com/spf13/cobra"
 	"k8s.io/klog"
 
@@ -16,11 +18,12 @@ func init() {
 		Long: "",
 		Run: func(cmd *cobra.Command, args []string) {
 			// To help debugging, immediately log version
-			klog.Infof("%s", version.String)
+			klog.Info(version.String)
 
-			if err := opts.Run(); err != nil {
+			if err := opts.Run(context.Background()); err != nil {
 				klog.Fatalf("error: %v", err)
 			}
+			klog.Infof("Graceful shutdown complete for %s.", version.String)
 		},
 	}
 
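
The command now passes a background context into opts.Run and logs a completion message once it returns. The signal handling that used to live in Options.Run moves into pkg/start/start.go (below), where a first SIGTERM or interrupt cancels an internal run context and only a second signal force-exits the process.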

install/0000_00_cluster-version-operator_03_deployment.yaml

Lines changed: 1 addition & 0 deletions
@@ -57,6 +57,7 @@ spec:
       nodeSelector:
         node-role.kubernetes.io/master: ""
      priorityClassName: "system-cluster-critical"
+      terminationGracePeriodSeconds: 130
       tolerations:
       - key: "node-role.kubernetes.io/master"
        operator: Exists
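
Both the bootstrap pod and the operator Deployment now set terminationGracePeriodSeconds: 130. That value presumably leaves a margin of roughly ten seconds beyond the two-minute internal shutdown cap documented in pkg/start/start.go below, so the operator can release its leader lease and exit before the kubelet escalates to SIGKILL.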

pkg/autoupdate/autoupdate.go

Lines changed: 4 additions & 4 deletions
@@ -84,23 +84,23 @@ func New(
 }
 
 // Run runs the autoupdate controller.
-func (ctrl *Controller) Run(ctx context.Context, workers int) {
-	defer utilruntime.HandleCrash()
+func (ctrl *Controller) Run(ctx context.Context, workers int) error {
 	defer ctrl.queue.ShutDown()
 
 	klog.Info("Starting AutoUpdateController")
 	defer klog.Info("Shutting down AutoUpdateController")
 
 	if !cache.WaitForCacheSync(ctx.Done(), ctrl.cacheSynced...) {
-		klog.Info("Caches never synchronized")
-		return
+		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
 	}
 
 	for i := 0; i < workers; i++ {
+		// FIXME: actually wait until these complete if the Context is canceled. And possibly add utilruntime.HandleCrash.
		go wait.UntilWithContext(ctx, ctrl.worker, time.Second)
 	}
 
 	<-ctx.Done()
+	return nil
 }
 
 func (ctrl *Controller) eventHandler() cache.ResourceEventHandler {
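
Run now blocks until its context is canceled and reports failures to the caller, returning a wrapped error when the caches never sync and nil on a clean shutdown, instead of logging and returning silently. pkg/start/start.go below collects that return value through its result channel; the FIXME notes that the worker goroutines are still not awaited on cancellation.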

pkg/cvo/cvo.go

Lines changed: 4 additions & 4 deletions
@@ -290,8 +290,7 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
 }
 
 // Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(ctx context.Context, workers int) {
-	defer utilruntime.HandleCrash()
+func (optr *Operator) Run(ctx context.Context, workers int) error {
 	defer optr.queue.ShutDown()
 	stopCh := ctx.Done()
 	workerStopCh := make(chan struct{})
@@ -300,8 +299,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
 	defer klog.Info("Shutting down ClusterVersionOperator")
 
 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
-		klog.Info("Caches never synchronized")
-		return
+		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
 	}
 
 	// trigger the first cluster version reconcile always
@@ -330,6 +328,8 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
 	// stop the queue, then wait for the worker to exit
 	optr.queue.ShutDown()
 	<-workerStopCh
+
+	return nil
 }
 
 func (optr *Operator) queueKey() string {
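
The ClusterVersionOperator's Run follows the same pattern, but unlike the auto-update controller it already drains its worker before returning: it shuts down the queue, waits on workerStopCh, and only then returns nil, so collecting the "main operator" goroutine in pkg/start really means the sync loop has finished.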

pkg/start/start.go

Lines changed: 98 additions & 99 deletions
@@ -10,13 +10,13 @@ import (
 	"math/rand"
 	"os"
 	"os/signal"
-	"sync"
 	"syscall"
 	"time"
 
 	"github.com/google/uuid"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -77,6 +77,11 @@ type Options struct {
 	ResyncInterval time.Duration
 }
 
+type asyncResult struct {
+	name  string
+	error error
+}
+
 func defaultEnv(name, defaultValue string) string {
 	env, ok := os.LookupEnv(name)
 	if !ok {
@@ -101,7 +106,7 @@ func NewOptions() *Options {
 	}
 }
 
-func (o *Options) Run() error {
+func (o *Options) Run(ctx context.Context) error {
 	if o.NodeName == "" {
 		return fmt.Errorf("node-name is required")
 	}
@@ -137,29 +142,6 @@ func (o *Options) Run() error {
 		return err
 	}
 
-	// TODO: Kube 1.14 will contain a ReleaseOnCancel boolean on
-	// LeaderElectionConfig that allows us to have the lock code
-	// release the lease when this context is cancelled. At that
-	// time we can remove our changes to OnStartedLeading.
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	ch := make(chan os.Signal, 1)
-	defer func() { signal.Stop(ch) }()
-	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
-	go func() {
-		sig := <-ch
-		klog.Infof("Shutting down due to %s", sig)
-		cancel()
-
-		// exit after 2s no matter what
-		select {
-		case <-time.After(5 * time.Second):
-			klog.Fatalf("Exiting")
-		case <-ch:
-			klog.Fatalf("Received shutdown signal twice, exiting")
-		}
-	}()
-
 	o.run(ctx, controllerCtx, lock)
 	return nil
 }
@@ -186,13 +168,33 @@ func (o *Options) makeTLSConfig() (*tls.Config, error) {
 	}), nil
 }
 
+// run launches a number of goroutines to handle manifest application,
+// metrics serving, etc. It continues operating until ctx.Done(),
+// and then attempts a clean shutdown limited by an internal context
+// with a two-minute cap. It returns after it successfully collects all
+// launched goroutines.
 func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
-	runContext, runCancel := context.WithCancel(ctx)
+	runContext, runCancel := context.WithCancel(ctx) // so we can cancel internally on errors or TERM
 	defer runCancel()
-	shutdownContext, shutdownCancel := context.WithCancel(ctx)
+	shutdownContext, shutdownCancel := context.WithCancel(context.Background()) // extends beyond ctx
 	defer shutdownCancel()
-	errorChannel := make(chan error, 1)
-	errorChannelCount := 0
+	postMainContext, postMainCancel := context.WithCancel(context.Background()) // extends beyond ctx
+	defer postMainCancel()
+
+	ch := make(chan os.Signal, 1)
+	defer func() { signal.Stop(ch) }()
+	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		defer utilruntime.HandleCrash()
+		sig := <-ch
+		klog.Infof("Shutting down due to %s", sig)
+		runCancel()
+		sig = <-ch
+		klog.Fatalf("Received shutdown signal twice, exiting: %s", sig)
+	}()
+
+	resultChannel := make(chan asyncResult, 1)
+	resultChannelCount := 0
 	if o.ListenAddr != "" {
 		var tlsConfig *tls.Config
 		if o.ServingCertFile != "" || o.ServingKeyFile != "" {
@@ -202,85 +204,96 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
 				klog.Fatalf("Failed to create TLS config: %v", err)
 			}
 		}
-		errorChannelCount++
+		resultChannelCount++
 		go func() {
-			errorChannel <- cvo.RunMetrics(runContext, shutdownContext, o.ListenAddr, tlsConfig)
+			defer utilruntime.HandleCrash()
+			err := cvo.RunMetrics(postMainContext, shutdownContext, o.ListenAddr, tlsConfig)
+			resultChannel <- asyncResult{name: "metrics server", error: err}
 		}()
 	}
 
-	exit := make(chan struct{})
-	exitClose := sync.Once{}
-
-	// TODO: when we switch to graceful lock shutdown, this can be
-	// moved back inside RunOrDie
-	// TODO: properly wire ctx here
-	go leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
-		Lock:          lock,
-		LeaseDuration: leaseDuration,
-		RenewDeadline: renewDeadline,
-		RetryPeriod:   retryPeriod,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(localCtx context.Context) {
-				controllerCtx.Start(runContext)
-				select {
-				case <-runContext.Done():
-					// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
-					// and client-go ContextCancelable, which allows us to block new API requests before
-					// we step down. However, the CVO isn't that sensitive to races and can tolerate
-					// brief overlap.
-					klog.Infof("Stepping down as leader")
-					// give the controllers some time to shut down
-					time.Sleep(100 * time.Millisecond)
-					// if we still hold the leader lease, clear the owner identity (other lease watchers
-					// still have to wait for expiration) like the new ReleaseOnCancel code will do.
-					if err := lock.Update(localCtx, resourcelock.LeaderElectionRecord{}); err == nil {
-						// if we successfully clear the owner identity, we can safely delete the record
-						if err := lock.Client.ConfigMaps(lock.ConfigMapMeta.Namespace).Delete(localCtx, lock.ConfigMapMeta.Name, metav1.DeleteOptions{}); err != nil {
-							klog.Warningf("Unable to step down cleanly: %v", err)
-						}
+	informersDone := postMainContext.Done()
+	// FIXME: would be nice if there was a way to collect these.
+	controllerCtx.CVInformerFactory.Start(informersDone)
+	controllerCtx.OpenshiftConfigInformerFactory.Start(informersDone)
+	controllerCtx.OpenshiftConfigManagedInformerFactory.Start(informersDone)
+	controllerCtx.InformerFactory.Start(informersDone)
+
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		leaderelection.RunOrDie(postMainContext, leaderelection.LeaderElectionConfig{
+			Lock:            lock,
+			ReleaseOnCancel: true,
+			LeaseDuration:   leaseDuration,
+			RenewDeadline:   renewDeadline,
+			RetryPeriod:     retryPeriod,
+			Callbacks: leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(_ context.Context) { // no need for this passed-through postMainContext, because goroutines we launch inside will use runContext
+					resultChannelCount++
+					go func() {
+						defer utilruntime.HandleCrash()
+						err := controllerCtx.CVO.Run(runContext, 2)
+						resultChannel <- asyncResult{name: "main operator", error: err}
+					}()
+
+					if controllerCtx.AutoUpdate != nil {
+						resultChannelCount++
+						go func() {
+							defer utilruntime.HandleCrash()
+							err := controllerCtx.AutoUpdate.Run(runContext, 2)
+							resultChannel <- asyncResult{name: "auto-update controller", error: err}
+						}()
 					}
-					klog.Infof("Finished shutdown")
-					exitClose.Do(func() { close(exit) })
-				case <-localCtx.Done():
-					// we will exit in OnStoppedLeading
-				}
-			},
-			OnStoppedLeading: func() {
-				klog.Warning("leaderelection lost")
-				exitClose.Do(func() { close(exit) })
+				},
+				OnStoppedLeading: func() {
+					klog.Info("Stopped leading; shutting down.")
+					runCancel()
+				},
 			},
-		},
-	})
+		})
+		resultChannel <- asyncResult{name: "leader controller", error: nil}
+	}()
 
-	for errorChannelCount > 0 {
-		var shutdownTimer *time.Timer
+	var shutdownTimer *time.Timer
+	for resultChannelCount > 0 {
+		klog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
 		if shutdownTimer == nil { // running
 			select {
 			case <-runContext.Done():
+				klog.Info("Run context completed; beginning two-minute graceful shutdown period.")
 				shutdownTimer = time.NewTimer(2 * time.Minute)
-			case err := <-errorChannel:
-				errorChannelCount--
-				if err != nil {
-					klog.Error(err)
+			case result := <-resultChannel:
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
 					runCancel() // this will cause shutdownTimer initialization in the next loop
 				}
+				if result.name == "main operator" {
+					postMainCancel()
+				}
 			}
 		} else { // shutting down
 			select {
 			case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing.
 				shutdownCancel()
 				shutdownTimer.Stop()
-			case err := <-errorChannel:
-				errorChannelCount--
-				if err != nil {
-					klog.Error(err)
-					runCancel()
+			case result := <-resultChannel:
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+				}
+				if result.name == "main operator" {
+					postMainCancel()
 				}
 			}
 		}
 	}
-
-	<-exit
+	klog.Info("Finished collecting operator goroutines.")
 }
 
 // createResourceLock initializes the lock.
@@ -440,17 +453,3 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 	}
 	return ctx
 }
-
-// Start launches the controllers in the provided context and any supporting
-// infrastructure. When ch is closed the controllers will be shut down.
-func (c *Context) Start(ctx context.Context) {
-	ch := ctx.Done()
-	go c.CVO.Run(ctx, 2)
-	if c.AutoUpdate != nil {
-		go c.AutoUpdate.Run(ctx, 2)
-	}
-	c.CVInformerFactory.Start(ch)
-	c.OpenshiftConfigInformerFactory.Start(ch)
-	c.OpenshiftConfigManagedInformerFactory.Start(ch)
-	c.InformerFactory.Start(ch)
-}
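
The heart of the new pkg/start flow is the result-collection loop: every long-lived goroutine reports an asyncResult on a channel, any failure cancels the shared run context, and that cancellation arms a bounded timer which eventually cancels a longer-lived shutdown context. The sketch below is a self-contained illustration of that pattern only, not the operator's actual wiring; fakeOperator, fakeServer, and the three-second timer are hypothetical stand-ins (the real code uses a two-minute cap, leader election, informers, and the CVO controllers).

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// asyncResult mirrors the small struct this commit introduces: each goroutine
// reports a name and its final error so the collection loop can log it.
type asyncResult struct {
	name string
	err  error
}

// fakeOperator stands in for the CVO and auto-update controllers (hypothetical,
// not code from this repository): it runs until canceled or until it fails.
func fakeOperator(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return nil
	case <-time.After(500 * time.Millisecond):
		return errors.New("simulated operator failure")
	}
}

// fakeServer stands in for the metrics server: it stops taking new work when
// the run context ends but keeps draining until the shutdown context ends.
func fakeServer(runCtx, shutdownCtx context.Context) error {
	<-runCtx.Done()
	<-shutdownCtx.Done()
	return nil
}

func main() {
	runCtx, runCancel := context.WithCancel(context.Background())
	defer runCancel()
	shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) // outlives runCtx
	defer shutdownCancel()

	results := make(chan asyncResult, 1)
	count := 0

	count++
	go func() { results <- asyncResult{name: "main operator", err: fakeOperator(runCtx)} }()
	count++
	go func() { results <- asyncResult{name: "metrics server", err: fakeServer(runCtx, shutdownCtx)} }()

	var shutdownTimer *time.Timer
	for count > 0 {
		if shutdownTimer == nil { // running
			select {
			case <-runCtx.Done():
				shutdownTimer = time.NewTimer(3 * time.Second) // the commit uses 2 * time.Minute
			case r := <-results:
				count--
				fmt.Printf("collected %s: %v\n", r.name, r.err)
				if r.err != nil {
					runCancel() // any failure cancels the run context; the timer starts next loop
				}
			}
		} else { // shutting down
			select {
			case <-shutdownTimer.C:
				shutdownCancel() // grace period exhausted; stop anything still draining
			case r := <-results:
				count--
				fmt.Printf("collected %s: %v\n", r.name, r.err)
			}
		}
	}
	fmt.Println("all goroutines collected")
}

Because leaderelection.RunOrDie now runs with ReleaseOnCancel: true against postMainContext, and postMainCancel() fires once the "main operator" goroutine has been collected, the leader lease is released as part of this same collection loop, which is the graceful release named in the pull-request title.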
