@@ -39,6 +39,7 @@ import (
3939 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4040 "k8s.io/apimachinery/pkg/runtime"
4141 "k8s.io/apimachinery/pkg/util/wait"
42+ server "k8s.io/apiserver/pkg/server"
4243
4344 klog "k8s.io/klog/v2"
4445
@@ -259,18 +260,55 @@ func main() {
259260 os .Exit (1 )
260261 }
261262
262- run := func (context.Context ) {
263- // run...
264- stopCh := make (chan struct {})
265- factory .Start (stopCh )
266- coreFactory .Start (stopCh )
267- go ctrl .Run (* threads , stopCh )
268-
269- // ...until SIGINT
270- c := make (chan os.Signal , 1 )
271- signal .Notify (c , os .Interrupt )
272- <- c
273- close (stopCh )
263+ ctx := context .Background ()
264+
265+ // handle SIGTERM and SIGINT by cancelling the context.
266+ var (
267+ terminate func () // called when all controllers are finished
268+ controllerCtx context.Context // shuts down all controllers on a signal
269+ shutdownHandler <- chan struct {} // called when the signal is received
270+ )
271+
272+ if utilfeature .DefaultFeatureGate .Enabled (features .ReleaseLeaderElectionOnExit ) {
273+ // ctx waits for all controllers to finish, then shuts down the whole process, incl. leader election
274+ ctx , terminate = context .WithCancel (ctx )
275+ var cancelControllerCtx context.CancelFunc
276+ controllerCtx , cancelControllerCtx = context .WithCancel (ctx )
277+ shutdownHandler = server .SetupSignalHandler ()
278+
279+ defer terminate ()
280+
281+ go func () {
282+ defer cancelControllerCtx ()
283+ <- shutdownHandler
284+ klog .Info ("Received SIGTERM or SIGINT signal, shutting down controller." )
285+ }()
286+ }
287+
288+ run := func (ctx context.Context ) {
289+ if utilfeature .DefaultFeatureGate .Enabled (features .ReleaseLeaderElectionOnExit ) {
290+ // run...
291+ stopCh := controllerCtx .Done ()
292+ factory .Start (stopCh )
293+ coreFactory .Start (stopCh )
294+ var controllerWg sync.WaitGroup
295+ go ctrl .Run (* threads , stopCh , & controllerWg )
296+ <- shutdownHandler
297+ controllerWg .Wait ()
298+ terminate ()
299+ } else {
300+ // run...
301+ stopCh := make (chan struct {})
302+ factory .Start (stopCh )
303+ coreFactory .Start (stopCh )
304+ go ctrl .Run (* threads , stopCh , nil )
305+
306+ // ...until SIGINT
307+ c := make (chan os.Signal , 1 )
308+ signal .Notify (c , os .Interrupt )
309+ <- c
310+ close (stopCh )
311+ }
274312 }
275313
276314 // start listening & serving http endpoint if set
@@ -289,7 +327,7 @@ func main() {
289327 klog .Infof ("Metrics http server successfully started on %s, %s" , * httpEndpoint , * metricsPath )
290328
291329 defer func () {
292- err := srv .Shutdown (context . Background () )
330+ err := srv .Shutdown (ctx )
293331 if err != nil {
294332 klog .Errorf ("Failed to shutdown metrics server: %s" , err .Error ())
295333 }
@@ -300,7 +338,7 @@ func main() {
300338 }
301339
302340 if ! * leaderElection {
303- run (context . TODO () )
341+ run (ctx )
304342 } else {
305343 lockName := "snapshot-controller-leader"
306344 // Create a new clientset for leader election to prevent throttling
@@ -320,6 +358,11 @@ func main() {
320358 le .WithLeaseDuration (* leaderElectionLeaseDuration )
321359 le .WithRenewDeadline (* leaderElectionRenewDeadline )
322360 le .WithRetryPeriod (* leaderElectionRetryPeriod )
361+ if utilfeature .DefaultFeatureGate .Enabled (features .ReleaseLeaderElectionOnExit ) {
362+ le .WithReleaseOnCancel (true )
363+ le .WithContext (ctx )
364+ }
365+
323366 if err := le .Run (); err != nil {
324367 klog .Fatalf ("failed to initialize leader election: %v" , err )
325368 }
0 commit comments