@@ -9,8 +9,10 @@ import (
99 "fmt"
1010 "net/http"
1111 "os"
12+ "os/signal"
1213 "strconv"
1314 "strings"
15+ "syscall"
1416 "time"
1517 "unicode"
1618
@@ -58,8 +60,8 @@ func initLogging() {
5860
5961//+kubebuilder:rbac:groups="coordination.k8s.io",resources="leases",verbs={get,create,update,watch}
6062
61- func initManager () (runtime.Options , error ) {
62- log := logging .FromContext (context . Background () )
63+ func initManager (ctx context. Context ) (runtime.Options , error ) {
64+ log := logging .FromContext (ctx )
6365
6466 options := runtime.Options {}
6567 options .Cache .SyncPeriod = initialize .Pointer (time .Hour )
@@ -120,45 +122,63 @@ func initManager() (runtime.Options, error) {
120122}
121123
122124func main () {
123- // This context is canceled by SIGINT, SIGTERM, or by calling shutdown.
124- ctx , shutdown := context .WithCancel (runtime .SignalHandler ())
125-
126- otelFlush , err := initOpenTelemetry ()
127- assertNoError (err )
128- defer otelFlush ()
125+ running , stopRunning := context .WithCancel (context .Background ())
126+ defer stopRunning ()
129127
130128 initLogging ()
131-
132- log := logging .FromContext (ctx )
129+ log := logging .FromContext (running )
133130 log .V (1 ).Info ("debug flag set to true" )
134131
132+ // Start a goroutine that waits for SIGINT or SIGTERM.
133+ {
134+ signals := []os.Signal {os .Interrupt , syscall .SIGTERM }
135+ receive := make (chan os.Signal , len (signals ))
136+ signal .Notify (receive , signals ... )
137+ go func () {
138+ // Wait for a signal then immediately restore the default signal handlers.
139+ // After this, a SIGHUP, SIGINT, or SIGTERM causes the program to exit.
140+ // - https://pkg.go.dev/os/signal#hdr-Default_behavior_of_signals_in_Go_programs
141+ s := <- receive
142+ signal .Stop (receive )
143+
144+ log .Info ("received signal from OS" , "signal" , s .String ())
145+ stopRunning ()
146+ }()
147+ }
148+
135149 features := feature .NewGate ()
136150 assertNoError (features .Set (os .Getenv ("PGO_FEATURE_GATES" )))
137151
138- ctx = feature .NewContext (ctx , features )
152+ running = feature .NewContext (running , features )
139153 log .Info ("feature gates" ,
140154 // These are set by the user
141- "PGO_FEATURE_GATES" , feature .ShowAssigned (ctx ),
155+ "PGO_FEATURE_GATES" , feature .ShowAssigned (running ),
142156 // These are enabled, including features that are on by default
143- "enabled" , feature .ShowEnabled (ctx ))
157+ "enabled" , feature .ShowEnabled (running ))
158+
159+ // Initialize OpenTelemetry and flush data when there is a panic.
160+ otelFinish , err := initOpenTelemetry (running )
161+ assertNoError (err )
162+ defer otelFinish (running )
144163
145164 cfg , err := runtime .GetConfig ()
146165 assertNoError (err )
147166
148167 cfg .Wrap (otelTransportWrapper ())
149168
169+ // TODO(controller-runtime): Set config.WarningHandler instead after v0.19.0.
150170 // Configure client-go to suppress warnings when warning headers are encountered. This prevents
151171 // warnings from being logged over and over again during reconciliation (e.g. this will suppress
152172 // deprecation warnings when using an older version of a resource for backwards compatibility).
153173 rest .SetDefaultWarningHandler (rest.NoWarnings {})
154174
155175 k8s , err := kubernetes .NewDiscoveryRunner (cfg )
156176 assertNoError (err )
157- assertNoError (k8s .Read (ctx ))
177+ assertNoError (k8s .Read (running ))
158178
159- log .Info ("Connected to Kubernetes" , "api" , k8s .Version ().String (), "openshift" , k8s .IsOpenShift ())
179+ log .Info ("connected to Kubernetes" , "api" , k8s .Version ().String (), "openshift" , k8s .IsOpenShift ())
160180
161- options , err := initManager ()
181+ options , err := initManager (running )
162182 assertNoError (err )
163183
164184 // Add to the Context that Manager passes to Reconciler.Start, Runnable.Start,
@@ -174,7 +194,7 @@ func main() {
174194 assertNoError (err )
175195 assertNoError (mgr .Add (k8s ))
176196
177- registrar , err := registration .NewRunner (os .Getenv ("RSA_KEY" ), os .Getenv ("TOKEN_PATH" ), shutdown )
197+ registrar , err := registration .NewRunner (os .Getenv ("RSA_KEY" ), os .Getenv ("TOKEN_PATH" ), stopRunning )
178198 assertNoError (err )
179199 assertNoError (mgr .Add (registrar ))
180200 token , _ := registrar .CheckToken ()
@@ -212,10 +232,30 @@ func main() {
212232 assertNoError (mgr .AddHealthzCheck ("health" , healthz .Ping ))
213233 assertNoError (mgr .AddReadyzCheck ("check" , healthz .Ping ))
214234
215- log .Info ("starting controller runtime manager and will wait for signal to exit" )
235+ // Start the manager and wait for its context to be canceled.
236+ stopped := make (chan error , 1 )
237+ go func () { stopped <- mgr .Start (running ) }()
238+ <- running .Done ()
239+
240+ // Set a deadline for graceful termination.
241+ log .Info ("shutting down" )
242+ stopping , cancel := context .WithTimeout (context .Background (), 20 * time .Second )
243+ defer cancel ()
244+
245+ // Wait for the manager to return or the deadline to pass.
246+ select {
247+ case err = <- stopped :
248+ case <- stopping .Done ():
249+ err = stopping .Err ()
250+ }
216251
217- assertNoError (mgr .Start (ctx ))
218- log .Info ("signal received, exiting" )
252+ // Flush any telemetry with the remaining time we have.
253+ otelFinish (stopping )
254+ if err != nil {
255+ log .Error (err , "shutdown failed" )
256+ } else {
257+ log .Info ("shutdown complete" )
258+ }
219259}
220260
221261// addControllersToManager adds all PostgreSQL Operator controllers to the provided controller
0 commit comments