diff --git a/acceptance/steps/rpk.go b/acceptance/steps/rpk.go index e113ef7fa..beae5de80 100644 --- a/acceptance/steps/rpk.go +++ b/acceptance/steps/rpk.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "strings" + "time" "github.com/cucumber/godog" "github.com/stretchr/testify/require" @@ -105,15 +106,31 @@ func checkRPKCommands(ctx context.Context, t framework.TestingT, clusterName str Stderr: &stderr, }), "\nStdout: %s\nStderr: %s\n", stdout.String(), stderr.String()) require.Len(t, stderr.Bytes(), 0) - - require.NoErrorf(t, ctl.Exec(ctx, &p, kube.ExecOptions{ - Container: "redpanda", - Command: []string{"rpk", "registry", "schema", "list"}, - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - }), "\nStdout: %s\nStderr: %s\n", stdout.String(), stderr.String()) - require.Len(t, stderr.Bytes(), 0) + stdout.Reset() + + require.Eventually(t, func() bool { + command := []string{"rpk", "registry", "schema", "list"} + err := ctl.Exec(ctx, &p, kube.ExecOptions{ + Container: "redpanda", + Command: command, + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + t.Logf("rpk command %q failed with error: %v\nStdout: %s\nStderr: %s\n", strings.Join(command, " "), err, stdout.String(), stderr.String()) + stdout.Reset() + return false + } + if len(stderr.Bytes()) != 0 { + t.Logf("rpk command %q failed with \nStdout: %s\nStderr: %s\n", strings.Join(command, " "), stdout.String(), stderr.String()) + stderr.Reset() + stdout.Reset() + return false + } + return true + }, time.Minute, 10*time.Second) + stdout.Reset() require.NoErrorf(t, ctl.Exec(ctx, &p, kube.ExecOptions{ Container: "redpanda", @@ -123,5 +140,6 @@ func checkRPKCommands(ctx context.Context, t framework.TestingT, clusterName str Stderr: &stderr, }), "\nStdout: %s\nStderr: %s\n", stdout.String(), stderr.String()) require.Len(t, stderr.Bytes(), 0) + } } diff --git a/operator/cmd/run/run.go b/operator/cmd/run/run.go index 1ae5820b3..c50f9737d 100644 --- a/operator/cmd/run/run.go +++ 
b/operator/cmd/run/run.go @@ -15,7 +15,6 @@ import ( "context" "crypto/tls" "fmt" - "os" "path/filepath" "slices" "strings" @@ -24,7 +23,6 @@ import ( "github.com/cockroachdb/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" - helmkube "helm.sh/helm/v3/pkg/kube" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -63,14 +61,6 @@ import ( redpandawebhooks "github.com/redpanda-data/redpanda-operator/operator/webhooks/redpanda" ) -type RedpandaController string - -type OperatorState string - -func (r RedpandaController) toString() string { - return string(r) -} - const ( defaultConfiguratorContainerImage = "docker.redpanda.com/redpandadata/redpanda-operator" @@ -81,8 +71,6 @@ const ( OperatorV1Mode = OperatorState("Clustered-v1") OperatorV2Mode = OperatorState("Namespaced-v2") NamespaceControllerMode = OperatorState("Namespaced-Controllers") - - controllerName = "redpanda-controller" ) var availableControllers = []string{ @@ -90,6 +78,119 @@ var availableControllers = []string{ DecommissionController.toString(), } +type RunOptions struct { + managerOptions ctrl.Options + clusterDomain string + secureMetrics bool + enableHTTP2 bool + webhookEnabled bool + configuratorBaseImage string + configuratorTag string + configuratorImagePullPolicy string + decommissionWaitInterval time.Duration + metricsTimeout time.Duration + rpClientTimeout time.Duration + restrictToRedpandaVersion string + namespace string + additionalControllers []string + operatorMode bool + ghostbuster bool + unbindPVCsAfter time.Duration + unbinderSelector LabelSelectorValue + allowPVRebinding bool + autoDeletePVCs bool + webhookCertPath string + webhookCertName string + webhookCertKey string + metricsCertPath string + metricsCertName string + metricsCertKey string + enableGhostBrokerDecommissioner bool + ghostBrokerDecommissionerSyncPeriod time.Duration + cloudSecretsEnabled bool + cloudSecretsPrefix string + cloudSecretsConfig 
pkgsecrets.ExpanderCloudConfiguration +} + +func (o *RunOptions) BindFlags(cmd *cobra.Command) { + // Manager flags. + cmd.Flags().StringVar(&o.managerOptions.Metrics.BindAddress, "metrics-bind-address", "0", "The address the metrics endpoint binds to. Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") + cmd.Flags().BoolVar(&o.managerOptions.LeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") + cmd.Flags().StringVar(&o.managerOptions.LeaderElectionID, "leader-election-id", "aa9fc693.vectorized.io", "Sets the ID used for the leader election process.") + // NB: The default behavior here is in the controller-runtime, pretty deep. It reads the namespace file that's created when mounting a service account token. + cmd.Flags().StringVar(&o.managerOptions.LeaderElectionNamespace, "leader-election-namespace", "", "Sets the namespace that leader election resources will be created within. If not specified, defaults the value of --namespace or the namespace this Pod is running in.") + cmd.Flags().StringVar(&o.managerOptions.HealthProbeBindAddress, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + cmd.Flags().StringVar(&o.managerOptions.PprofBindAddress, "pprof-bind-address", ":8082", "The address the metric endpoint binds to. Set to '' or 0 to disable") + cmd.Flags().StringVar(&o.namespace, "namespace", "", "If namespace is set to not empty value, it changes scope of Redpanda operator to work in single namespace") + cmd.Flags().BoolVar(&o.secureMetrics, "metrics-secure", true, "If set, the metrics endpoint is served securely via HTTPS. 
Use --metrics-secure=false to use HTTP instead.") + cmd.Flags().BoolVar(&o.enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers") + cmd.Flags().StringVar(&o.webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.") + cmd.Flags().StringVar(&o.webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.") + cmd.Flags().StringVar(&o.webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.") + cmd.Flags().StringVar(&o.metricsCertPath, "metrics-cert-path", "", "The directory that contains the metrics server certificate.") + cmd.Flags().StringVar(&o.metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.") + cmd.Flags().StringVar(&o.metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.") + cmd.Flags().BoolVar(&o.webhookEnabled, "webhook-enabled", false, "Enable webhook Manager") + + // Controller flags. 
+ cmd.Flags().StringVar(&o.clusterDomain, "cluster-domain", "cluster.local", "Set the Kubernetes local domain (Kubelet's --cluster-domain)") + cmd.Flags().StringVar(&o.configuratorBaseImage, "configurator-base-image", defaultConfiguratorContainerImage, "The repository of the operator container image for use in self-referential deployments, such as the configurator and sidecar") + cmd.Flags().StringVar(&o.configuratorTag, "configurator-tag", version.Version, "The tag of the operator container image for use in self-referential deployments, such as the configurator and sidecar") + cmd.Flags().StringVar(&o.configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy") + cmd.Flags().DurationVar(&o.decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster") + cmd.Flags().DurationVar(&o.metricsTimeout, "metrics-timeout", 8*time.Second, "Set the timeout for a checking metrics Admin API endpoint. If set to 0, then the 2 seconds default will be used") + cmd.Flags().DurationVar(&o.rpClientTimeout, "cluster-connection-timeout", 10*time.Second, "Set the timeout for internal clients used to connect to Redpanda clusters") + cmd.Flags().StringVar(&o.restrictToRedpandaVersion, "restrict-redpanda-version", "", "Restrict management of clusters to those with this version") + cmd.Flags().BoolVar(&o.ghostbuster, "unsafe-decommission-failed-brokers", false, "Set to enable decommissioning a failed broker that is configured but does not exist in the StatefulSet (ghost broker). 
This may result in invalidating valid data") + _ = cmd.Flags().MarkHidden("unsafe-decommission-failed-brokers") + cmd.Flags().StringSliceVar(&o.additionalControllers, "additional-controllers", []string{""}, fmt.Sprintf("which controllers to run, available: all, %s", strings.Join(availableControllers, ", "))) + cmd.Flags().BoolVar(&o.operatorMode, "operator-mode", true, "enables to run as an operator, setting this to false will disable cluster (deprecated), redpanda resources reconciliation.") + cmd.Flags().DurationVar(&o.unbindPVCsAfter, "unbind-pvcs-after", 0, "if not zero, runs the PVCUnbinder controller which attempts to 'unbind' the PVCs' of Pods that are Pending for longer than the given duration") + cmd.Flags().BoolVar(&o.allowPVRebinding, "allow-pv-rebinding", false, "controls whether or not PVs unbound by the PVCUnbinder have their .ClaimRef cleared, which allows them to be reused") + cmd.Flags().Var(&o.unbinderSelector, "unbinder-label-selector", "if provided, a Kubernetes label selector that will filter Pods to be considered by the PVCUnbinder.") + cmd.Flags().BoolVar(&o.autoDeletePVCs, "auto-delete-pvcs", false, "Use StatefulSet PersistentVolumeClaimRetentionPolicy to auto delete PVCs on scale down and Cluster resource delete.") + cmd.Flags().BoolVar(&o.enableGhostBrokerDecommissioner, "enable-ghost-broker-decommissioner", false, "Enable ghost broker decommissioner.") + cmd.Flags().DurationVar(&o.ghostBrokerDecommissionerSyncPeriod, "ghost-broker-decommissioner-sync-period", time.Minute*5, "Ghost broker sync period. The Ghost Broker Decommissioner is guaranteed to be called after this period.") + + // Secret store related flags. 
+ cmd.Flags().BoolVar(&o.cloudSecretsEnabled, "enable-cloud-secrets", false, "Set to true if config values can reference secrets from cloud secret store") + cmd.Flags().StringVar(&o.cloudSecretsPrefix, "cloud-secrets-prefix", "", "Prefix for all names of cloud secrets") + cmd.Flags().StringVar(&o.cloudSecretsConfig.AWSRegion, "cloud-secrets-aws-region", "", "AWS Region in which the secrets are stored") + cmd.Flags().StringVar(&o.cloudSecretsConfig.AWSRoleARN, "cloud-secrets-aws-role-arn", "", "AWS role ARN to assume when fetching secrets") + cmd.Flags().StringVar(&o.cloudSecretsConfig.GCPProjectID, "cloud-secrets-gcp-project-id", "", "GCP project ID in which the secrets are stored") + cmd.Flags().StringVar(&o.cloudSecretsConfig.AzureKeyVaultURI, "cloud-secrets-azure-key-vault-uri", "", "Azure Key Vault URI in which the secrets are stored") + + // Legacy Global flags. + cmd.Flags().BoolVar(&vectorizedv1alpha1.AllowDownscalingInWebhook, "allow-downscaling", true, "Allow to reduce the number of replicas in existing clusters") + cmd.Flags().BoolVar(&vectorizedv1alpha1.AllowConsoleAnyNamespace, "allow-console-any-ns", false, "Allow to create Console in any namespace. Allowing this copies Redpanda SchemaRegistry TLS Secret to namespace (alpha feature)") + cmd.Flags().StringVar(&vectorizedv1alpha1.SuperUsersPrefix, "superusers-prefix", "", "Prefix to add in username of superusers managed by operator. This will only affect new clusters, enabling this will not add prefix to existing clusters (alpha feature)") + + // Deprecated flags. 
+ cmd.Flags().Bool("debug", false, "A deprecated and unused flag") + cmd.Flags().String("events-addr", "", "A deprecated and unused flag") + cmd.Flags().Bool("enable-helm-controllers", false, "A deprecated and unused flag") + cmd.Flags().String("helm-repository-url", "https://charts.redpanda.com/", "A deprecated and unused flag") + cmd.Flags().Bool("force-defluxed-mode", false, "A deprecated and unused flag") + cmd.Flags().Bool("allow-pvc-deletion", false, "Deprecated: Ignored if specified") +} + +func (o *RunOptions) ControllerEnabled(controller RedpandaController) bool { + for _, c := range o.additionalControllers { + if RedpandaController(c) == AllControllers || RedpandaController(c) == controller { + return true + } + } + return false +} + +type RedpandaController string + +type OperatorState string + +func (r RedpandaController) toString() string { + return string(r) +} + type LabelSelectorValue struct { Selector labels.Selector } @@ -127,170 +228,28 @@ func (s *LabelSelectorValue) Type() string { // +kubebuilder:rbac:groups=core,namespace=default,resources=events,verbs=create;patch func Command() *cobra.Command { - var ( - clusterDomain string - metricsAddr string - secureMetrics bool - enableHTTP2 bool - probeAddr string - pprofAddr string - enableLeaderElection bool - webhookEnabled bool - configuratorBaseImage string - configuratorTag string - configuratorImagePullPolicy string - decommissionWaitInterval time.Duration - metricsTimeout time.Duration - rpClientTimeout time.Duration - restrictToRedpandaVersion string - namespace string - additionalControllers []string - operatorMode bool - ghostbuster bool - unbindPVCsAfter time.Duration - unbinderSelector LabelSelectorValue - allowPVRebinding bool - autoDeletePVCs bool - webhookCertPath string - webhookCertName string - webhookCertKey string - metricsCertPath string - metricsCertName string - metricsCertKey string - enableGhostBrokerDecommissioner bool - ghostBrokerDecommissionerSyncPeriod time.Duration - 
cloudSecretsEnabled bool - cloudSecretsPrefix string - cloudSecretsAWSRegion string - cloudSecretsAWSRoleARN string - cloudSecretsGCPProjectID string - cloudSecretsAzureKeyVaultURI string - ) + var options RunOptions cmd := &cobra.Command{ Use: "run", Short: "Run the redpanda operator", RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() + var cloudExpander *pkgsecrets.CloudExpander - if cloudSecretsEnabled { - cloudConfig := pkgsecrets.ExpanderCloudConfiguration{} - if cloudSecretsAWSRegion != "" { - cloudConfig.AWSRegion = cloudSecretsAWSRegion - // if AWSRoleARN is empty, it uses the assumed role of the pod - cloudConfig.AWSRoleARN = cloudSecretsAWSRoleARN - } else if cloudSecretsGCPProjectID != "" { - cloudConfig.GCPProjectID = cloudSecretsGCPProjectID - } else if cloudSecretsAzureKeyVaultURI != "" { - cloudConfig.AzureKeyVaultURI = cloudSecretsAzureKeyVaultURI - } else { - return errors.New("Cloud secrets are enabled but configuration for cloud provider is missing or invalid") - } + if options.cloudSecretsEnabled { var err error - cloudExpander, err = pkgsecrets.NewCloudExpander(ctx, cloudSecretsPrefix, cloudConfig) + cloudExpander, err = pkgsecrets.NewCloudExpander(ctx, options.cloudSecretsPrefix, options.cloudSecretsConfig) if err != nil { return err } } - return Run( - ctx, - clusterDomain, - metricsAddr, - secureMetrics, - enableHTTP2, - probeAddr, - enableLeaderElection, - webhookEnabled, - configuratorBaseImage, - configuratorTag, - configuratorImagePullPolicy, - decommissionWaitInterval, - metricsTimeout, - restrictToRedpandaVersion, - namespace, - additionalControllers, - operatorMode, - ghostbuster, - unbindPVCsAfter, - unbinderSelector.Selector, - allowPVRebinding, - autoDeletePVCs, - pprofAddr, - webhookCertPath, - webhookCertName, - webhookCertKey, - metricsCertPath, - metricsCertName, - metricsCertKey, - enableGhostBrokerDecommissioner, - ghostBrokerDecommissionerSyncPeriod, - cloudExpander, - cloudSecretsEnabled, - 
cloudSecretsPrefix, - cloudSecretsAWSRegion, - cloudSecretsAWSRoleARN, - cloudSecretsGCPProjectID, - cloudSecretsAzureKeyVaultURI, - rpClientTimeout, - ) + return Run(ctx, cloudExpander, &options) }, } - cmd.Flags().StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ - "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") - cmd.Flags().BoolVar(&secureMetrics, "metrics-secure", true, "If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.") - cmd.Flags().BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers") - cmd.Flags().StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") - cmd.Flags().StringVar(&pprofAddr, "pprof-bind-address", ":8082", "The address the metric endpoint binds to. Set to '' or 0 to disable") - cmd.Flags().StringVar(&clusterDomain, "cluster-domain", "cluster.local", "Set the Kubernetes local domain (Kubelet's --cluster-domain)") - cmd.Flags().BoolVar(&enableLeaderElection, "leader-elect", false, - "Enable leader election for controller manager. 
"+ - "Enabling this will ensure there is only one active controller manager.") - cmd.Flags().BoolVar(&webhookEnabled, "webhook-enabled", false, "Enable webhook Manager") - cmd.Flags().StringVar(&configuratorBaseImage, "configurator-base-image", defaultConfiguratorContainerImage, "The repository of the operator container image for use in self-referential deployments, such as the configurator and sidecar") - cmd.Flags().StringVar(&configuratorTag, "configurator-tag", version.Version, "The tag of the operator container image for use in self-referential deployments, such as the configurator and sidecar") - cmd.Flags().StringVar(&configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy") - cmd.Flags().DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster") - cmd.Flags().DurationVar(&metricsTimeout, "metrics-timeout", 8*time.Second, "Set the timeout for a checking metrics Admin API endpoint. If set to 0, then the 2 seconds default will be used") - cmd.Flags().DurationVar(&rpClientTimeout, "cluster-connection-timeout", 10*time.Second, "Set the timeout for internal clients used to connect to Redpanda clusters") - cmd.Flags().BoolVar(&vectorizedv1alpha1.AllowDownscalingInWebhook, "allow-downscaling", true, "Allow to reduce the number of replicas in existing clusters") - cmd.Flags().Bool("allow-pvc-deletion", false, "Deprecated: Ignored if specified") - cmd.Flags().BoolVar(&vectorizedv1alpha1.AllowConsoleAnyNamespace, "allow-console-any-ns", false, "Allow to create Console in any namespace. 
Allowing this copies Redpanda SchemaRegistry TLS Secret to namespace (alpha feature)") - cmd.Flags().StringVar(&restrictToRedpandaVersion, "restrict-redpanda-version", "", "Restrict management of clusters to those with this version") - cmd.Flags().StringVar(&vectorizedv1alpha1.SuperUsersPrefix, "superusers-prefix", "", "Prefix to add in username of superusers managed by operator. This will only affect new clusters, enabling this will not add prefix to existing clusters (alpha feature)") - cmd.Flags().StringVar(&namespace, "namespace", "", "If namespace is set to not empty value, it changes scope of Redpanda operator to work in single namespace") - cmd.Flags().BoolVar(&ghostbuster, "unsafe-decommission-failed-brokers", false, "Set to enable decommissioning a failed broker that is configured but does not exist in the StatefulSet (ghost broker). This may result in invalidating valid data") - _ = cmd.Flags().MarkHidden("unsafe-decommission-failed-brokers") - cmd.Flags().StringSliceVar(&additionalControllers, "additional-controllers", []string{""}, fmt.Sprintf("which controllers to run, available: all, %s", strings.Join(availableControllers, ", "))) - cmd.Flags().BoolVar(&operatorMode, "operator-mode", true, "enables to run as an operator, setting this to false will disable cluster (deprecated), redpanda resources reconciliation.") - cmd.Flags().DurationVar(&unbindPVCsAfter, "unbind-pvcs-after", 0, "if not zero, runs the PVCUnbinder controller which attempts to 'unbind' the PVCs' of Pods that are Pending for longer than the given duration") - cmd.Flags().BoolVar(&allowPVRebinding, "allow-pv-rebinding", false, "controls whether or not PVs unbound by the PVCUnbinder have their .ClaimRef cleared, which allows them to be reused") - cmd.Flags().Var(&unbinderSelector, "unbinder-label-selector", "if provided, a Kubernetes label selector that will filter Pods to be considered by the PVCUnbinder.") - cmd.Flags().BoolVar(&autoDeletePVCs, "auto-delete-pvcs", false, "Use 
StatefulSet PersistentVolumeClaimRetentionPolicy to auto delete PVCs on scale down and Cluster resource delete.") - cmd.Flags().StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.") - cmd.Flags().StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.") - cmd.Flags().StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.") - cmd.Flags().StringVar(&metricsCertPath, "metrics-cert-path", "", "The directory that contains the metrics server certificate.") - cmd.Flags().StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.") - cmd.Flags().StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.") - cmd.Flags().BoolVar(&enableGhostBrokerDecommissioner, "enable-ghost-broker-decommissioner", false, "Enable ghost broker decommissioner.") - cmd.Flags().DurationVar(&ghostBrokerDecommissionerSyncPeriod, "ghost-broker-decommissioner-sync-period", time.Minute*5, "Ghost broker sync period. 
The Ghost Broker Decommissioner is guaranteed to be called after this period.") - - // secret store related flags - cmd.Flags().BoolVar(&cloudSecretsEnabled, "enable-cloud-secrets", false, "Set to true if config values can reference secrets from cloud secret store") - cmd.Flags().StringVar(&cloudSecretsPrefix, "cloud-secrets-prefix", "", "Prefix for all names of cloud secrets") - cmd.Flags().StringVar(&cloudSecretsAWSRegion, "cloud-secrets-aws-region", "", "AWS Region in which the secrets are stored") - cmd.Flags().StringVar(&cloudSecretsAWSRoleARN, "cloud-secrets-aws-role-arn", "", "AWS role ARN to assume when fetching secrets") - cmd.Flags().StringVar(&cloudSecretsGCPProjectID, "cloud-secrets-gcp-project-id", "", "GCP project ID in which the secrets are stored") - cmd.Flags().StringVar(&cloudSecretsAzureKeyVaultURI, "cloud-secrets-azure-key-vault-uri", "", "Azure Key Vault URI in which the secrets are stored") - - // Deprecated flags. - cmd.Flags().Bool("debug", false, "A deprecated and unused flag") - cmd.Flags().String("events-addr", "", "A deprecated and unused flag") - cmd.Flags().Bool("enable-helm-controllers", false, "A deprecated and unused flag") - cmd.Flags().String("helm-repository-url", "https://charts.redpanda.com/", "A deprecated and unused flag") - cmd.Flags().Bool("force-defluxed-mode", false, "A deprecated and unused flag") + options.BindFlags(cmd) return cmd } @@ -313,50 +272,11 @@ func (f *v1Fetcher) FetchLatest(ctx context.Context, name, namespace string) (an //nolint:funlen,gocyclo // length looks good func Run( ctx context.Context, - clusterDomain string, - metricsAddr string, - secureMetrics bool, - enableHTTP2 bool, - probeAddr string, - enableLeaderElection bool, - webhookEnabled bool, - configuratorBaseImage string, - configuratorTag string, - configuratorImagePullPolicy string, - decommissionWaitInterval time.Duration, - metricsTimeout time.Duration, - restrictToRedpandaVersion string, - namespace string, - additionalControllers 
[]string, - operatorMode bool, - ghostbuster bool, - unbindPVCsAfter time.Duration, - unbinderSelector labels.Selector, - allowPVRebinding bool, - autoDeletePVCs bool, - pprofAddr string, - webhookCertPath string, - webhookCertName string, - webhookCertKey string, - metricsCertPath string, - metricsCertName string, - metricsCertKey string, - enableGhostBrokerDecommissioner bool, - ghostBrokerDecommissionerSyncPeriod time.Duration, cloudExpander *pkgsecrets.CloudExpander, - cloudSecretsEnabled bool, - cloudSecretsPrefix string, - cloudSecretsAWSRegion string, - cloudSecretsAWSRoleARN string, - cloudSecretsGCPProjectID string, - cloudSecretsAzureKeyVaultURI string, - rpClientTimeout time.Duration, + opts *RunOptions, ) error { setupLog := ctrl.LoggerFrom(ctx).WithName("setup") - // set the managedFields owner for resources reconciled from Helm charts - helmkube.ManagedFieldsManager = controllerName - // if the enable-http2 flag is false (the default), http/2 should be disabled // due to its vulnerabilities. 
More specifically, disabling http/2 will // prevent from being vulnerable to the HTTP/2 Stream Cancellation and @@ -369,7 +289,7 @@ func Run( } var tlsOpts []func(*tls.Config) - if !enableHTTP2 { + if !opts.enableHTTP2 { tlsOpts = append(tlsOpts, disableHTTP2) } @@ -377,22 +297,22 @@ func Run( var metricsCertWatcher, webhookCertWatcher *certwatcher.CertWatcher var webhookServer webhook.Server - if webhookEnabled { + if opts.webhookEnabled { // Initial webhook TLS options webhookTLSOpts := tlsOpts - if len(webhookCertPath) > 0 { + if len(opts.webhookCertPath) > 0 { setupLog.Info("Initializing webhook certificate watcher using provided certificates", - "webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey) + "webhook-cert-path", opts.webhookCertPath, "webhook-cert-name", opts.webhookCertName, "webhook-cert-key", opts.webhookCertKey) var err error webhookCertWatcher, err = certwatcher.New( - filepath.Join(webhookCertPath, webhookCertName), - filepath.Join(webhookCertPath, webhookCertKey), + filepath.Join(opts.webhookCertPath, opts.webhookCertName), + filepath.Join(opts.webhookCertPath, opts.webhookCertKey), ) if err != nil { setupLog.Error(err, "Failed to initialize webhook certificate watcher") - os.Exit(1) + return err } go func() { setupLog.Error(webhookCertWatcher.Start(ctx), "webhook cert watcher exits") @@ -413,12 +333,12 @@ func Run( // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.4/pkg/metrics/server // - https://book.kubebuilder.io/reference/metrics.html metricsServerOptions := metricsserver.Options{ - BindAddress: metricsAddr, - SecureServing: secureMetrics, + BindAddress: opts.managerOptions.Metrics.BindAddress, + SecureServing: opts.secureMetrics, TLSOpts: tlsOpts, } - if secureMetrics { + if opts.secureMetrics { // FilterProvider is used to protect the metrics endpoint with authn/authz. 
// These configurations ensure that only authorized users and service accounts // can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info: @@ -434,18 +354,18 @@ func Run( // - [METRICS-WITH-CERTS] at config/default/kustomization.yaml to generate and use certificates // managed by cert-manager for the metrics server. // - [PROMETHEUS-WITH-CERTS] at config/prometheus/kustomization.yaml for TLS certification. - if len(metricsCertPath) > 0 { + if len(opts.metricsCertPath) > 0 { setupLog.Info("Initializing metrics certificate watcher using provided certificates", - "metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey) + "metrics-cert-path", opts.metricsCertPath, "metrics-cert-name", opts.metricsCertName, "metrics-cert-key", opts.metricsCertKey) var err error metricsCertWatcher, err = certwatcher.New( - filepath.Join(metricsCertPath, metricsCertName), - filepath.Join(metricsCertPath, metricsCertKey), + filepath.Join(opts.metricsCertPath, opts.metricsCertName), + filepath.Join(opts.metricsCertPath, opts.metricsCertKey), ) if err != nil { setupLog.Error(err, "to initialize metrics certificate watcher", "error", err) - os.Exit(1) + return err } go func() { setupLog.Error(metricsCertWatcher.Start(ctx), "metrics cert watcher exits") @@ -456,59 +376,56 @@ func Run( }) } - mgrOptions := ctrl.Options{ - HealthProbeBindAddress: probeAddr, - LeaderElection: enableLeaderElection, - LeaderElectionID: "aa9fc693.vectorized.io", - LeaderElectionNamespace: namespace, - Metrics: metricsServerOptions, - PprofBindAddress: pprofAddr, - WebhookServer: webhookServer, + // Set options that are not or cannot be bound to CLI flags. 
+ opts.managerOptions.WebhookServer = webhookServer + opts.managerOptions.Metrics = metricsServerOptions + opts.managerOptions.Scheme = controller.UnifiedScheme + + if opts.managerOptions.LeaderElectionNamespace == "" { + opts.managerOptions.LeaderElectionNamespace = opts.namespace } - if namespace != "" { - mgrOptions.Cache.DefaultNamespaces = map[string]cache.Config{namespace: {}} + + if opts.namespace != "" { + opts.managerOptions.Cache.DefaultNamespaces = map[string]cache.Config{opts.namespace: {}} } - configurator := resources.ConfiguratorSettings{ - ConfiguratorBaseImage: configuratorBaseImage, - ConfiguratorTag: configuratorTag, - ImagePullPolicy: corev1.PullPolicy(configuratorImagePullPolicy), - CloudSecretsEnabled: cloudSecretsEnabled, - CloudSecretsPrefix: cloudSecretsPrefix, - CloudSecretsAWSRegion: cloudSecretsAWSRegion, - CloudSecretsAWSRoleARN: cloudSecretsAWSRoleARN, - CloudSecretsGCPProjectID: cloudSecretsGCPProjectID, - CloudSecretsAzureKeyVaultURI: cloudSecretsAzureKeyVaultURI, + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), opts.managerOptions) + if err != nil { + setupLog.Error(err, "Unable to start manager") + return err } // init running state values if we are not in operator mode var operatorRunningState OperatorState - if namespace != "" { + if opts.namespace != "" { operatorRunningState = NamespaceControllerMode } // but if we are in operator mode, then the run state is different - if operatorMode { + if opts.operatorMode { operatorRunningState = OperatorV1Mode - if namespace != "" { + if opts.namespace != "" { operatorRunningState = OperatorV2Mode } } - scheme := controller.UnifiedScheme - mgrOptions.Scheme = scheme - - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOptions) - if err != nil { - setupLog.Error(err, "Unable to start manager") - return err - } - // Now we start different processes depending on state switch operatorRunningState { case OperatorV1Mode: ctrl.Log.Info("running in v1", "mode", OperatorV1Mode) + 
configurator := resources.ConfiguratorSettings{ + ConfiguratorBaseImage: opts.configuratorBaseImage, + ConfiguratorTag: opts.configuratorTag, + ImagePullPolicy: corev1.PullPolicy(opts.configuratorImagePullPolicy), + CloudSecretsEnabled: opts.cloudSecretsEnabled, + CloudSecretsPrefix: opts.cloudSecretsPrefix, + CloudSecretsAWSRegion: opts.cloudSecretsConfig.AWSRegion, + CloudSecretsAWSRoleARN: opts.cloudSecretsConfig.AWSRoleARN, + CloudSecretsGCPProjectID: opts.cloudSecretsConfig.GCPProjectID, + CloudSecretsAzureKeyVaultURI: opts.cloudSecretsConfig.AzureKeyVaultURI, + } + adminAPIClientFactory := adminutils.CachedNodePoolAdminAPIClientFactory(adminutils.NewNodePoolInternalAdminAPI) if err = (&vectorizedcontrollers.ClusterReconciler{ @@ -516,14 +433,14 @@ func Run( Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"), Scheme: mgr.GetScheme(), AdminAPIClientFactory: adminAPIClientFactory, - DecommissionWaitInterval: decommissionWaitInterval, - MetricsTimeout: metricsTimeout, - RestrictToRedpandaVersion: restrictToRedpandaVersion, - GhostDecommissioning: ghostbuster, - AutoDeletePVCs: autoDeletePVCs, + DecommissionWaitInterval: opts.decommissionWaitInterval, + MetricsTimeout: opts.metricsTimeout, + RestrictToRedpandaVersion: opts.restrictToRedpandaVersion, + GhostDecommissioning: opts.ghostbuster, + AutoDeletePVCs: opts.autoDeletePVCs, CloudSecretsExpander: cloudExpander, - Timeout: rpClientTimeout, - }).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil { + Timeout: opts.rpClientTimeout, + }).WithClusterDomain(opts.clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil { setupLog.Error(err, "Unable to create controller", "controller", "Cluster") return err } @@ -542,14 +459,14 @@ func Run( Store: consolepkg.NewStore(mgr.GetClient(), mgr.GetScheme()), EventRecorder: mgr.GetEventRecorderFor("Console"), KafkaAdminClientFactory: consolepkg.NewKafkaAdmin, - 
}).WithClusterDomain(clusterDomain).SetupWithManager(mgr); err != nil { + }).WithClusterDomain(opts.clusterDomain).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Console") return err } if err = (&redpandacontrollers.TopicReconciler{ Client: mgr.GetClient(), - Factory: internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient()).WithAdminClientTimeout(rpClientTimeout), + Factory: internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient()).WithAdminClientTimeout(opts.rpClientTimeout), Scheme: mgr.GetScheme(), EventRecorder: mgr.GetEventRecorderFor("TopicReconciler"), }).SetupWithManager(mgr); err != nil { @@ -558,7 +475,7 @@ func Run( } // Setup webhooks - if webhookEnabled { + if opts.webhookEnabled { setupLog.Info("Setup webhook") if err = (&vectorizedv1alpha1.Cluster{}).SetupWebhookWithManager(mgr); err != nil { setupLog.Error(err, "Unable to create webhook", "webhook", "RedpandaCluster") @@ -568,32 +485,32 @@ func Run( hookServer.Register("/mutate-redpanda-vectorized-io-v1alpha1-console", &webhook.Admission{ Handler: &redpandawebhooks.ConsoleDefaulter{ Client: mgr.GetClient(), - Decoder: admission.NewDecoder(scheme), + Decoder: admission.NewDecoder(mgr.GetScheme()), }, }) hookServer.Register("/validate-redpanda-vectorized-io-v1alpha1-console", &webhook.Admission{ Handler: &redpandawebhooks.ConsoleValidator{ Client: mgr.GetClient(), - Decoder: admission.NewDecoder(scheme), + Decoder: admission.NewDecoder(mgr.GetScheme()), }, }) } case OperatorV2Mode: - ctrl.Log.Info("running in v2", "mode", OperatorV2Mode, "namespace", namespace) + ctrl.Log.Info("running in v2", "mode", OperatorV2Mode, "namespace", opts.namespace) - factory := internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient()).WithAdminClientTimeout(rpClientTimeout) + factory := internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient()).WithAdminClientTimeout(opts.rpClientTimeout) cloudSecrets := lifecycle.CloudSecretsFlags{ - CloudSecretsEnabled: 
cloudSecretsEnabled, - CloudSecretsPrefix: cloudSecretsPrefix, - CloudSecretsAWSRegion: cloudSecretsAWSRegion, - CloudSecretsAWSRoleARN: cloudSecretsAWSRoleARN, - CloudSecretsGCPProjectID: cloudSecretsGCPProjectID, - CloudSecretsAzureKeyVaultURI: cloudSecretsAzureKeyVaultURI, + CloudSecretsEnabled: opts.cloudSecretsEnabled, + CloudSecretsPrefix: opts.cloudSecretsPrefix, + CloudSecretsAWSRegion: opts.cloudSecretsConfig.AWSRegion, + CloudSecretsAWSRoleARN: opts.cloudSecretsConfig.AWSRoleARN, + CloudSecretsGCPProjectID: opts.cloudSecretsConfig.GCPProjectID, + CloudSecretsAzureKeyVaultURI: opts.cloudSecretsConfig.AzureKeyVaultURI, } redpandaImage := lifecycle.Image{ - Repository: configuratorBaseImage, - Tag: configuratorTag, + Repository: opts.configuratorBaseImage, + Tag: opts.configuratorTag, } // Redpanda Reconciler @@ -629,28 +546,28 @@ func Run( return err } - if runThisController(NodeController, additionalControllers) { + if opts.ControllerEnabled(NodeController) { if err = (&nodewatcher.RedpandaNodePVCReconciler{ Client: mgr.GetClient(), - OperatorMode: operatorMode, + OperatorMode: opts.operatorMode, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "RedpandaNodePVCReconciler") return err } } - if runThisController(DecommissionController, additionalControllers) { + if opts.ControllerEnabled(DecommissionController) { if err = (&olddecommission.DecommissionReconciler{ Client: mgr.GetClient(), - OperatorMode: operatorMode, - DecommissionWaitInterval: decommissionWaitInterval, + OperatorMode: opts.operatorMode, + DecommissionWaitInterval: opts.decommissionWaitInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DecommissionReconciler") return err } } - if webhookEnabled { + if opts.webhookEnabled { setupLog.Info("Setup Redpanda conversion webhook") if err = (&redpandav1alpha2.Redpanda{}).SetupWebhookWithManager(mgr); err != nil { setupLog.Error(err, 
"Unable to create webhook", "webhook", "RedpandaConversion") @@ -659,22 +576,22 @@ func Run( } case NamespaceControllerMode: - ctrl.Log.Info("running as a namespace controller", "mode", NamespaceControllerMode, "namespace", namespace) - if runThisController(NodeController, additionalControllers) { + ctrl.Log.Info("running as a namespace controller", "mode", NamespaceControllerMode, "namespace", opts.namespace) + if opts.ControllerEnabled(NodeController) { if err = (&nodewatcher.RedpandaNodePVCReconciler{ Client: mgr.GetClient(), - OperatorMode: operatorMode, + OperatorMode: opts.operatorMode, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "RedpandaNodePVCReconciler") return err } } - if runThisController(DecommissionController, additionalControllers) { + if opts.ControllerEnabled(DecommissionController) { if err = (&olddecommission.DecommissionReconciler{ Client: mgr.GetClient(), - OperatorMode: operatorMode, - DecommissionWaitInterval: decommissionWaitInterval, + OperatorMode: opts.operatorMode, + DecommissionWaitInterval: opts.decommissionWaitInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DecommissionReconciler") return err @@ -687,25 +604,25 @@ func Run( } // The unbinder gets to run in any mode, if it's enabled. 
- if unbindPVCsAfter <= 0 { - setupLog.Info("PVCUnbinder controller not active", "unbind-after", unbindPVCsAfter, "selector", unbinderSelector, "allow-pv-rebinding", allowPVRebinding) + if opts.unbindPVCsAfter <= 0 { + setupLog.Info("PVCUnbinder controller not active", "unbind-after", opts.unbindPVCsAfter, "selector", opts.unbinderSelector, "allow-pv-rebinding", opts.allowPVRebinding) } else { - setupLog.Info("starting PVCUnbinder controller", "unbind-after", unbindPVCsAfter, "selector", unbinderSelector, "allow-pv-rebinding", allowPVRebinding) + setupLog.Info("starting PVCUnbinder controller", "unbind-after", opts.unbindPVCsAfter, "selector", opts.unbinderSelector, "allow-pv-rebinding", opts.allowPVRebinding) if err := (&pvcunbinder.Controller{ Client: mgr.GetClient(), - Timeout: unbindPVCsAfter, - Selector: unbinderSelector, - AllowRebinding: allowPVRebinding, + Timeout: opts.unbindPVCsAfter, + Selector: opts.unbinderSelector.Selector, + AllowRebinding: opts.allowPVRebinding, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PVCUnbinder") return err } } - if enableGhostBrokerDecommissioner { + if opts.enableGhostBrokerDecommissioner { d := decommissioning.NewStatefulSetDecommissioner(mgr, &v1Fetcher{client: mgr.GetClient()}, - decommissioning.WithSyncPeriod(ghostBrokerDecommissionerSyncPeriod), + decommissioning.WithSyncPeriod(opts.ghostBrokerDecommissionerSyncPeriod), decommissioning.WithCleanupPVCs(false), // In Operator v1, decommissioning based on pod ordinal is not correct because // it has controller code that manages decommissioning. If something else decommissions the node, it can not deal with this under all circumstances because of various reasons, eg. 
because of a protection against stale status reads of status.currentReplicas @@ -832,7 +749,7 @@ func Run( return err } - if webhookEnabled { + if opts.webhookEnabled { hookServer := mgr.GetWebhookServer() if err := mgr.AddReadyzCheck("webhook", hookServer.StartedChecker()); err != nil { setupLog.Error(err, "unable to create ready check") @@ -854,16 +771,3 @@ func Run( return nil } - -func runThisController(rc RedpandaController, controllers []string) bool { - if len(controllers) == 0 { - return false - } - - for _, c := range controllers { - if RedpandaController(c) == AllControllers || RedpandaController(c) == rc { - return true - } - } - return false -} diff --git a/operator/pkg/secrets/secrets.go b/operator/pkg/secrets/secrets.go index ff5f02bae..5397418de 100644 --- a/operator/pkg/secrets/secrets.go +++ b/operator/pkg/secrets/secrets.go @@ -33,38 +33,39 @@ func NewCloudExpander( logger := log.FromContext(ctx) slogLogger := slog.New(logr.ToSlogHandler(logger.WithName("slog").WithValues("mode", "slog"))) - var secretsAPI secrets.SecretAPI var err error - if cloudConfig.AWSRegion != "" { + var secretsAPI secrets.SecretAPI + + switch { + case cloudConfig.AWSRegion != "": secretsAPI, err = secrets.NewAWSSecretsManager( ctx, slogLogger, cloudConfig.AWSRegion, cloudConfig.AWSRoleARN, ) - if err != nil { - return nil, err - } - } else if cloudConfig.GCPProjectID != "" { + + case cloudConfig.GCPProjectID != "": secretsAPI, err = secrets.NewGCPSecretsManager( ctx, slogLogger, cloudConfig.GCPProjectID, ) - if err != nil { - return nil, err - } - } else if cloudConfig.AzureKeyVaultURI != "" { + + case cloudConfig.AzureKeyVaultURI != "": secretsAPI, err = secrets.NewAzSecretsManager( slogLogger, cloudConfig.AzureKeyVaultURI, ) - if err != nil { - return nil, err - } - } else { - return nil, errors.New("incorrect cloud secret store configuration") + + default: + return nil, errors.New("failed to construct SecretAPI: none of AWSRegion, GCPProjectID, nor AzureKeyVaultURI
provided") } + + if err != nil { + return nil, errors.Wrapf(err, "constructing %T", secretsAPI) + } + provider, err := secrets.NewSecretProvider(secretsAPI, prefix, "") if err != nil { return nil, errors.WithStack(err)