diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index 75dd76ae1..18a57bea2 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -31,8 +31,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" utilflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/featuregate" @@ -48,6 +46,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/connection" "github.com/kubernetes-csi/csi-lib-utils/leaderelection" + libconfig "github.com/kubernetes-csi/csi-lib-utils/config" "github.com/kubernetes-csi/csi-lib-utils/metrics" "github.com/kubernetes-csi/csi-lib-utils/rpc" "github.com/kubernetes-csi/csi-lib-utils/standardflags" @@ -65,34 +64,15 @@ const ( // Command line flags var ( - kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.") - csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") - showVersion = flag.Bool("version", false, "Show version.") timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads") maxEntries = flag.Int("max-entries", 0, "Max entries per each page in volume lister call, 0 means no limit.") - retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.") - retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.") - - enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.") - leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") - leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") - leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") - leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") - defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to publish. Defaults to empty string") reconcileSync = flag.Duration("reconcile-sync", 1*time.Minute, "Resync interval of the VolumeAttachment reconciler.") - metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") - - kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") - kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") - maxGRPCLogLength = flag.Int("max-grpc-log-length", -1, "The maximum amount of characters logged for every grpc responses. Defaults to no limit") featureGates map[string]bool @@ -111,6 +91,7 @@ func main() { c := logsapi.NewLoggingConfiguration() logsapi.AddGoFlags(c, flag.CommandLine) logs.InitLogs() + standardflags.RegisterCommonFlags(flag.CommandLine) standardflags.AddAutomaxprocs(klog.Infof) flag.Parse() logger := klog.Background() @@ -124,29 +105,27 @@ func main() { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - if *showVersion { + if standardflags.Configuration.ShowVersion { fmt.Println(os.Args[0], version) return } logger.Info("Version", "version", version) - if *metricsAddress != "" && *httpEndpoint != "" { + if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" { logger.Error(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - addr := *metricsAddress + addr := standardflags.Configuration.MetricsAddress if addr == "" { - addr = *httpEndpoint + addr = standardflags.Configuration.HttpEndpoint } // Create the client config. Use kubeconfig if given, otherwise assume in-cluster. - config, err := buildConfig(*kubeconfig) + config, err := libconfig.BuildConfig(standardflags.Configuration.KubeConfig, standardflags.Configuration) if err != nil { logger.Error(err, "Failed to build a Kubernetes config") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - config.QPS = (float32)(*kubeAPIQPS) - config.Burst = *kubeAPIBurst config.ContentType = runtime.ContentTypeProtobuf if *workerThreads == 0 { @@ -167,9 +146,9 @@ func main() { // Connect to CSI. connection.SetMaxGRPCLogLength(*maxGRPCLogLength) ctx := context.Background() - csiConn, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) + csiConn, err := connection.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) if err != nil { - logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress) + logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", standardflags.Configuration.CSIAddress) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } @@ -194,9 +173,9 @@ func main() { translator := csitrans.New() if translator.IsMigratedCSIDriverByName(csiAttacher) { metricsManager = metrics.NewCSIMetricsManagerWithOptions(csiAttacher, metrics.WithMigration()) - migratedCsiClient, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) + migratedCsiClient, err := connection.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) if err != nil { - logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress, "migrated", true) + logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", standardflags.Configuration.CSIAddress, "migrated", true) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } csiConn.Close() @@ -216,13 +195,13 @@ func main() { // Prepare http endpoint for metrics + leader election healthz mux := http.NewServeMux() if addr != "" { - metricsManager.RegisterToServer(mux, *metricsPath) + metricsManager.RegisterToServer(mux, standardflags.Configuration.MetricsPath) metricsManager.SetDriverName(csiAttacher) go func() { - logger.Info("ServeMux listening", "address", addr, "metricsPath", *metricsPath) + logger.Info("ServeMux listening", "address", addr, "metricsPath", standardflags.Configuration.MetricsPath) err := http.ListenAndServe(addr, mux) if err != nil { - logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", *metricsPath) + logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", standardflags.Configuration.MetricsPath) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } }() @@ -291,8 +270,8 @@ func main() { handler, factory.Storage().V1().VolumeAttachments(), factory.Core().V1().PersistentVolumes(), - workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), - workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax), supportsListVolumesPublishedNodes, *reconcileSync, ) @@ -332,49 +311,15 @@ func main() { } } - if !*enableLeaderElection { - run(klog.NewContext(context.Background(), logger)) - } else { - // Create a new clientset for leader election. When the attacher - // gets busy and its client gets throttled, the leader election - // can proceed without issues. - leClientset, err := kubernetes.NewForConfig(config) - if err != nil { - logger.Error(err, "Failed to create leaderelection client") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - - // Name of config map with leader election lock - lockName := "external-attacher-leader-" + csiAttacher - le := leaderelection.NewLeaderElection(leClientset, lockName, run) - if *httpEndpoint != "" { - le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) - } - - if *leaderElectionNamespace != "" { - le.WithNamespace(*leaderElectionNamespace) - } - - le.WithLeaseDuration(*leaderElectionLeaseDuration) - le.WithRenewDeadline(*leaderElectionRenewDeadline) - le.WithRetryPeriod(*leaderElectionRetryPeriod) - if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) { - le.WithReleaseOnCancel(true) - le.WithContext(ctx) - } - - if err := le.Run(); err != nil { - logger.Error(err, "Failed to initialize leader election") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - } -} - -func buildConfig(kubeconfig string) (*rest.Config, error) { - if kubeconfig != "" { - return clientcmd.BuildConfigFromFlags("", kubeconfig) - } - return rest.InClusterConfig() + leaderelection.RunWithLeaderElection( + ctx, + config, + standardflags.Configuration, + run, + "external-attacher-leader-" + csiAttacher, + mux, + utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit), + ) } func supportsControllerCapabilities(ctx context.Context, csiConn *grpc.ClientConn) (bool, bool, bool, bool, error) {