Commit cc1921d

pkg/start: Release leader lease on graceful shutdown
So the incoming cluster-version operator doesn't need to wait for the outgoing operator's lease to expire, which can take a while [1]:

  I0802 10:06:01.056591 1 leaderelection.go:243] attempting to acquire leader lease openshift-cluster-version/version...
  ...
  I0802 10:07:42.632719 1 leaderelection.go:253] successfully acquired lease openshift-cluster-version/version

and time out the:

  Cluster did not acknowledge request to upgrade in a reasonable time

testcase [2]. Using ReleaseOnCancel has been the plan since 2b81f47 (cvo: Release our leader lease when we are gracefully terminated, 2019-01-16, #87). I'm not clear on why it (sometimes?) doesn't work today.

The discrepancy between the "exit after 2s no matter what" comment and the 5s After dates back to dbedb7a (cvo: When the CVO restarts, perform one final sync to write status, 2019-04-27, #179), which bumped the After from 2s to 5s but forgot to bump the comment. I'm removing that code here in favor of the two-minute timeout from b30aa0e (pkg/cvo/metrics: Graceful server shutdown, 2020-04-15, #349). We still exit immediately on a second TERM, for folks who get impatient waiting for the graceful timeout.

Decouple shutdownContext from the context passed into Options.run, to allow TestIntegrationCVO_gracefulStepDown to request a graceful shutdown. And remove Context.Start(), inlining the logic in Options.run so we can count and reap the goroutines it used to launch. This also allows us to be more targeted with the context for each goroutine:

* Informers are now launched before the lease controller, so they're up and running by the time we acquire the lease. They remain running until the main operator CVO.Run() exits, after which we shut them down. Having informers running before we have a lease is somewhat expensive in terms of API traffic, but we should rarely have two CVO pods competing for leadership since we transitioned to the Recreate Deployment strategy in 078686d (install/0000_00_cluster-version-operator_03_deployment: Set 'strategy: Recreate', 2019-03-20, #140) and 5d8a527 (install/0000_00_cluster-version-operator_03_deployment: Fix Recreate strategy, 2019-04-03, #155). I don't see a way to block on their internal goroutine's completion, but maybe informers will grow an API for that in the future.

* The metrics server also continues to run until CVO.Run() exits, where previously we began gracefully shutting it down at the same time we started shutting down CVO.Run(). This ensures we are still around and publishing any last-minute CVO.Run() changes.

* Leader election also continues to run until CVO.Run() exits. We don't want to release the lease while we're still controlling things.

* CVO.Run() and AutoUpdate.Run() both stop immediately when the passed-in context is canceled or we call runCancel internally (because of a TERM, an error from a goroutine, or loss of leadership). These are the only two goroutines that actually write to the API servers, so we want to shut them down as quickly as possible.

Drop an unnecessary runCancel() from the "shutting down" branch of the error collector. I'd added it in b30aa0e, but you can only ever get into the "shutting down" branch if runCancel has already been called. And fix the scoping for the shutdownTimer variable so we don't clear it on each for-loop iteration (oops :p, bug from b30aa0e).

Add some logging to the error collector, so it's easier to see where we are in the collection process from the operator logs. Also start logging collected goroutines by name, so we can figure out which may still be outstanding.

Set terminationGracePeriodSeconds 130 to extend the default 30s [3], to give the container the full two-minute graceful timeout window before the kubelet steps in with a KILL.

Push the Background() initialization all the way up to the command-line handler, to make it more obvious that the context is scoped to the whole 'start' invocation.

[1]: https://storage.googleapis.com/origin-ci-test/pr-logs/pull/25365/pull-ci-openshift-origin-master-e2e-gcp-upgrade/1289853267223777280/artifacts/e2e-gcp-upgrade/pods/openshift-cluster-version_cluster-version-operator-5b6ff896c6-57ppb_cluster-version-operator.log
[2]: https://bugzilla.redhat.com/show_bug.cgi?id=1843505#c7
[3]: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#podspec-v1-core

squash! pkg/start: Release leader lease on graceful shutdown
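For orientation before the diffs, here is a minimal, standalone sketch of the shutdown orchestration described above: a run context canceled on the first TERM (or on any goroutine error), a longer-lived shutdown context capped by a two-minute timer, an immediate exit on a second TERM, and a collection loop that reaps goroutines by name. It uses only the Go standard library, and the names (asyncResult, runCtx, the "example worker") are illustrative stand-ins rather than the operator's actual code, which appears in pkg/start/start.go below.

```go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// asyncResult mirrors the named-result reporting this commit adds: every
// goroutine reports back which one it was and whether it failed.
type asyncResult struct {
    name string
    err  error
}

func main() {
    // runCtx governs the workers that write to the API servers; it is the
    // first thing canceled on TERM or on an internal error.
    runCtx, runCancel := context.WithCancel(context.Background())
    defer runCancel()
    // shutdownCtx outlives runCtx so that last-minute work (final status
    // writes, metrics scrapes) can finish during the graceful window.
    shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
    defer shutdownCancel()

    // First TERM starts a graceful shutdown; a second TERM exits immediately.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
    go func() {
        sig := <-sigs
        log.Printf("Shutting down due to %s", sig)
        runCancel()
        sig = <-sigs
        log.Fatalf("Received shutdown signal twice, exiting: %s", sig)
    }()

    results := make(chan asyncResult, 1)
    outstanding := 0

    // A stand-in worker; the real commit launches the CVO, the auto-update
    // controller, the metrics server, and the leader-election loop this way.
    outstanding++
    go func() {
        <-runCtx.Done()
        // Final cleanup is bounded by shutdownCtx rather than runCtx, so it
        // still has the graceful window available to it.
        select {
        case <-time.After(time.Second): // pretend to flush final status
        case <-shutdownCtx.Done():
        }
        results <- asyncResult{name: "example worker", err: nil}
    }()

    // Collection loop: reap every goroutine by name, switching to a
    // two-minute shutdown timer once the run context is done.
    var shutdownTimer *time.Timer
    for outstanding > 0 {
        log.Printf("Waiting on %d outstanding goroutines.", outstanding)
        if shutdownTimer == nil { // still running
            select {
            case <-runCtx.Done():
                log.Print("Run context completed; beginning graceful shutdown period.")
                shutdownTimer = time.NewTimer(2 * time.Minute)
            case result := <-results:
                outstanding--
                if result.err != nil {
                    log.Printf("Collected %s goroutine: %v", result.name, result.err)
                    runCancel() // an error anywhere begins shutdown everywhere
                } else {
                    log.Printf("Collected %s goroutine.", result.name)
                }
            }
        } else { // shutting down
            select {
            case <-shutdownTimer.C:
                shutdownCancel() // graceful window expired; force the rest down
            case result := <-results:
                outstanding--
                log.Printf("Collected %s goroutine (error: %v).", result.name, result.err)
            }
        }
    }
    log.Print("Finished collecting goroutines.")
}
```

Under this scheme the pod's termination grace period has to outlast the two-minute window, which is why the manifests below set terminationGracePeriodSeconds to 130: roughly the 120-second graceful window plus a few seconds of slack before the kubelet sends KILL.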
1 parent bcf385b · commit cc1921d

File tree: 7 files changed, +125 −110 lines


bootstrap/bootstrap-pod.yaml

Lines changed: 1 addition & 0 deletions

@@ -37,6 +37,7 @@ spec:
           fieldRef:
             fieldPath: spec.nodeName
   hostNetwork: true
+  terminationGracePeriodSeconds: 130
   volumes:
   - name: kubeconfig
     hostPath:

cmd/start.go

Lines changed: 4 additions & 1 deletion

@@ -1,6 +1,8 @@
 package main
 
 import (
+    "context"
+
     "github.com/spf13/cobra"
     "k8s.io/klog"
 
@@ -18,9 +20,10 @@ func init() {
         // To help debugging, immediately log version
         klog.Infof("%s", version.String)
 
-        if err := opts.Run(); err != nil {
+        if err := opts.Run(context.Background()); err != nil {
             klog.Fatalf("error: %v", err)
         }
+        klog.Info("Graceful shutdown complete.")
     },
 }
 
install/0000_00_cluster-version-operator_03_deployment.yaml

Lines changed: 1 addition & 0 deletions

@@ -57,6 +57,7 @@ spec:
       nodeSelector:
         node-role.kubernetes.io/master: ""
       priorityClassName: "system-cluster-critical"
+      terminationGracePeriodSeconds: 130
       tolerations:
       - key: "node-role.kubernetes.io/master"
         operator: Exists

pkg/autoupdate/autoupdate.go

Lines changed: 4 additions & 3 deletions

@@ -84,23 +84,24 @@ func New(
 }
 
 // Run runs the autoupdate controller.
-func (ctrl *Controller) Run(ctx context.Context, workers int) {
+func (ctrl *Controller) Run(ctx context.Context, workers int) error {
     defer utilruntime.HandleCrash()
     defer ctrl.queue.ShutDown()
 
     klog.Info("Starting AutoUpdateController")
     defer klog.Info("Shutting down AutoUpdateController")
 
     if !cache.WaitForCacheSync(ctx.Done(), ctrl.cacheSynced...) {
-        klog.Info("Caches never synchronized")
-        return
+        return fmt.Errorf("caches never synchronized: %w", ctx.Err())
     }
 
     for i := 0; i < workers; i++ {
+        // FIXME: actually wait until these complete if the Context is canceled.
         go wait.UntilWithContext(ctx, ctrl.worker, time.Second)
     }
 
     <-ctx.Done()
+    return nil
 }
 
 func (ctrl *Controller) eventHandler() cache.ResourceEventHandler {

pkg/cvo/cvo.go

Lines changed: 4 additions & 3 deletions

@@ -290,7 +290,7 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
 }
 
 // Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(ctx context.Context, workers int) {
+func (optr *Operator) Run(ctx context.Context, workers int) error {
     defer utilruntime.HandleCrash()
     defer optr.queue.ShutDown()
     stopCh := ctx.Done()
@@ -300,8 +300,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
     defer klog.Info("Shutting down ClusterVersionOperator")
 
     if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
-        klog.Info("Caches never synchronized")
-        return
+        return fmt.Errorf("caches never synchronized: %w", ctx.Err())
     }
 
     // trigger the first cluster version reconcile always
@@ -330,6 +329,8 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
     // stop the queue, then wait for the worker to exit
     optr.queue.ShutDown()
     <-workerStopCh
+
+    return nil
 }
 
 func (optr *Operator) queueKey() string {

pkg/start/start.go

Lines changed: 92 additions & 99 deletions

@@ -10,7 +10,6 @@ import (
     "math/rand"
     "os"
     "os/signal"
-    "sync"
     "syscall"
     "time"
 
@@ -77,6 +76,11 @@ type Options struct {
     ResyncInterval time.Duration
 }
 
+type asyncResult struct {
+    name  string
+    error error
+}
+
 func defaultEnv(name, defaultValue string) string {
     env, ok := os.LookupEnv(name)
     if !ok {
@@ -101,7 +105,7 @@ func NewOptions() *Options {
     }
 }
 
-func (o *Options) Run() error {
+func (o *Options) Run(ctx context.Context) error {
     if o.NodeName == "" {
         return fmt.Errorf("node-name is required")
     }
@@ -137,29 +141,6 @@ func (o *Options) Run() error {
         return err
     }
 
-    // TODO: Kube 1.14 will contain a ReleaseOnCancel boolean on
-    // LeaderElectionConfig that allows us to have the lock code
-    // release the lease when this context is cancelled. At that
-    // time we can remove our changes to OnStartedLeading.
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-    ch := make(chan os.Signal, 1)
-    defer func() { signal.Stop(ch) }()
-    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
-    go func() {
-        sig := <-ch
-        klog.Infof("Shutting down due to %s", sig)
-        cancel()
-
-        // exit after 2s no matter what
-        select {
-        case <-time.After(5 * time.Second):
-            klog.Fatalf("Exiting")
-        case <-ch:
-            klog.Fatalf("Received shutdown signal twice, exiting")
-        }
-    }()
-
     o.run(ctx, controllerCtx, lock)
     return nil
 }
@@ -186,13 +167,32 @@ func (o *Options) makeTLSConfig() (*tls.Config, error) {
     }), nil
 }
 
+// run launches a number of goroutines to handle manifest application,
+// metrics serving, etc. It continues operating until ctx.Done(),
+// and then attempts a clean shutdown limited by an internal context
+// with a two-minute cap. It returns after it successfully collects all
+// launched goroutines.
 func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
-    runContext, runCancel := context.WithCancel(ctx)
+    runContext, runCancel := context.WithCancel(ctx) // so we can cancel internally on errors or TERM
     defer runCancel()
-    shutdownContext, shutdownCancel := context.WithCancel(ctx)
+    shutdownContext, shutdownCancel := context.WithCancel(context.Background()) // extends beyond ctx
     defer shutdownCancel()
-    errorChannel := make(chan error, 1)
-    errorChannelCount := 0
+    postMainContext, postMainCancel := context.WithCancel(context.Background()) // extends beyond ctx
+    defer postMainCancel()
+
+    ch := make(chan os.Signal, 1)
+    defer func() { signal.Stop(ch) }()
+    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
+    go func() {
+        sig := <-ch
+        klog.Infof("Shutting down due to %s", sig)
+        runCancel()
+        sig = <-ch
+        klog.Fatalf("Received shutdown signal twice, exiting: %s", sig)
+    }()
+
+    resultChannel := make(chan asyncResult, 1)
+    resultChannelCount := 0
     if o.ListenAddr != "" {
         var tlsConfig *tls.Config
         if o.ServingCertFile != "" || o.ServingKeyFile != "" {
@@ -202,85 +202,92 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
                 klog.Fatalf("Failed to create TLS config: %v", err)
             }
         }
-        errorChannelCount++
+        resultChannelCount++
         go func() {
-            errorChannel <- cvo.RunMetrics(runContext, shutdownContext, o.ListenAddr, tlsConfig)
+            err := cvo.RunMetrics(postMainContext, shutdownContext, o.ListenAddr, tlsConfig)
+            resultChannel <- asyncResult{name: "metrics server", error: err}
         }()
     }
 
-    exit := make(chan struct{})
-    exitClose := sync.Once{}
-
-    // TODO: when we switch to graceful lock shutdown, this can be
-    // moved back inside RunOrDie
-    // TODO: properly wire ctx here
-    go leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
-        Lock:          lock,
-        LeaseDuration: leaseDuration,
-        RenewDeadline: renewDeadline,
-        RetryPeriod:   retryPeriod,
-        Callbacks: leaderelection.LeaderCallbacks{
-            OnStartedLeading: func(localCtx context.Context) {
-                controllerCtx.Start(runContext)
-                select {
-                case <-runContext.Done():
-                    // WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
-                    // and client-go ContextCancelable, which allows us to block new API requests before
-                    // we step down. However, the CVO isn't that sensitive to races and can tolerate
-                    // brief overlap.
-                    klog.Infof("Stepping down as leader")
-                    // give the controllers some time to shut down
-                    time.Sleep(100 * time.Millisecond)
-                    // if we still hold the leader lease, clear the owner identity (other lease watchers
-                    // still have to wait for expiration) like the new ReleaseOnCancel code will do.
-                    if err := lock.Update(localCtx, resourcelock.LeaderElectionRecord{}); err == nil {
-                        // if we successfully clear the owner identity, we can safely delete the record
-                        if err := lock.Client.ConfigMaps(lock.ConfigMapMeta.Namespace).Delete(localCtx, lock.ConfigMapMeta.Name, metav1.DeleteOptions{}); err != nil {
-                            klog.Warningf("Unable to step down cleanly: %v", err)
-                        }
-                    }
-                    klog.Infof("Finished shutdown")
-                    exitClose.Do(func() { close(exit) })
-                case <-localCtx.Done():
-                    // we will exit in OnStoppedLeading
-                }
-            },
-            OnStoppedLeading: func() {
-                klog.Warning("leaderelection lost")
-                exitClose.Do(func() { close(exit) })
+    informersDone := postMainContext.Done()
+    // FIXME: would be nice if there was a way to collect these.
+    controllerCtx.CVInformerFactory.Start(informersDone)
+    controllerCtx.OpenshiftConfigInformerFactory.Start(informersDone)
+    controllerCtx.OpenshiftConfigManagedInformerFactory.Start(informersDone)
+    controllerCtx.InformerFactory.Start(informersDone)
+
+    resultChannelCount++
+    go func() {
+        leaderelection.RunOrDie(postMainContext, leaderelection.LeaderElectionConfig{
+            Lock:            lock,
+            ReleaseOnCancel: true,
+            LeaseDuration:   leaseDuration,
+            RenewDeadline:   renewDeadline,
+            RetryPeriod:     retryPeriod,
+            Callbacks: leaderelection.LeaderCallbacks{
+                OnStartedLeading: func(_ context.Context) { // no need for this passed-through postMainContext, because goroutines we launch inside will use runContext
+                    resultChannelCount++
+                    go func() {
+                        err := controllerCtx.CVO.Run(runContext, 2)
+                        resultChannel <- asyncResult{name: "main operator", error: err}
+                    }()
+
+                    if controllerCtx.AutoUpdate != nil {
+                        resultChannelCount++
+                        go func() {
+                            err := controllerCtx.AutoUpdate.Run(runContext, 2)
+                            resultChannel <- asyncResult{name: "auto-update controller", error: err}
+                        }()
+                    }
+                },
+                OnStoppedLeading: func() {
+                    klog.Info("Stopped leading; shutting down.")
+                    runCancel()
+                },
             },
-        },
-    })
+        })
+        resultChannel <- asyncResult{name: "leader controller", error: nil}
+    }()
 
-    for errorChannelCount > 0 {
-        var shutdownTimer *time.Timer
+    var shutdownTimer *time.Timer
+    for resultChannelCount > 0 {
+        klog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
         if shutdownTimer == nil { // running
            select {
            case <-runContext.Done():
+                klog.Info("Run context completed; beginning two-minute graceful shutdown period.")
                shutdownTimer = time.NewTimer(2 * time.Minute)
-            case err := <-errorChannel:
-                errorChannelCount--
-                if err != nil {
-                    klog.Error(err)
+            case result := <-resultChannel:
+                resultChannelCount--
+                if result.error == nil {
+                    klog.Infof("Collected %s goroutine.", result.name)
+                } else {
+                    klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
                    runCancel() // this will cause shutdownTimer initialization in the next loop
                }
+                if result.name == "main operator" {
+                    postMainCancel()
+                }
            }
        } else { // shutting down
            select {
            case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing.
                shutdownCancel()
                shutdownTimer.Stop()
-            case err := <-errorChannel:
-                errorChannelCount--
-                if err != nil {
-                    klog.Error(err)
-                    runCancel()
+            case result := <-resultChannel:
+                resultChannelCount--
+                if result.error == nil {
+                    klog.Infof("Collected %s goroutine.", result.name)
+                } else {
+                    klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+                }
+                if result.name == "main operator" {
+                    postMainCancel()
                }
            }
        }
    }
-
-    <-exit
+    klog.Info("Finished collecting operator goroutines.")
 }
 
 // createResourceLock initializes the lock.
@@ -440,17 +447,3 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
     }
     return ctx
 }
-
-// Start launches the controllers in the provided context and any supporting
-// infrastructure. When ch is closed the controllers will be shut down.
-func (c *Context) Start(ctx context.Context) {
-    ch := ctx.Done()
-    go c.CVO.Run(ctx, 2)
-    if c.AutoUpdate != nil {
-        go c.AutoUpdate.Run(ctx, 2)
-    }
-    c.CVInformerFactory.Start(ch)
-    c.OpenshiftConfigInformerFactory.Start(ch)
-    c.OpenshiftConfigManagedInformerFactory.Start(ch)
-    c.InformerFactory.Start(ch)
-}
