diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index 78e9c8204..3d0ee64d1 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -38,12 +38,12 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" klog "k8s.io/klog/v2" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/connection" + libconfig "github.com/kubernetes-csi/csi-lib-utils/config" "github.com/kubernetes-csi/csi-lib-utils/leaderelection" "github.com/kubernetes-csi/csi-lib-utils/metrics" csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc" @@ -74,28 +74,13 @@ const ( // Command line flags var ( - kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") - csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") resyncPeriod = flag.Duration("resync-period", 15*time.Minute, "Resync interval of the controller. Default is 15 minutes") snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot") snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.") - showVersion = flag.Bool("version", false, "Show version.") threads = flag.Int("worker-threads", 10, "Number of worker threads.") csiTimeout = flag.Duration("timeout", defaultCSITimeout, "The timeout for any RPCs to the CSI driver. Default is 1 minute.") extraCreateMetadata = flag.Bool("extra-create-metadata", false, "If set, add snapshot metadata to plugin snapshot requests as parameters.") - leaderElection = flag.Bool("leader-election", false, "Enables leader election.") - leaderElectionNamespace = flag.String("leader-election-namespace", "", "The namespace where the leader election resource exists. Defaults to the pod namespace if not set.") - leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") - leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") - leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") - - kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") - kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") - - metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.") retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.") enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.") @@ -118,6 +103,7 @@ func main() { c := logsapi.NewLoggingConfiguration() logsapi.AddGoFlags(c, flag.CommandLine) logs.InitLogs() + standardflags.RegisterCommonFlags(flag.CommandLine) standardflags.AddAutomaxprocs(klog.Infof) flag.Parse() if err := logsapi.ValidateAndApply(c, fg); err != nil { @@ -129,28 +115,25 @@ func main() { klog.Fatal("Error while parsing feature gates: ", err) } - if *showVersion { + if standardflags.Configuration.ShowVersion { fmt.Println(os.Args[0], version) os.Exit(0) } klog.InfoS("Version", "version", version) // If distributed snapshotting is enabled and leaderElection is also set to true, return - if *enableNodeDeployment && *leaderElection { + if *enableNodeDeployment && standardflags.Configuration.LeaderElection { klog.Error("Leader election cannot happen when node-deployment is set to true") os.Exit(1) } // Create the client config. Use kubeconfig if given, otherwise assume in-cluster. - config, err := buildConfig(*kubeconfig) + config, err := libconfig.BuildConfig(standardflags.Configuration.KubeConfig, standardflags.Configuration) if err != nil { klog.Error(err.Error()) os.Exit(1) } - config.QPS = (float32)(*kubeAPIQPS) - config.Burst = *kubeAPIBurst - coreConfig := rest.CopyConfig(config) coreConfig.ContentType = runtime.ContentTypeProtobuf kubeClient, err := kubernetes.NewForConfig(coreConfig) @@ -184,13 +167,13 @@ func main() { // Add Snapshot types to the default Kubernetes so events can be logged for them snapshotscheme.AddToScheme(scheme.Scheme) - if *metricsAddress != "" && *httpEndpoint != "" { + if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" { klog.Error("only one of `--metrics-address` and `--http-endpoint` can be set.") os.Exit(1) } - addr := *metricsAddress + addr := standardflags.Configuration.MetricsAddress if addr == "" { - addr = *httpEndpoint + addr = standardflags.Configuration.HttpEndpoint } // Connect to CSI. @@ -198,7 +181,7 @@ func main() { ctx := context.Background() csiConn, err := connection.Connect( ctx, - *csiAddress, + standardflags.Configuration.CSIAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) if err != nil { @@ -226,13 +209,13 @@ func main() { // Prepare http endpoint for metrics + leader election healthz mux := http.NewServeMux() if addr != "" { - metricsManager.RegisterToServer(mux, *metricsPath) + metricsManager.RegisterToServer(mux, standardflags.Configuration.MetricsPath) metricsManager.SetDriverName(driverName) go func() { klog.Infof("ServeMux listening at %q", addr) err := http.ListenAndServe(addr, mux) if err != nil { - klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err) + klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, standardflags.Configuration.MetricsPath, err) } }() } @@ -261,7 +244,7 @@ func main() { os.Exit(1) } - klog.V(2).Infof("Start NewCSISnapshotSideCarController with snapshotter [%s] kubeconfig [%s] csiTimeout [%+v] csiAddress [%s] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", driverName, *kubeconfig, *csiTimeout, *csiAddress, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength) + klog.V(2).Infof("Start NewCSISnapshotSideCarController with snapshotter [%s] kubeconfig [%s] csiTimeout [%+v] csiAddress [%s] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", driverName, standardflags.Configuration.KubeConfig, *csiTimeout, standardflags.Configuration.CSIAddress, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength) snapShotter := snapshotter.NewSnapshotter(csiConn) var groupSnapshotter group_snapshotter.GroupSnapshotter @@ -354,44 +337,15 @@ func main() { } } - if !*leaderElection { - run(context.TODO()) - } else { - lockName := fmt.Sprintf("%s-%s", prefix, strings.Replace(driverName, "/", "-", -1)) - // Create a new clientset for leader election to prevent throttling - // due to snapshot sidecar - leClientset, err := kubernetes.NewForConfig(config) - if err != nil { - klog.Fatalf("failed to create leaderelection client: %v", err) - } - le := leaderelection.NewLeaderElection(leClientset, lockName, run) - if *httpEndpoint != "" { - le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) - } - - if *leaderElectionNamespace != "" { - le.WithNamespace(*leaderElectionNamespace) - } - - le.WithLeaseDuration(*leaderElectionLeaseDuration) - le.WithRenewDeadline(*leaderElectionRenewDeadline) - le.WithRetryPeriod(*leaderElectionRetryPeriod) - if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) { - le.WithReleaseOnCancel(true) - le.WithContext(ctx) - } - - if err := le.Run(); err != nil { - klog.Fatalf("failed to initialize leader election: %v", err) - } - } -} - -func buildConfig(kubeconfig string) (*rest.Config, error) { - if kubeconfig != "" { - return clientcmd.BuildConfigFromFlags("", kubeconfig) - } - return rest.InClusterConfig() + leaderelection.RunWithLeaderElection( + ctx, + config, + standardflags.Configuration, + run, + fmt.Sprintf("%s-%s", prefix, strings.Replace(driverName, "/", "-", -1)), + mux, + utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit), + ) } func supportsControllerCreateSnapshot(ctx context.Context, conn *grpc.ClientConn) (bool, error) {