diff --git a/cmd/csi-external-health-monitor-controller/main.go b/cmd/csi-external-health-monitor-controller/main.go index 6937df722..a317a1ee6 100644 --- a/cmd/csi-external-health-monitor-controller/main.go +++ b/cmd/csi-external-health-monitor-controller/main.go @@ -32,8 +32,6 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" "k8s.io/component-base/featuregate" "k8s.io/component-base/logs" @@ -43,6 +41,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/connection" + libconfig "github.com/kubernetes-csi/csi-lib-utils/config" "github.com/kubernetes-csi/csi-lib-utils/leaderelection" "github.com/kubernetes-csi/csi-lib-utils/metrics" "github.com/kubernetes-csi/csi-lib-utils/rpc" @@ -63,26 +62,13 @@ const ( var ( monitorInterval = flag.Duration("monitor-interval", 1*time.Minute, "Interval for controller to check volumes health condition.") - kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.") - csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") - showVersion = flag.Bool("version", false, "Show version.") timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") listVolumesInterval = flag.Duration("list-volumes-interval", 5*time.Minute, "Time interval for calling ListVolumes RPC to check volumes' health condition") volumeListAndAddInterval = flag.Duration("volume-list-add-interval", 5*time.Minute, "Time interval for listing volumes and add them to queue") nodeListAndAddInterval = flag.Duration("node-list-add-interval", 5*time.Minute, "Time interval for listing nodess and add them to queue") workerThreads = flag.Uint("worker-threads", 10, "Number of pv monitor worker threads") enableNodeWatcher = flag.Bool("enable-node-watcher", false, "Indicates whether the node watcher is enabled or not.") - - enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.") - leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") - leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") - leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") - leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") - - metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") ) var ( @@ -95,6 +81,7 @@ func main() { c := logsapi.NewLoggingConfiguration() logsapi.AddGoFlags(c, flag.CommandLine) logs.InitLogs() + standardflags.RegisterCommonFlags(flag.CommandLine) standardflags.AddAutomaxprocs(klog.Infof) flag.Parse() logger := klog.Background() @@ -103,23 +90,23 @@ func main() { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - if *showVersion { + if standardflags.Configuration.ShowVersion { fmt.Println(os.Args[0], version) return } logger.Info("Version", "version", version) - if *metricsAddress != "" && *httpEndpoint != "" { + if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" { logger.Error(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set.") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - addr := *metricsAddress + addr := standardflags.Configuration.MetricsAddress if addr == "" { - addr = *httpEndpoint + addr = standardflags.Configuration.HttpEndpoint } // Create the client config. Use kubeconfig if given, otherwise assume in-cluster. - config, err := buildConfig(*kubeconfig) + config, err := libconfig.BuildConfig(standardflags.Configuration.KubeConfig, standardflags.Configuration) if err != nil { logger.Error(err, "Failed to build a Kubernetes config") klog.FlushAndExit(klog.ExitFlushTimeout, 1) @@ -142,7 +129,7 @@ func main() { // Connect to CSI. ctx := context.Background() - csiConn, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) + csiConn, err := connection.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) if err != nil { logger.Error(err, "Failed to connect to the CSI driver") klog.FlushAndExit(klog.ExitFlushTimeout, 1) @@ -169,12 +156,12 @@ func main() { // Prepare HTTP endpoint for metrics + leader election healthz mux := http.NewServeMux() if addr != "" { - metricsManager.RegisterToServer(mux, *metricsPath) + metricsManager.RegisterToServer(mux, standardflags.Configuration.MetricsPath) go func() { logger.Info("ServeMux listening", "address", addr) err := http.ListenAndServe(addr, mux) if err != nil { - logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "path", *metricsPath) + logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "path", standardflags.Configuration.MetricsPath) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } }() @@ -276,48 +263,15 @@ func main() { monitorController.Run(ctx, int(*workerThreads), nil) } } - - if !*enableLeaderElection { - run(ctx) - } else { - // Name of config map with leader election lock - lockName := "external-health-monitor-leader-" + storageDriver - le := leaderelection.NewLeaderElection(clientset, lockName, run) - if *httpEndpoint != "" { - le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) - } - - if *leaderElectionNamespace != "" { - le.WithNamespace(*leaderElectionNamespace) - } - - le.WithLeaseDuration(*leaderElectionLeaseDuration) - le.WithRenewDeadline(*leaderElectionRenewDeadline) - le.WithRetryPeriod(*leaderElectionRetryPeriod) - le.WithContext(ctx) - if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) { - le.WithReleaseOnCancel(true) - } - - // TODO: The broadcaster and eventRecorder in the leaderelection package - // within csi-lib-utils do not support contextual logging. - // To fully support contextual logging in external-health-monitor, - // an upgrade of csi-lib-utils version will be necessary - // after contextual logging support is added to csi-lib-utils. - // https://github.com/kubernetes-sigs/sig-storage-lib-external-provisioner/pull/171 - if err := le.Run(); err != nil { - logger.Error(err, "Failed to initialize leader election") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - } - -} - -func buildConfig(kubeconfig string) (*rest.Config, error) { - if kubeconfig != "" { - return clientcmd.BuildConfigFromFlags("", kubeconfig) - } - return rest.InClusterConfig() + leaderelection.RunWithLeaderElection( + ctx, + config, + standardflags.Configuration, + run, + "external-health-monitor-leader-" + storageDriver, + mux, + utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit), + ) } func supportControllerListVolumes(ctx context.Context, csiConn *grpc.ClientConn) (supportControllerListVolumes bool, err error) {