From 6a917e9dc47f9c108ea5a9cfb88539679a4fa613 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Tue, 10 Sep 2024 15:59:05 -0400 Subject: [PATCH] explicitly handle SIGINT and SIGTERM --- cmd/configmap-server/main.go | 22 +++++++++++++--------- cmd/opm/main.go | 9 ++++++++- cmd/opm/registry/serve.go | 35 ++++++++++++++++++++--------------- cmd/opm/serve/serve.go | 13 ++++++++----- cmd/registry-server/main.go | 34 +++++++++++++++++++--------------- pkg/lib/graceful/shutdown.go | 33 --------------------------------- 6 files changed, 68 insertions(+), 78 deletions(-) delete mode 100644 pkg/lib/graceful/shutdown.go diff --git a/cmd/configmap-server/main.go b/cmd/configmap-server/main.go index 454cbc56f..5b5ec2b71 100644 --- a/cmd/configmap-server/main.go +++ b/cmd/configmap-server/main.go @@ -17,7 +17,6 @@ import ( "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/lib/dns" - "github.com/operator-framework/operator-registry/pkg/lib/graceful" "github.com/operator-framework/operator-registry/pkg/lib/log" "github.com/operator-framework/operator-registry/pkg/registry" "github.com/operator-framework/operator-registry/pkg/server" @@ -59,6 +58,9 @@ func main() { } func runCmdFunc(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + // Immediately set up termination log terminationLogPath, err := cmd.Flags().GetString("termination-log") if err != nil { @@ -99,7 +101,7 @@ func runCmdFunc(cmd *cobra.Command, args []string) error { logger := logrus.WithFields(logrus.Fields{"configMapName": configMapName, "configMapNamespace": configMapNamespace, "port": port}) client := NewClientFromConfig(kubeconfig, logger.Logger) - configMap, err := client.CoreV1().ConfigMaps(configMapNamespace).Get(context.TODO(), configMapName, metav1.GetOptions{}) + configMap, err := client.CoreV1().ConfigMaps(configMapNamespace).Get(ctx, configMapName, metav1.GetOptions{}) if err != nil { logger.Fatalf("error getting configmap: %s", err) } @@ -113,7 +115,7 @@ func runCmdFunc(cmd *cobra.Command, args []string) error { if err != nil { return err } - if err := sqlLoader.Migrate(context.TODO()); err != nil { + if err := sqlLoader.Migrate(ctx); err != nil { return err } @@ -136,7 +138,7 @@ func runCmdFunc(cmd *cobra.Command, args []string) error { } // sanity check that the db is available - tables, err := store.ListTables(context.TODO()) + tables, err := store.ListTables(ctx) if err != nil { logger.WithError(err).Warnf("couldn't list tables in db") } @@ -154,12 +156,14 @@ func runCmdFunc(cmd *cobra.Command, args []string) error { health.RegisterHealthServer(s, server.NewHealthServer()) reflection.Register(s) - logger.Info("serving registry") - return graceful.Shutdown(logger, func() error { - return s.Serve(lis) - }, func() { + go func() { + <-ctx.Done() + logger.Info("shutting down server") s.GracefulStop() - }) + }() + + logger.Info("serving registry") + return s.Serve(lis) } // NewClient creates a kubernetes client or bails out on on failures. diff --git a/cmd/opm/main.go b/cmd/opm/main.go index c88f52148..2359df458 100644 --- a/cmd/opm/main.go +++ b/cmd/opm/main.go @@ -1,7 +1,10 @@ package main import ( + "context" "os" + "os/signal" + "syscall" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -12,7 +15,11 @@ import ( func main() { showAlphaHelp := os.Getenv("HELP_ALPHA") == "true" cmd := root.NewCmd(showAlphaHelp) - if err := cmd.Execute(); err != nil { + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + if err := cmd.ExecuteContext(ctx); err != nil { agg, ok := err.(utilerrors.Aggregate) if !ok { os.Exit(1) diff --git a/cmd/opm/registry/serve.go b/cmd/opm/registry/serve.go index fcaf67e6f..320bd795a 100644 --- a/cmd/opm/registry/serve.go +++ b/cmd/opm/registry/serve.go @@ -17,7 +17,6 @@ import ( "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/lib/dns" - "github.com/operator-framework/operator-registry/pkg/lib/graceful" "github.com/operator-framework/operator-registry/pkg/lib/log" "github.com/operator-framework/operator-registry/pkg/lib/tmp" "github.com/operator-framework/operator-registry/pkg/server" @@ -54,6 +53,9 @@ func newRegistryServeCmd() *cobra.Command { } func serveFunc(cmd *cobra.Command, _ []string) error { + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + // Immediately set up termination log terminationLogPath, err := cmd.Flags().GetString("termination-log") if err != nil { @@ -93,19 +95,23 @@ func serveFunc(cmd *cobra.Command, _ []string) error { return err } - if _, err := db.ExecContext(context.TODO(), `PRAGMA soft_heap_limit=1`); err != nil { + if _, err := db.ExecContext(ctx, `PRAGMA soft_heap_limit=1`); err != nil { logger.WithError(err).Warnf("error setting soft heap limit for sqlite") } // migrate to the latest version - if err := migrate(cmd, db); err != nil { + shouldSkipMigrate, err := cmd.Flags().GetBool("skip-migrate") + if err != nil { + return err + } + if err := migrate(ctx, shouldSkipMigrate, db); err != nil { logger.WithError(err).Warnf("couldn't migrate db") } store := sqlite.NewSQLLiteQuerierFromDb(db, sqlite.OmitManifests(true)) // sanity check that the db is available - tables, err := store.ListTables(context.TODO()) + tables, err := store.ListTables(ctx) if err != nil { logger.WithError(err).Warnf("couldn't list tables in db") } @@ -142,19 +148,18 @@ func serveFunc(cmd *cobra.Command, _ []string) error { api.RegisterRegistryServer(s, server.NewRegistryServer(store)) health.RegisterHealthServer(s, server.NewHealthServer()) reflection.Register(s) - logger.Info("serving registry") - return graceful.Shutdown(logger, func() error { - return s.Serve(lis) - }, func() { + + go func() { + <-ctx.Done() + logger.Info("shutting down server") s.GracefulStop() - }) + }() + + logger.Info("serving registry") + return s.Serve(lis) } -func migrate(cmd *cobra.Command, db *sql.DB) error { - shouldSkipMigrate, err := cmd.Flags().GetBool("skip-migrate") - if err != nil { - return err - } +func migrate(ctx context.Context, shouldSkipMigrate bool, db *sql.DB) error { if shouldSkipMigrate { return nil } @@ -167,5 +172,5 @@ func migrate(cmd *cobra.Command, db *sql.DB) error { return fmt.Errorf("failed to load migrator") } - return migrator.Migrate(context.TODO()) + return migrator.Migrate(ctx) } diff --git a/cmd/opm/serve/serve.go b/cmd/opm/serve/serve.go index 6ed72a351..011286c6f 100644 --- a/cmd/opm/serve/serve.go +++ b/cmd/opm/serve/serve.go @@ -23,7 +23,6 @@ import ( "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/cache" "github.com/operator-framework/operator-registry/pkg/lib/dns" - "github.com/operator-framework/operator-registry/pkg/lib/graceful" "github.com/operator-framework/operator-registry/pkg/lib/log" "github.com/operator-framework/operator-registry/pkg/server" ) @@ -91,6 +90,9 @@ will not be reflected in the served content. } func (s *serve) run(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + mainLogger := s.logger.Dup() p := newProfilerInterface(s.pprofAddr, mainLogger) if err := p.startEndpoint(); err != nil { @@ -169,15 +171,16 @@ func (s *serve) run(ctx context.Context) error { mainLogger.Info("serving registry") p.stopCpuProfileCache() - return graceful.Shutdown(s.logger, func() error { - return grpcServer.Serve(lis) - }, func() { + go func() { + <-ctx.Done() + mainLogger.Info("shutting down server") grpcServer.GracefulStop() if err := p.stopEndpoint(ctx); err != nil { mainLogger.Warnf("error shutting down pprof server: %v", err) } - }) + }() + return grpcServer.Serve(lis) } // manages an HTTP pprof endpoint served by `server`, diff --git a/cmd/registry-server/main.go b/cmd/registry-server/main.go index 1fbc2061d..6ed2fc9ed 100644 --- a/cmd/registry-server/main.go +++ b/cmd/registry-server/main.go @@ -15,7 +15,6 @@ import ( "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/lib/dns" - "github.com/operator-framework/operator-registry/pkg/lib/graceful" "github.com/operator-framework/operator-registry/pkg/lib/log" "github.com/operator-framework/operator-registry/pkg/lib/tmp" "github.com/operator-framework/operator-registry/pkg/server" @@ -58,6 +57,9 @@ func main() { } func runCmdFunc(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + // Immediately set up termination log terminationLogPath, err := cmd.Flags().GetString("termination-log") if err != nil { @@ -95,19 +97,23 @@ func runCmdFunc(cmd *cobra.Command, args []string) error { return err } - if _, err := db.ExecContext(context.TODO(), `PRAGMA soft_heap_limit=1`); err != nil { + if _, err := db.ExecContext(ctx, `PRAGMA soft_heap_limit=1`); err != nil { logger.WithError(err).Warnf("error setting soft heap limit for sqlite") } // migrate to the latest version - if err := migrate(cmd, db); err != nil { + shouldSkipMigrate, err := cmd.Flags().GetBool("skip-migrate") + if err != nil { + return err + } + if err := migrate(ctx, shouldSkipMigrate, db); err != nil { logger.WithError(err).Warnf("couldn't migrate db") } store := sqlite.NewSQLLiteQuerierFromDb(db, sqlite.OmitManifests(true)) // sanity check that the db is available - tables, err := store.ListTables(context.TODO()) + tables, err := store.ListTables(ctx) if err != nil { logger.WithError(err).Warnf("couldn't list tables in db") } @@ -124,20 +130,18 @@ func runCmdFunc(cmd *cobra.Command, args []string) error { api.RegisterRegistryServer(s, server.NewRegistryServer(store)) health.RegisterHealthServer(s, server.NewHealthServer()) reflection.Register(s) - logger.Info("serving registry") - return graceful.Shutdown(logger, func() error { - return s.Serve(lis) - }, func() { + go func() { + <-ctx.Done() + logger.Info("shutting down server") s.GracefulStop() - }) + }() + + logger.Info("serving registry") + return s.Serve(lis) } -func migrate(cmd *cobra.Command, db *sql.DB) error { - shouldSkipMigrate, err := cmd.Flags().GetBool("skip-migrate") - if err != nil { - return err - } +func migrate(ctx context.Context, shouldSkipMigrate bool, db *sql.DB) error { if shouldSkipMigrate { return nil } @@ -150,5 +154,5 @@ func migrate(cmd *cobra.Command, db *sql.DB) error { return fmt.Errorf("failed to load migrator") } - return migrator.Migrate(context.TODO()) + return migrator.Migrate(ctx) } diff --git a/pkg/lib/graceful/shutdown.go b/pkg/lib/graceful/shutdown.go deleted file mode 100644 index 34c72d414..000000000 --- a/pkg/lib/graceful/shutdown.go +++ /dev/null @@ -1,33 +0,0 @@ -package graceful - -import ( - "context" - "os" - "os/signal" - "syscall" - - "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" -) - -func Shutdown(logger logrus.FieldLogger, run func() error, cleanup func()) error { - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) - defer signal.Stop(interrupt) - - g, ctx := errgroup.WithContext(context.Background()) - g.Go(run) - - select { - case <-interrupt: - break - case <-ctx.Done(): - break - } - - logger.Info("shutting down...") - - cleanup() - - return g.Wait() -}