From 5ede0f2c05092e044ca061d69c08d11430a2af2f Mon Sep 17 00:00:00 2001 From: Chris Seto Date: Mon, 13 Oct 2025 15:41:46 -0400 Subject: [PATCH] operator: various sidecar fixes Prior to this commit the operator sidecar's decommissioner and pvcunbinder controllers did not work. This was due to: - RBAC issues, the sidecar did not correctly scope itself to a single namespace. - Incorrect label selectors hidden within the controllers in question. Additionally, the statefulset decommissioner's sole test case has been disabled for quite some time. There's been zero test coverage of this functionality. This commit: - Restores the decommissioner's tests to a working state - Strips out the "fetcher" to reduce duplication and remove reliance on fetching live helm values. - Replaces baked-in filtering with a label selector argument that will be constructed by the helm chart. A follow-up commit with chart changes and acceptance tests will be submitted. It's been made separate to ease the process of backporting to the v2.x.x branches. (cherry picked from commit 03dd39424afd706da4d7b74d07e296a536aeb0d0) # Conflicts: # operator/cmd/run/run.go # operator/go.mod # pkg/go.mod --- Taskfile.yml | 8 + operator/cmd/run/run.go | 164 ++---------------- operator/cmd/run/vectorized.go | 156 +++++++++++++++++ operator/cmd/sidecar/sidecar.go | 70 ++++++-- operator/go.mod | 2 +- .../decommissioning/chained_fetcher.go | 51 ------ .../controller/decommissioning/fetcher.go | 20 --- .../decommissioning/helm_fetcher.go | 139 --------------- .../decommissioning/redpanda_fetcher.go | 41 ----- .../decommissioning/rpk_profile_fetcher.go | 58 ------- .../statefulset_decomissioner.go | 147 ++++++---------- .../statefulset_decommissioner_test.go | 67 ++++--- .../controller/pvcunbinder/pvcunbinder.go | 52 ++---- operator/internal/testenv/testenv.go | 11 +- .../03-delete-redpandas.yaml | 2 +- pkg/go.mod | 2 +- pkg/pflagutil/flags.go | 48 +++++ 17 files changed, 400 insertions(+), 638 deletions(-) create mode 100644 operator/cmd/run/vectorized.go delete mode 100644 operator/internal/controller/decommissioning/chained_fetcher.go delete mode 100644 operator/internal/controller/decommissioning/fetcher.go delete mode 100644 operator/internal/controller/decommissioning/helm_fetcher.go delete mode 100644 operator/internal/controller/decommissioning/redpanda_fetcher.go delete mode 100644 operator/internal/controller/decommissioning/rpk_profile_fetcher.go create mode 100644 pkg/pflagutil/flags.go diff --git a/Taskfile.yml b/Taskfile.yml index f2e9d5b2a..0335f8525 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -146,6 +146,12 @@ tasks: vars: CLI_ARGS: '--load {{.CLI_ARGS}}' + build:charts: + desc: "Run helm dep build for all charts" + cmds: + - helm repo add redpanda https://charts.redpanda.com + - helm dep build ./charts/redpanda + test:unit: desc: "Run all unit tests (~5m)" vars: @@ -161,6 +167,7 @@ tasks: # The operator image is required to test the configurator and sidecar. # In integration tests, the operator itself will be run from the go test process. 
- build:image + - build:charts cmds: - task: charts:kind-cluster - kind load docker-image localhost/redpanda-operator:dev @@ -174,6 +181,7 @@ tasks: desc: "Run all acceptance tests (~90m)" deps: - build:image + - build:charts vars: GO_TEST_RUNNER: '{{default "go test" .GO_TEST_RUNNER}}' CLI_ARGS: '{{.CLI_ARGS}} -tags=acceptance -run "^TestAcceptance" -timeout 20m -v' diff --git a/operator/cmd/run/run.go b/operator/cmd/run/run.go index 31b7fa370..3761621dc 100644 --- a/operator/cmd/run/run.go +++ b/operator/cmd/run/run.go @@ -17,27 +17,19 @@ import ( "fmt" "os" "path/filepath" - "slices" "strings" "time" "github.com/cockroachdb/errors" fluxclient "github.com/fluxcd/pkg/runtime/client" "github.com/spf13/cobra" - "github.com/spf13/pflag" helmkube "helm.sh/helm/v3/pkg/kube" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/certwatcher" - "sigs.k8s.io/controller-runtime/pkg/client" - kubeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -58,10 +50,10 @@ import ( adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin" internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" consolepkg "github.com/redpanda-data/redpanda-operator/operator/pkg/console" - pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels" "github.com/redpanda-data/redpanda-operator/operator/pkg/resources" pkgsecrets "github.com/redpanda-data/redpanda-operator/operator/pkg/secrets" redpandawebhooks "github.com/redpanda-data/redpanda-operator/operator/webhooks/redpanda" + "github.com/redpanda-data/redpanda-operator/pkg/pflagutil" ) type RedpandaController string @@ -99,32 +91,6 @@ var ( } ) -type LabelSelectorValue struct { - Selector labels.Selector -} - -var _ pflag.Value = ((*LabelSelectorValue)(nil)) - -func (s *LabelSelectorValue) Set(value string) error { - if value == "" { - return nil - } - var err error - s.Selector, err = labels.Parse(value) - return err -} - -func (s *LabelSelectorValue) String() string { - if s.Selector == nil { - return "" - } - return s.Selector.String() -} - -func (s *LabelSelectorValue) Type() string { - return "label selector" -} - // Metrics RBAC permissions // +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create; // +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create; @@ -157,7 +123,7 @@ func Command() *cobra.Command { enableHelmControllers bool ghostbuster bool unbindPVCsAfter time.Duration - unbinderSelector LabelSelectorValue + unbinderSelector pflagutil.LabelSelectorValue allowPVRebinding bool autoDeletePVCs bool forceDefluxedMode bool @@ -313,21 +279,6 @@ func Command() *cobra.Command { return cmd } -type v1Fetcher struct { - client kubeClient.Client -} - -func (f *v1Fetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) { - var vectorizedCluster vectorizedv1alpha1.Cluster - if err := f.client.Get(ctx, types.NamespacedName{ - Name: name, - Namespace: namespace, - }, &vectorizedCluster); err != nil { - return nil, err - } - return &vectorizedCluster, nil -} - 
//nolint:funlen,gocyclo // length looks good func Run( ctx context.Context, @@ -723,7 +674,15 @@ func Run( } if enableGhostBrokerDecommissioner { - d := decommissioning.NewStatefulSetDecommissioner(mgr, &v1Fetcher{client: mgr.GetClient()}, + factory := internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient()) + adapter := vectorizedDecommissionerAdapter{factory: factory, client: mgr.GetClient()} + d := decommissioning.NewStatefulSetDecommissioner( + mgr, + adapter.getAdminClient, + decommissioning.WithFilter(adapter.filter), + // Operator v1 supports multiple NodePools, and therefore multiple STS. + // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS. + decommissioning.WithDesiredReplicasFetcher(adapter.desiredReplicas), decommissioning.WithSyncPeriod(ghostBrokerDecommissionerSyncPeriod), decommissioning.WithCleanupPVCs(false), // In Operator v1, decommissioning based on pod ordinal is not correct because @@ -731,107 +690,6 @@ func Run( // (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139) // In addition to this situation where it can not (always) recover, it is just not desired that it interferes with graceful, "standard" decommissions (at least, in Operator v1 mode) decommissioning.WithDecommisionOnTooHighOrdinal(false), - // Operator v1 supports multiple NodePools, and therefore multiple STS. - // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS. - decommissioning.WithDesiredReplicasFetcher(func(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) { - // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas. - idx := slices.IndexFunc( - sts.OwnerReferences, - func(ownerRef metav1.OwnerReference) bool { - return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster" - }) - if idx == -1 { - return 0, nil - } - - var vectorizedCluster vectorizedv1alpha1.Cluster - if err := mgr.GetClient().Get(ctx, types.NamespacedName{ - Name: sts.OwnerReferences[idx].Name, - Namespace: sts.Namespace, - }, &vectorizedCluster); err != nil { - return 0, fmt.Errorf("could not get Cluster: %w", err) - } - - // We assume the cluster is fine and synced, checks have been performed in the filter already. - - // Get all nodepool-sts for this Cluster - var stsList appsv1.StatefulSetList - err := mgr.GetClient().List(ctx, &stsList, &client.ListOptions{ - LabelSelector: pkglabels.ForCluster(&vectorizedCluster).AsClientSelector(), - }) - if err != nil { - return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err) - } - - if len(stsList.Items) == 0 { - return 0, errors.New("found 0 StatefulSets for this Cluster") - } - - var allReplicas int32 - for _, sts := range stsList.Items { - allReplicas += ptr.Deref(sts.Spec.Replicas, 0) - } - - // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner. - if allReplicas < 3 { - return 0, fmt.Errorf("found %d desiredReplicas, but want >= 3", allReplicas) - } - - if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas { - return 0, fmt.Errorf("replicas not synced. 
status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas) - } - - return allReplicas, nil - }), - decommissioning.WithFactory(internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())), - decommissioning.WithFilter(func(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) { - log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter") - idx := slices.IndexFunc( - sts.OwnerReferences, - func(ownerRef metav1.OwnerReference) bool { - return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster" - }) - if idx == -1 { - return false, nil - } - - var vectorizedCluster vectorizedv1alpha1.Cluster - if err := mgr.GetClient().Get(ctx, types.NamespacedName{ - Name: sts.OwnerReferences[idx].Name, - Namespace: sts.Namespace, - }, &vectorizedCluster); err != nil { - return false, fmt.Errorf("could not get Cluster: %w", err) - } - - managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed" - if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" { - log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace) - return false, nil - } - - // Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster - // (and we can therefore not use it to check if the cluster is synced otherwise) - if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas { - log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace) - return false, nil - } - if vectorizedCluster.Status.Restarting { - log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace) - return false, nil - } - - if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation { - log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration) - return false, nil - } - - if vectorizedCluster.Status.DecommissioningNode != nil { - log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode) - return false, nil - } - - return true, nil - }), ) if err := d.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner") diff --git a/operator/cmd/run/vectorized.go b/operator/cmd/run/vectorized.go new file mode 100644 index 000000000..c9ddf52f8 --- /dev/null +++ b/operator/cmd/run/vectorized.go @@ -0,0 +1,156 @@ +// Copyright 2025 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package run + +import ( + "context" + "fmt" + "slices" + + "github.com/cockroachdb/errors" + "github.com/redpanda-data/common-go/rpadmin" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1" + internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" + pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels" +) + +// vectorizedDecommissionerAdapter is a helper struct that implements various methods +// of mapping StatefulSets through Vectorized Clusters to arguments for the +// StatefulSetDecommissioner. +type vectorizedDecommissionerAdapter struct { + client client.Client + factory internalclient.ClientFactory +} + +func (b *vectorizedDecommissionerAdapter) desiredReplicas(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) { + // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas. + vectorizedCluster, err := b.getCluster(ctx, sts) + if err != nil { + return 0, err + } + + if vectorizedCluster == nil { + return 0, nil + } + + // We assume the cluster is fine and synced, checks have been performed in the filter already. + + // Get all nodepool-sts for this Cluster + var stsList appsv1.StatefulSetList + if err := b.client.List(ctx, &stsList, &client.ListOptions{ + LabelSelector: pkglabels.ForCluster(vectorizedCluster).AsClientSelector(), + Namespace: vectorizedCluster.Namespace, + }); err != nil { + return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err) + } + + if len(stsList.Items) == 0 { + return 0, errors.New("found 0 StatefulSets for this Cluster") + } + + var allReplicas int32 + for _, sts := range stsList.Items { + allReplicas += ptr.Deref(sts.Spec.Replicas, 0) + } + + // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner. + if allReplicas < 3 { + return 0, errors.Newf("found %d desiredReplicas, but want >= 3", allReplicas) + } + + if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas { + return 0, errors.Newf("replicas not synced. 
status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas) + } + + return allReplicas, nil +} + +func (b *vectorizedDecommissionerAdapter) filter(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) { + log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter") + + vectorizedCluster, err := b.getCluster(ctx, sts) + if err != nil { + return false, err + } + + if vectorizedCluster == nil { + return false, nil + } + + managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed" + if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" { + log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace) + return false, nil + } + + // Do some "manual" checks, as the ClusterQuiescent condition is always false if a ghost broker causes an unhealthy cluster + // (and we can therefore not use it to check if the cluster is synced otherwise) + if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas { + log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace) + return false, nil + } + if vectorizedCluster.Status.Restarting { + log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace) + return false, nil + } + + if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation { + log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration) + return false, nil + } + + if vectorizedCluster.Status.DecommissioningNode != nil { + log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode) + return false, nil + } + + return true, nil +} + +func (b *vectorizedDecommissionerAdapter) getAdminClient(ctx context.Context, sts *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) { + cluster, err := b.getCluster(ctx, sts) + if err != nil { + return nil, err + } + + if cluster == nil { + return nil, errors.Newf("failed to resolve %s/%s to vectorized cluster", sts.Namespace, sts.Name) + } + + return b.factory.RedpandaAdminClient(ctx, cluster) +} + +func (b *vectorizedDecommissionerAdapter) getCluster(ctx context.Context, sts *appsv1.StatefulSet) (*vectorizedv1alpha1.Cluster, error) { + idx := slices.IndexFunc( + sts.OwnerReferences, + func(ownerRef metav1.OwnerReference) bool { + return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster" + }) + if idx == -1 { + return nil, nil + } + + var vectorizedCluster vectorizedv1alpha1.Cluster + if err := b.client.Get(ctx, types.NamespacedName{ + Name: sts.OwnerReferences[idx].Name, + Namespace: sts.Namespace, + }, &vectorizedCluster); err != nil { + return nil, errors.Wrap(err, "could not get Cluster") + } + + return &vectorizedCluster, nil +} diff --git a/operator/cmd/sidecar/sidecar.go b/operator/cmd/sidecar/sidecar.go index 1e9d8e63d..9c7a3878e 100644 --- a/operator/cmd/sidecar/sidecar.go +++ b/operator/cmd/sidecar/sidecar.go @@ -15,11 +15,18 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/redpanda-data/common-go/rpadmin" + rpkadminapi 
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + rpkconfig "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/spf13/afero" "github.com/spf13/cobra" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "github.com/redpanda-data/redpanda-operator/operator/internal/configwatcher" @@ -27,6 +34,7 @@ import ( "github.com/redpanda-data/redpanda-operator/operator/internal/controller/pvcunbinder" "github.com/redpanda-data/redpanda-operator/operator/internal/probes" internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" + "github.com/redpanda-data/redpanda-operator/pkg/pflagutil" ) // +kubebuilder:rbac:groups=coordination.k8s.io,namespace=default,resources=leases,verbs=get;list;watch;create;update;patch;delete @@ -56,6 +64,7 @@ func Command() *cobra.Command { brokerProbeBrokerURL string runUnbinder bool unbinderTimeout time.Duration + selector pflagutil.LabelSelectorValue panicAfter time.Duration ) @@ -86,6 +95,7 @@ func Command() *cobra.Command { brokerProbeBrokerURL, runUnbinder, unbinderTimeout, + selector.Selector, panicAfter, ) }, @@ -102,6 +112,7 @@ func Command() *cobra.Command { // cluster flags cmd.Flags().StringVar(&clusterNamespace, "redpanda-cluster-namespace", "", "The namespace of the cluster that this sidecar manages.") cmd.Flags().StringVar(&clusterName, "redpanda-cluster-name", "", "The name of the cluster that this sidecar manages.") + cmd.Flags().Var(&selector, "selector", "Kubernetes label selector that will filter objects to be considered by all controllers run by the sidecar.") // decommission flags cmd.Flags().BoolVar(&runDecommissioner, "run-decommissioner", false, "Specifies if the sidecar should run the broker decommissioner.") @@ -153,10 +164,14 @@ func Run( brokerProbeBrokerURL string, runUnbinder bool, unbinderTimeout time.Duration, + selector labels.Selector, panicAfter time.Duration, ) error { setupLog := ctrl.LoggerFrom(ctx).WithName("setup") + // Required arguments check: in sidecar mode these MUST be specified to + // ensure the sidecar only affects the helm deployment that deployed it. + if clusterNamespace == "" { err := errors.New("must specify a cluster-namespace parameter") setupLog.Error(err, "no cluster namespace provided") @@ -169,6 +184,20 @@ func Run( return err } + if selector == nil || selector.Empty() { + // Use a sensible default that's about as correct as the previous + // hard-coded values. Hardcoding of name=redpanda is incorrect when + // nameoverride is used. + var err error + selector, err = labels.Parse(fmt.Sprintf( + "apps.kubernetes.io/component,app.kubernetes.io/name=redpanda,app.kubernetes.io/instance=%s", + clusterName, + )) + if err != nil { + panic(err) + } + } + scheme := runtime.NewScheme() for _, fn := range schemes { @@ -183,6 +212,12 @@ func Run( LeaderElectionID: clusterName + "." + clusterNamespace + ".redpanda", Scheme: scheme, LeaderElectionNamespace: clusterNamespace, + Cache: cache.Options{ + // Only watch the specified namespace; we don't have permission to watch at the cluster scope. 
+ DefaultNamespaces: map[string]cache.Config{ + clusterNamespace: {}, + }, + }, }) if err != nil { setupLog.Error(err, "unable to initialize manager") @@ -190,28 +225,41 @@ func Run( } if runDecommissioner { - fetcher := decommissioning.NewChainedFetcher( - // prefer RPK profile first and then move on to fetch from helm values - decommissioning.NewRPKProfileFetcher(redpandaYAMLPath), - decommissioning.NewHelmFetcher(mgr), - ) + setupLog.Info("broker decommissioner enabled", "namespace", clusterNamespace, "cluster", clusterName, "selector", selector) - if err := decommissioning.NewStatefulSetDecommissioner(mgr, fetcher, []decommissioning.Option{ - decommissioning.WithFilter(decommissioning.FilterStatefulSetOwner(clusterNamespace, clusterName)), + fs := afero.NewOsFs() + + params := rpkconfig.Params{ConfigFlag: redpandaYAMLPath} + + config, err := params.Load(afero.NewOsFs()) + if err != nil { + return err + } + + if err := decommissioning.NewStatefulSetDecommissioner( + mgr, + func(ctx context.Context, _ *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) { + // Always use the config that's loaded from redpanda.yaml, in + // sidecar mode no other STS's should be watched. + return rpkadminapi.NewClient(fs, config.VirtualProfile()) + }, + decommissioning.WithSelector(selector), decommissioning.WithRequeueTimeout(decommissionRequeueTimeout), decommissioning.WithDelayedCacheInterval(decommissionVoteInterval), decommissioning.WithDelayedCacheMaxCount(decommissionMaxVoteCount), - }...).SetupWithManager(mgr); err != nil { + ).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner") return err } } if runUnbinder { + setupLog.Info("PVC unbinder enabled", "namespace", clusterNamespace, "selector", selector) + if err := (&pvcunbinder.Controller{ - Client: mgr.GetClient(), - Timeout: unbinderTimeout, - Filter: pvcunbinder.FilterPodOwner(clusterNamespace, clusterName), + Client: mgr.GetClient(), + Timeout: unbinderTimeout, + Selector: selector, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PVCUnbinder") return err diff --git a/operator/go.mod b/operator/go.mod index 962a76582..15dd7c60a 100644 --- a/operator/go.mod +++ b/operator/go.mod @@ -44,7 +44,6 @@ require ( github.com/scalalang2/golang-fifo v1.0.2 github.com/spf13/afero v1.11.0 github.com/spf13/cobra v1.8.1 - github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.33.0 github.com/testcontainers/testcontainers-go/modules/redpanda v0.32.0 @@ -384,6 +383,7 @@ require ( github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/cast v1.7.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.18.1 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect diff --git a/operator/internal/controller/decommissioning/chained_fetcher.go b/operator/internal/controller/decommissioning/chained_fetcher.go deleted file mode 100644 index a21764692..000000000 --- a/operator/internal/controller/decommissioning/chained_fetcher.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. 
-// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "context" - "fmt" - - "github.com/cockroachdb/errors" -) - -// ChainedFetcher delegates fetching behavior to a list of sub fetchers -// moving down the list if an error occurs in the previous fetcher. -type ChainedFetcher struct { - fetchers []Fetcher -} - -var _ Fetcher = (*ChainedFetcher)(nil) - -func NewChainedFetcher(fetchers ...Fetcher) *ChainedFetcher { - return &ChainedFetcher{fetchers: fetchers} -} - -func (c *ChainedFetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) { - if len(c.fetchers) == 0 { - return nil, errors.New("chained fetcher does not have any supplied sub-fetchers") - } - - errs := []error{} - - for _, fetcher := range c.fetchers { - object, err := fetcher.FetchLatest(ctx, name, namespace) - if err != nil { - errs = append(errs, err) - continue - } - - if object != nil { - return object, nil - } - } - - return nil, fmt.Errorf("all sub-fetchers failed: %w", errors.Join(errs...)) -} diff --git a/operator/internal/controller/decommissioning/fetcher.go b/operator/internal/controller/decommissioning/fetcher.go deleted file mode 100644 index 5a8618694..000000000 --- a/operator/internal/controller/decommissioning/fetcher.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "context" -) - -// Fetcher acts as a mechanism for fetching an object that can be used in initializing -// a connection to Redpanda. This can come in the form of a Redpanda CR or an RPK profile. -type Fetcher interface { - FetchLatest(ctx context.Context, name, namespace string) (any, error) -} diff --git a/operator/internal/controller/decommissioning/helm_fetcher.go b/operator/internal/controller/decommissioning/helm_fetcher.go deleted file mode 100644 index f303aaa75..000000000 --- a/operator/internal/controller/decommissioning/helm_fetcher.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. 
-// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "bytes" - "compress/gzip" - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - - "github.com/cockroachdb/errors" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2" -) - -// the format logic for helm releases can be found: -// https://github.com/helm/helm/blob/2cea1466d3c27491364eb44bafc7be1ca5461b2d/pkg/storage/driver/util.go#L58 - -var gzipHeader = []byte{0x1f, 0x8b, 0x08} - -// HelmFetcher fetches a Redpanda CR via initializing it virtually with a -// Helm values file stored in a secret. This is to maintain backwards -// compatibility with our current mechanism for decommissioning, but -// it should likely be dropped in the future with preference to using -// an RPK profile. -type HelmFetcher struct { - client client.Client -} - -var _ Fetcher = (*HelmFetcher)(nil) - -func NewHelmFetcher(mgr ctrl.Manager) *HelmFetcher { - return &HelmFetcher{client: mgr.GetClient()} -} - -func (f *HelmFetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) { - log := ctrl.LoggerFrom(ctx, "namespace", namespace, "name", name).WithName("HelmFetcher.FetchLatest") - - var secrets corev1.SecretList - - if err := f.client.List(ctx, &secrets, client.MatchingLabels{ - "name": name, - "owner": "helm", - }, client.InNamespace(namespace)); err != nil { - return nil, fmt.Errorf("fetching secrets list: %w", err) - } - - latestVersion := 0 - var latestValues map[string]any - for _, item := range secrets.Items { - values, version, err := f.decode(item.Data["release"]) - if err != nil { - // just log the error and move on rather than making it terminal - // in case there's some secret that's just badly formatted - log.Error(err, "decoding secret", "secret", item.Name) - continue - } - if version > latestVersion { - latestVersion = version - latestValues = values - } - } - - if latestValues != nil { - data, err := json.Marshal(latestValues) - if err != nil { - return nil, fmt.Errorf("marshaling values: %w", err) - } - - cluster := &redpandav1alpha2.Redpanda{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - Spec: redpandav1alpha2.RedpandaSpec{ClusterSpec: &redpandav1alpha2.RedpandaClusterSpec{}}, - } - - if err := json.Unmarshal(data, cluster.Spec.ClusterSpec); err != nil { - return nil, fmt.Errorf("unmarshaling values: %w", err) - } - - return cluster, nil - } - - err := errors.New("unable to find latest value") - log.Error(err, "no secrets were decodable") - return nil, err -} - -type partialChart struct { - Config map[string]any `json:"config"` - Version int `json:"version"` -} - -func (f *HelmFetcher) decode(data []byte) (map[string]any, int, error) { - decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data))) - n, err := base64.StdEncoding.Decode(decoded, data) - if err != nil { - return nil, 0, err - } - decoded = decoded[:n] - - if len(decoded) > 3 && bytes.Equal(decoded[0:3], gzipHeader) { - reader, err := gzip.NewReader(bytes.NewReader(decoded)) - if err != nil { - return nil, 0, err - } - defer 
reader.Close() - unzipped, err := io.ReadAll(reader) - if err != nil { - return nil, 0, err - } - decoded = unzipped - } - - var chart partialChart - if err := json.Unmarshal(decoded, &chart); err != nil { - return nil, 0, err - } - - // We only care about the chart.Config here and not the - // merged values with the chart values because our - // client initialization code already does the merging. - return chart.Config, chart.Version, nil -} diff --git a/operator/internal/controller/decommissioning/redpanda_fetcher.go b/operator/internal/controller/decommissioning/redpanda_fetcher.go deleted file mode 100644 index 0dff57ac7..000000000 --- a/operator/internal/controller/decommissioning/redpanda_fetcher.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "context" - "fmt" - - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2" -) - -// RedpandaFetcher fetches a Redpanda cluster via CR. -type RedpandaFetcher struct { - client client.Client -} - -var _ Fetcher = (*RedpandaFetcher)(nil) - -func NewRedpandaFetcher(mgr ctrl.Manager) *RedpandaFetcher { - return &RedpandaFetcher{client: mgr.GetClient()} -} - -func (f *RedpandaFetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) { - var cluster redpandav1alpha2.Redpanda - if err := f.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &cluster); err != nil { - return nil, fmt.Errorf("fetching cluster: %w", err) - } - - return cluster, nil -} diff --git a/operator/internal/controller/decommissioning/rpk_profile_fetcher.go b/operator/internal/controller/decommissioning/rpk_profile_fetcher.go deleted file mode 100644 index 2b17155bd..000000000 --- a/operator/internal/controller/decommissioning/rpk_profile_fetcher.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "context" - "sync" - - rpkconfig "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" - "github.com/spf13/afero" -) - -// RPKProfileFetcher loads up an RPK profile used in initializing a cluster -// connection. It should only ever be used when we only care about managing -// a single cluster, as it ignores the name and the namespace handed it and -// solely uses a profile found on disk. 
-type RPKProfileFetcher struct { - configPath string - fs afero.Fs - profile *rpkconfig.RpkProfile - mutex sync.Mutex -} - -var _ Fetcher = (*RPKProfileFetcher)(nil) - -func NewRPKProfileFetcher(configPath string) *RPKProfileFetcher { - return &RPKProfileFetcher{configPath: configPath, fs: afero.NewOsFs()} -} - -func (f *RPKProfileFetcher) FetchLatest(_ context.Context, _, _ string) (any, error) { - f.mutex.Lock() - defer f.mutex.Unlock() - - if f.profile != nil { - // returned the memoized profile so we don't have to keep reading it, if we need - // to handle the profile on disk changing, then we should implement something like - // an fsnotify watcher that clears the memoized profile when the file changes on disk - return f.profile, nil - } - - params := rpkconfig.Params{ConfigFlag: f.configPath} - - config, err := params.Load(f.fs) - if err != nil { - return nil, err - } - - f.profile = config.VirtualProfile() - - return f.profile, nil -} diff --git a/operator/internal/controller/decommissioning/statefulset_decomissioner.go b/operator/internal/controller/decommissioning/statefulset_decomissioner.go index a73da5e0c..041b4a8b1 100644 --- a/operator/internal/controller/decommissioning/statefulset_decomissioner.go +++ b/operator/internal/controller/decommissioning/statefulset_decomissioner.go @@ -12,17 +12,18 @@ package decommissioning import ( "context" "fmt" + "regexp" "sort" "strconv" "strings" "time" - "github.com/cockroachdb/errors" "github.com/redpanda-data/common-go/rpadmin" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" @@ -34,7 +35,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" - internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" "github.com/redpanda-data/redpanda-operator/operator/pkg/client/kubernetes" "github.com/redpanda-data/redpanda-operator/operator/pkg/collections" "github.com/redpanda-data/redpanda-operator/operator/pkg/functional" @@ -44,11 +44,7 @@ const ( eventReasonBroker = "DecommissioningBroker" eventReasonUnboundPersistentVolumeClaims = "DecommissioningUnboundPersistentVolumeClaims" - k8sManagedByLabelKey = "app.kubernetes.io/managed-by" - k8sInstanceLabelKey = "app.kubernetes.io/instance" - k8sComponentLabelKey = "app.kubernetes.io/component" - k8sNameLabelKey = "app.kubernetes.io/name" - datadirVolume = "datadir" + datadirVolume = "datadir" traceLevel = 2 debugLevel = 1 @@ -62,24 +58,9 @@ const ( defaultDelayedMaxCacheCount = 2 ) -type Option func(*StatefulSetDecomissioner) +var datadirRE = regexp.MustCompile(`^datadir-(.+)-\d+$`) -func FilterStatefulSetOwner(ownerNamespace, ownerName string) func(ctx context.Context, set *appsv1.StatefulSet) (bool, error) { - filter := filterOwner(ownerNamespace, ownerName) - return func(ctx context.Context, set *appsv1.StatefulSet) (bool, error) { - return filter(set), nil - } -} - -func filterOwner(ownerNamespace, ownerName string) func(o client.Object) bool { - return func(o client.Object) bool { - labels := o.GetLabels() - if o.GetNamespace() == ownerNamespace && labels != nil && labels[k8sInstanceLabelKey] == ownerName { - return true - } - return false - } -} +type Option func(*StatefulSetDecomissioner) func WithDecommisionOnTooHighOrdinal(val bool) Option { return 
func(decommissioner *StatefulSetDecomissioner) { @@ -93,15 +74,15 @@ func WithDesiredReplicasFetcher(desiredReplicasFetcher func(ctx context.Context, } } -func WithFilter(filter func(ctx context.Context, set *appsv1.StatefulSet) (bool, error)) Option { +func WithSelector(selector labels.Selector) Option { return func(decommissioner *StatefulSetDecomissioner) { - decommissioner.filter = filter + decommissioner.selector = selector } } -func WithFactory(factory internalclient.ClientFactory) Option { +func WithFilter(fn func(context.Context, *appsv1.StatefulSet) (bool, error)) Option { return func(decommissioner *StatefulSetDecomissioner) { - decommissioner.factory = factory + decommissioner.filter = fn } } @@ -135,38 +116,40 @@ func WithSyncPeriod(d time.Duration) Option { } } +type ClientGetter func(context.Context, *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) + type StatefulSetDecomissioner struct { client client.Client - factory internalclient.ClientFactory - fetcher Fetcher + getAdminClient ClientGetter recorder record.EventRecorder requeueTimeout time.Duration delayedCacheInterval time.Duration delayedCacheMaxCount int delayedBrokerIDCache *CategorizedDelayedCache[types.NamespacedName, int] delayedVolumeCache *CategorizedDelayedCache[types.NamespacedName, types.NamespacedName] - filter func(ctx context.Context, set *appsv1.StatefulSet) (bool, error) cleanupPVCs bool syncPeriod time.Duration + selector labels.Selector + filter func(context.Context, *appsv1.StatefulSet) (bool, error) // decommissionOnTooHighOrdinal allows the decommissioner to decommission a node, if it has an decommissionOnTooHighOrdinal bool desiredReplicasFetcher func(ctx context.Context, set *appsv1.StatefulSet) (int32, error) } -func NewStatefulSetDecommissioner(mgr ctrl.Manager, fetcher Fetcher, options ...Option) *StatefulSetDecomissioner { +func NewStatefulSetDecommissioner(mgr ctrl.Manager, getter ClientGetter, options ...Option) *StatefulSetDecomissioner { k8sClient := mgr.GetClient() decommissioner := &StatefulSetDecomissioner{ recorder: mgr.GetEventRecorderFor("broker-decommissioner"), client: k8sClient, - fetcher: fetcher, - factory: internalclient.NewFactory(mgr.GetConfig(), k8sClient), + getAdminClient: getter, requeueTimeout: defaultRequeueTimeout, delayedCacheInterval: defaultDelayedCacheInterval, delayedCacheMaxCount: defaultDelayedMaxCacheCount, - filter: func(ctx context.Context, set *appsv1.StatefulSet) (bool, error) { return true, nil }, cleanupPVCs: true, + // By default don't match anything. + selector: labels.Nothing(), // By default, just look at the single STS // If multiple nodePools are supported, we can use Cluster CR to the the total desired replicas. desiredReplicasFetcher: func(ctx context.Context, set *appsv1.StatefulSet) (int32, error) { @@ -194,58 +177,47 @@ func NewStatefulSetDecommissioner(mgr ctrl.Manager, fetcher Fetcher, options ... 
// +kubebuilder:rbac:groups=core,namespace=default,resources=secrets,verbs=get;list;watch func (s *StatefulSetDecomissioner) SetupWithManager(mgr ctrl.Manager) error { - pvcPredicate, err := predicate.LabelSelectorPredicate( - metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{{ - Key: k8sNameLabelKey, // make sure we have a name - Operator: metav1.LabelSelectorOpExists, - }, { - Key: k8sComponentLabelKey, // make sure we have a component label - Operator: metav1.LabelSelectorOpExists, - }, { - Key: k8sInstanceLabelKey, // make sure we have a cluster name - Operator: metav1.LabelSelectorOpExists, - }}, - }, - ) - if err != nil { - return err - } + selectorPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool { + lbls := object.GetLabels() + if lbls == nil { + lbls = map[string]string{} + } + return s.selector.Matches(labels.Set(lbls)) + }) return ctrl.NewControllerManagedBy(mgr). + Named("statefulset_decommissioner"). For(&appsv1.StatefulSet{}). Owns(&corev1.Pod{}). // PVCs don't have a "true" owner ref, so instead we attempt to map backwards via labels Watches(&corev1.PersistentVolumeClaim{}, handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []ctrl.Request { claim := o.(*corev1.PersistentVolumeClaim) - labels := claim.GetLabels() - - // a bit of defensive programming, but we should always have labels due to our use - // of a predicate - if labels == nil { - // we have no labels, so we can't map anything - return nil + lbls := claim.GetLabels() + if lbls == nil { + lbls = map[string]string{} } - release := labels[k8sInstanceLabelKey] - if release == "" { - // we have an invalid release name, so skip + // Validate that this PVC is within our purview by making sure it + // satisfies our label selector. (This selector MUST match both PVCs and + // STSs.) + if !s.selector.Matches(labels.Set(lbls)) { return nil } - if !strings.HasPrefix(claim.Name, datadirVolume+"-") { - // we only care about the datadir volume + // Extract the STS name by parsing the naming convention $VOL-$STS-$ORDINAL. + match := datadirRE.FindStringSubmatch(claim.Name) + if match == nil { return nil } // if we are here, it means we can map to a real stateful set return []ctrl.Request{ {NamespacedName: types.NamespacedName{ - Name: release, + Name: match[1], Namespace: claim.Namespace, }}, } - }), builder.WithPredicates(pvcPredicate)). + }), builder.WithPredicates(selectorPredicate)). Complete(s) } @@ -276,6 +248,18 @@ func (s *StatefulSetDecomissioner) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, nil } + // If a filter function has been provided, run it. 
+ if s.filter != nil { + inbounds, err := s.filter(ctx, set) + if err != nil { + return ctrl.Result{}, err + } + + if !inbounds { + return ctrl.Result{}, nil + } + } + requeue, err := s.Decommission(ctx, set) if err != nil { // we already logged any error, just requeue directly, delegating to the @@ -340,17 +324,6 @@ func (s *StatefulSetDecomissioner) Decommission(ctx context.Context, set *appsv1 log := ctrl.LoggerFrom(ctx, "namespace", set.Namespace, "name", set.Name).WithName("StatefulSetDecommissioner.Decomission") - keep, err := s.filter(ctx, set) - if err != nil { - log.Error(err, "error filtering StatefulSet") - return false, err - } - - if !keep { - log.V(traceLevel).Info("skipping decommission, StatefulSet filtered out") - return false, nil - } - setCacheKey := client.ObjectKeyFromObject(set) if s.cleanupPVCs { unboundVolumeClaims, err := s.findUnboundVolumeClaims(ctx, set) @@ -373,7 +346,7 @@ func (s *StatefulSetDecomissioner) Decommission(ctx context.Context, set *appsv1 s.delayedVolumeCache.Mark(setCacheKey, client.ObjectKeyFromObject(claim)) } - // now we attempt to clean up the first of the PVCs that meets the treshold of the cache, + // now we attempt to clean up the first of the PVCs that meets the threshold of the cache, // ensuring that their PVs have a retain policy, and short-circuiting the rest of reconciliation // if we actually delete a claim for _, claim := range unboundVolumeClaims { @@ -763,24 +736,6 @@ func (s *StatefulSetDecomissioner) findUnboundVolumeClaims(ctx context.Context, return unbound, nil } -// getAdminClient initializes an admin API client for a cluster that a statefulset manages. It does this by -// delegating to a "fetcher" which fetches the equivalent values.yaml map from either a Redpanda CR or an -// installed helm release. It then effectively turns this into a Redpanda CR that can be used for initializing -// clients based on existing factory code. -func (s *StatefulSetDecomissioner) getAdminClient(ctx context.Context, set *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) { - release, ok := set.Labels[k8sInstanceLabelKey] - if !ok { - return nil, errors.New("unable to get release name") - } - - fetched, err := s.fetcher.FetchLatest(ctx, release, set.Namespace) - if err != nil { - return nil, fmt.Errorf("fetching latest values: %w", err) - } - - return s.factory.RedpandaAdminClient(ctx, fetched) -} - // podNameFromFQDN takes a hostname and attempt to map the // name back to a stateful set pod ordinal based on the left // most DNS segment containing the form SETNAME-ORDINAL. 
diff --git a/operator/internal/controller/decommissioning/statefulset_decommissioner_test.go b/operator/internal/controller/decommissioning/statefulset_decommissioner_test.go index 46b1e4c78..9b6e524fb 100644 --- a/operator/internal/controller/decommissioning/statefulset_decommissioner_test.go +++ b/operator/internal/controller/decommissioning/statefulset_decommissioner_test.go @@ -16,11 +16,12 @@ import ( "testing" "time" + "github.com/cockroachdb/errors" "github.com/go-logr/logr/testr" "github.com/redpanda-data/common-go/rpadmin" "github.com/stretchr/testify/suite" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -41,6 +42,8 @@ import ( "github.com/redpanda-data/redpanda-operator/pkg/testutil" ) +const redpandaChartPath = "../../../../charts/redpanda" + //go:embed testdata/role.yaml var decommissionerRBAC []byte @@ -58,13 +61,13 @@ type StatefulSetDecommissionerSuite struct { client client.Client helm *helm.Client clientFactory internalclient.ClientFactory + releases map[string]*chart } var _ suite.SetupAllSuite = (*StatefulSetDecommissionerSuite)(nil) func (s *StatefulSetDecommissionerSuite) TestDecommission() { - s.T().Skip("we currently have issues with the eviction code in this test due to pod disruption budgets") - chart := s.installChart("basic", "", map[string]any{ + chart := s.installChart("basic", map[string]any{ "statefulset": map[string]any{ "replicas": 5, }, @@ -109,7 +112,10 @@ func (s *StatefulSetDecommissionerSuite) TestDecommission() { s.T().Cleanup(func() { s.untaintNode(firstBrokerNode) }) - s.Require().NoError(s.client.SubResource("eviction").Create(s.ctx, &firstBroker, &policyv1.Eviction{})) + // TODO(chrisseto): Evictions fail in CI with `Cannot evict pod as it would violate the pod's disruption budget.` but not locally. + // For now use a forced delete as that mimics node failure equally well. + // s.Require().NoError(s.client.SubResource("eviction").Create(s.ctx, &firstBroker, &policyv1.Eviction{})) + s.Require().NoError(s.client.Delete(s.ctx, &firstBroker, client.GracePeriodSeconds(0))) s.waitFor(func(ctx context.Context) (bool, error) { health, err := adminClient.GetHealthOverview(ctx) @@ -169,6 +175,7 @@ func (s *StatefulSetDecommissionerSuite) SetupSuite() { }) s.ctx = context.Background() + s.releases = map[string]*chart{} s.env = testenv.New(t, testenv.Options{ // We need our own cluster for these tests since we need additional // agents. Otherwise we can just turn up the default... but we'll @@ -188,25 +195,31 @@ func (s *StatefulSetDecommissionerSuite) SetupSuite() { if err != nil { return err } - if err := helmClient.RepoAdd(s.ctx, "redpandadata", "https://charts.redpanda.com"); err != nil { - return err - } s.helm = helmClient dialer := kube.NewPodDialer(mgr.GetConfig()) s.clientFactory = internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient()).WithDialer(dialer.DialContext) - options := []decommissioning.Option{ - // override this so we can dial directly to our Redpanda pods - decommissioning.WithFactory(s.clientFactory), + decommissioner := decommissioning.NewStatefulSetDecommissioner( + mgr, + func(ctx context.Context, sts *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) { + // NB: We expect to see some errors here. The release isn't + // populated until after the chart installation finishes. 
+ name := sts.Labels["app.kubernetes.io/instance"] + release, ok := s.releases[name] + if !ok { + return nil, errors.Newf("no release: %q", name) + } + return s.adminClientFor(release), nil + }, // set these low so that we don't have to wait forever in the test // these settings should give about a 5-10 second window before // actually running a decommission - decommissioning.WithDelayedCacheInterval(5 * time.Second), + decommissioning.WithDelayedCacheInterval(5*time.Second), decommissioning.WithDelayedCacheMaxCount(2), - decommissioning.WithRequeueTimeout(2 * time.Second), - } - decommissioner := decommissioning.NewStatefulSetDecommissioner(mgr, decommissioning.NewHelmFetcher(mgr), options...) + decommissioning.WithRequeueTimeout(2*time.Second), + ) + if err := decommissioner.SetupWithManager(mgr); err != nil { return err } @@ -217,12 +230,11 @@ func (s *StatefulSetDecommissionerSuite) SetupSuite() { type chart struct { name string - version string release helm.Release values map[string]any } -func (s *StatefulSetDecommissionerSuite) installChart(name, version string, overrides map[string]any) *chart { +func (s *StatefulSetDecommissionerSuite) installChart(name string, overrides map[string]any) *chart { values := map[string]any{ "statefulset": map[string]any{ "replicas": 1, @@ -233,18 +245,13 @@ func (s *StatefulSetDecommissionerSuite) installChart(name, version string, over "external": map[string]any{ "enabled": false, }, - "image": map[string]any{ - "repository": "redpandadata/redpanda", - "tag": "v24.3.1", - }, } if overrides != nil { values = functional.MergeMaps(values, overrides) } - release, err := s.helm.Install(s.ctx, "redpandadata/redpanda", helm.InstallOptions{ - Version: version, + release, err := s.helm.Install(s.ctx, redpandaChartPath, helm.InstallOptions{ CreateNamespace: true, Name: name, Namespace: s.env.Namespace(), @@ -252,12 +259,15 @@ func (s *StatefulSetDecommissionerSuite) installChart(name, version string, over }) s.Require().NoError(err) - return &chart{ + c := &chart{ name: name, - version: version, values: values, release: release, } + + s.releases[name] = c + + return c } func (s *StatefulSetDecommissionerSuite) adminClientFor(chart *chart) *rpadmin.AdminAPI { @@ -283,8 +293,7 @@ func (s *StatefulSetDecommissionerSuite) adminClientFor(chart *chart) *rpadmin.A func (s *StatefulSetDecommissionerSuite) upgradeChart(chart *chart, overrides map[string]any) { values := functional.MergeMaps(chart.values, overrides) - release, err := s.helm.Upgrade(s.ctx, chart.release.Name, "redpandadata/redpanda", helm.UpgradeOptions{ - Version: chart.version, + release, err := s.helm.Upgrade(s.ctx, chart.release.Name, redpandaChartPath, helm.UpgradeOptions{ Namespace: s.env.Namespace(), Values: values, }) @@ -306,10 +315,14 @@ func (s *StatefulSetDecommissionerSuite) setupRBAC() string { clusterRole := roles[0].(*rbacv1.ClusterRole) // Inject additional permissions required for running in testenv. 
- role.Rules = append(role.Rules, rbacv1.PolicyRule{ + clusterRole.Rules = append(clusterRole.Rules, rbacv1.PolicyRule{ APIGroups: []string{""}, Resources: []string{"pods/portforward"}, Verbs: []string{"*"}, + }, rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list"}, }) name := "testenv-" + testenv.RandString(6) diff --git a/operator/internal/controller/pvcunbinder/pvcunbinder.go b/operator/internal/controller/pvcunbinder/pvcunbinder.go index a28315ecf..38d8d2b21 100644 --- a/operator/internal/controller/pvcunbinder/pvcunbinder.go +++ b/operator/internal/controller/pvcunbinder/pvcunbinder.go @@ -58,10 +58,6 @@ type Controller struct { // Selector, if specified, will narrow the scope of Pods that this // Reconciler will consider for remediation. Selector labels.Selector - // Filter, if specified, will narrow the scope of Pods that this - // Reconciler will consider for remediation via some sort of filtering - // function. - Filter func(ctx context.Context, pod *corev1.Pod) (bool, error) // AllowRebinding optionally enables clearing of the unbound PV's ClaimRef // which effectively makes the PVs "re-bindable" if the underlying Node // become capable of scheduling Pods once again. @@ -73,20 +69,26 @@ type Controller struct { AllowRebinding bool } -func FilterPodOwner(ownerNamespace, ownerName string) func(ctx context.Context, pod *corev1.Pod) (bool, error) { - filter := filterOwner(ownerNamespace, ownerName) - return func(ctx context.Context, pod *corev1.Pod) (bool, error) { - return filter(pod), nil - } -} - // +kubebuilder:rbac:groups=core,resources=persistentvolumes,verbs=get;list;watch;patch // +kubebuilder:rbac:groups=core,namespace=default,resources=pods,verbs=get;list;watch;delete // +kubebuilder:rbac:groups=core,namespace=default,resources=persistentvolumeclaims,verbs=get;list;watch;delete; func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr).For(&corev1.Pod{}, builder.WithPredicates(predicate.NewPredicateFuncs(pvcUnbinderPredicate))).Complete(r) + selectorPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool { + if r.Selector == nil { + return true + } + + lbls := object.GetLabels() + if lbls == nil { + lbls = map[string]string{} + } + return r.Selector.Matches(labels.Set(lbls)) + }) + unbinderPredicate := predicate.NewPredicateFuncs(pvcUnbinderPredicate) + + return ctrl.NewControllerManagedBy(mgr).For(&corev1.Pod{}, builder.WithPredicates(selectorPredicate, unbinderPredicate)).Complete(r) } // Reconcile implements the algorithm described in the docs of [Reconciler]. To @@ -297,19 +299,6 @@ func (r *Controller) ShouldRemediate(ctx context.Context, pod *corev1.Pod) (bool return false, 0 } - if r.Filter != nil { - keep, err := r.Filter(ctx, pod) - if err != nil { - log.FromContext(ctx).Error(err, "error filtering pod", "name", pod.Name, "labels", pod.Labels) - return false, 0 - } - - if !keep { - log.FromContext(ctx).Info("filter not satisfied; skipping", "name", pod.Name, "labels", pod.Labels) - return false, 0 - } - } - idx := slices.IndexFunc(pod.Status.Conditions, func(cond corev1.PodCondition) bool { return cond.Type == corev1.PodScheduled && cond.Status == corev1.ConditionFalse && cond.Reason == "Unschedulable" }) @@ -379,16 +368,3 @@ func StsPVCs(pod *corev1.Pod) []client.ObjectKey { } return found } - -// TODO extract to wellknown labels package? 
-const k8sInstanceLabelKey = "app.kubernetes.io/instance" - -func filterOwner(ownerNamespace, ownerName string) func(o client.Object) bool { - return func(o client.Object) bool { - labels := o.GetLabels() - if o.GetNamespace() == ownerNamespace && labels != nil && labels[k8sInstanceLabelKey] == ownerName { - return true - } - return false - } -} diff --git a/operator/internal/testenv/testenv.go b/operator/internal/testenv/testenv.go index 749d36f9e..b8279bc2c 100644 --- a/operator/internal/testenv/testenv.go +++ b/operator/internal/testenv/testenv.go @@ -136,7 +136,16 @@ func New(t *testing.T, options Options) *Env { t.Logf("Executing in namespace '%s'", ns.Name) - t.Cleanup(env.shutdown) + // If this is operating in the "shared" testenv cluster, we tear down the + // isolated namespace and retain the cluster itself. Otherwise we just tear + // down the cluster as it's faster and reduces resource usage. + if options.Name != k3dClusterName { + t.Cleanup(env.shutdown) + } else { + t.Cleanup(func() { + require.NoError(t, cluster.Cleanup()) + }) + } return env } diff --git a/operator/tests/e2e-v2/disable-helm-controllers/03-delete-redpandas.yaml b/operator/tests/e2e-v2/disable-helm-controllers/03-delete-redpandas.yaml index 4500587d8..7d64db472 100644 --- a/operator/tests/e2e-v2/disable-helm-controllers/03-delete-redpandas.yaml +++ b/operator/tests/e2e-v2/disable-helm-controllers/03-delete-redpandas.yaml @@ -2,7 +2,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - - command: kubectl delete -f https://github.com/fluxcd/flux2/releases/latest/download/install.yaml + - command: kubectl delete -f https://github.com/fluxcd/flux2/releases/download/v2.3.0/install.yaml delete: - apiVersion: cluster.redpanda.com/v1alpha2 kind: Redpanda diff --git a/pkg/go.mod b/pkg/go.mod index 9d71ae159..08670d169 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -13,6 +13,7 @@ require ( github.com/quasilyte/go-ruleguard/dsl v0.3.22 github.com/redpanda-data/redpanda-operator/operator v0.0.0-20250417174137-234f93a35ce2 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 + github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go/modules/k3s v0.32.0 github.com/wk8/go-ordered-map/v2 v2.1.8 @@ -166,7 +167,6 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/cast v1.7.0 // indirect github.com/spf13/cobra v1.8.1 // indirect - github.com/spf13/pflag v1.0.5 // indirect github.com/testcontainers/testcontainers-go v0.33.0 // indirect github.com/texttheater/golang-levenshtein v1.0.1 // indirect github.com/tklauser/go-sysconf v0.3.14 // indirect diff --git a/pkg/pflagutil/flags.go b/pkg/pflagutil/flags.go new file mode 100644 index 000000000..18ee01569 --- /dev/null +++ b/pkg/pflagutil/flags.go @@ -0,0 +1,48 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +// package pflagutil contains custom implementations of [pflag.Value] for +// commonly used CLI options. +package pflagutil + +import ( + "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/labels" +) + +// LabelSelectorValue is a [pflag.Value] implementation for Kubernetes label selectors. +// Usage: +// +// var selector LabelSelectorValue +// cmd.Flags().Var(&selector, ...) 
+type LabelSelectorValue struct { + Selector labels.Selector +} + +var _ pflag.Value = ((*LabelSelectorValue)(nil)) + +func (s *LabelSelectorValue) Set(value string) error { + if value == "" { + return nil + } + var err error + s.Selector, err = labels.Parse(value) + return err +} + +func (s *LabelSelectorValue) String() string { + if s.Selector == nil { + return "" + } + return s.Selector.String() +} + +func (s *LabelSelectorValue) Type() string { + return "label-selector" +}