diff --git a/operator/cmd/run/run.go b/operator/cmd/run/run.go index 8c10942d8..43d8c7045 100644 --- a/operator/cmd/run/run.go +++ b/operator/cmd/run/run.go @@ -16,25 +16,17 @@ import ( "crypto/tls" "fmt" "path/filepath" - "slices" "strings" "time" "github.com/cockroachdb/errors" "github.com/spf13/cobra" - "github.com/spf13/pflag" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/client" - kubeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -53,10 +45,10 @@ import ( "github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle" adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin" internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" - pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels" "github.com/redpanda-data/redpanda-operator/operator/pkg/resources" "github.com/redpanda-data/redpanda-operator/pkg/kube" "github.com/redpanda-data/redpanda-operator/pkg/otelutil/log" + "github.com/redpanda-data/redpanda-operator/pkg/pflagutil" pkgsecrets "github.com/redpanda-data/redpanda-operator/pkg/secrets" ) @@ -110,7 +102,7 @@ type RunOptions struct { restrictToRedpandaVersion string ghostbuster bool unbindPVCsAfter time.Duration - unbinderSelector LabelSelectorValue + unbinderSelector pflagutil.LabelSelectorValue allowPVRebinding bool autoDeletePVCs bool webhookCertPath string @@ -204,32 +196,6 @@ func (o *RunOptions) ControllerEnabled(controller Controller) bool { return false } -type LabelSelectorValue struct { - Selector labels.Selector -} - -var _ pflag.Value = ((*LabelSelectorValue)(nil)) - -func (s *LabelSelectorValue) Set(value string) error { - if value == "" { - return nil - } - var err error - s.Selector, err = labels.Parse(value) - return err -} - -func (s *LabelSelectorValue) String() string { - if s.Selector == nil { - return "" - } - return s.Selector.String() -} - -func (s *LabelSelectorValue) Type() string { - return "label selector" -} - // Metrics RBAC permissions // +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create; // +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create; @@ -506,7 +472,7 @@ func Run( if v1Controllers { setupLog.Info("setting up vectorized controllers") - if err := setupVectorizedControllers(ctx, mgr, cloudExpander, opts); err != nil { + if err := setupVectorizedControllers(ctx, mgr, factory, cloudExpander, opts); err != nil { return err } } @@ -584,25 +550,10 @@ func Run( return nil } -type v1Fetcher struct { - client kubeClient.Client -} - -func (f *v1Fetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) { - var vectorizedCluster vectorizedv1alpha1.Cluster - if err := f.client.Get(ctx, types.NamespacedName{ - Name: name, - Namespace: namespace, - }, &vectorizedCluster); err != nil { - return nil, err - } - return &vectorizedCluster, nil -} - // setupVectorizedControllers configures and registers controllers and // runnables for the custom resources in the vectorized group, 
AKA the V1 // operator. -func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpander *pkgsecrets.CloudExpander, opts *RunOptions) error { +func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, factory internalclient.ClientFactory, cloudExpander *pkgsecrets.CloudExpander, opts *RunOptions) error { log.Info(ctx, "Starting Vectorized (V1) Controllers") configurator := resources.ConfiguratorSettings{ @@ -650,8 +601,15 @@ func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpa } } - if opts.enableGhostBrokerDecommissioner { - d := decommissioning.NewStatefulSetDecommissioner(mgr, &v1Fetcher{client: mgr.GetClient()}, + if opts.enableGhostBrokerDecommissioner && opts.enableVectorizedControllers { + adapter := vectorizedDecommissionerAdapter{factory: factory, client: mgr.GetClient()} + d := decommissioning.NewStatefulSetDecommissioner( + mgr, + adapter.getAdminClient, + decommissioning.WithFilter(adapter.filter), + // Operator v1 supports multiple NodePools, and therefore multiple STS. + // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS. + decommissioning.WithDesiredReplicasFetcher(adapter.desiredReplicas), decommissioning.WithSyncPeriod(opts.ghostBrokerDecommissionerSyncPeriod), decommissioning.WithCleanupPVCs(false), // In Operator v1, decommissioning based on pod ordinal is not correct because @@ -659,107 +617,6 @@ func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpa // (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139) // In addition to this situation where it can not (always) recover, it is just not desired that it interferes with graceful, "standard" decommissions (at least, in Operator v1 mode) decommissioning.WithDecommisionOnTooHighOrdinal(false), - // Operator v1 supports multiple NodePools, and therefore multiple STS. - // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS. - decommissioning.WithDesiredReplicasFetcher(func(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) { - // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas. - idx := slices.IndexFunc( - sts.OwnerReferences, - func(ownerRef metav1.OwnerReference) bool { - return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster" - }) - if idx == -1 { - return 0, nil - } - - var vectorizedCluster vectorizedv1alpha1.Cluster - if err := mgr.GetClient().Get(ctx, types.NamespacedName{ - Name: sts.OwnerReferences[idx].Name, - Namespace: sts.Namespace, - }, &vectorizedCluster); err != nil { - return 0, fmt.Errorf("could not get Cluster: %w", err) - } - - // We assume the cluster is fine and synced, checks have been performed in the filter already. - - // Get all nodepool-sts for this Cluster - var stsList appsv1.StatefulSetList - err := mgr.GetClient().List(ctx, &stsList, &client.ListOptions{ - LabelSelector: pkglabels.ForCluster(&vectorizedCluster).AsClientSelector(), - }) - if err != nil { - return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err) - } - - if len(stsList.Items) == 0 { - return 0, errors.New("found 0 StatefulSets for this Cluster") - } - - var allReplicas int32 - for _, sts := range stsList.Items { - allReplicas += ptr.Deref(sts.Spec.Replicas, 0) - } - - // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner. 
- if allReplicas < 3 { - return 0, fmt.Errorf("found %d desiredReplicas, but want >= 3", allReplicas) - } - - if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas { - return 0, fmt.Errorf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas) - } - - return allReplicas, nil - }), - decommissioning.WithFactory(internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())), - decommissioning.WithFilter(func(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) { - log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter") - idx := slices.IndexFunc( - sts.OwnerReferences, - func(ownerRef metav1.OwnerReference) bool { - return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster" - }) - if idx == -1 { - return false, nil - } - - var vectorizedCluster vectorizedv1alpha1.Cluster - if err := mgr.GetClient().Get(ctx, types.NamespacedName{ - Name: sts.OwnerReferences[idx].Name, - Namespace: sts.Namespace, - }, &vectorizedCluster); err != nil { - return false, fmt.Errorf("could not get Cluster: %w", err) - } - - managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed" - if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" { - log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace) - return false, nil - } - - // Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster - // (and we can therefore not use it to check if the cluster is synced otherwise) - if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas { - log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace) - return false, nil - } - if vectorizedCluster.Status.Restarting { - log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace) - return false, nil - } - - if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation { - log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration) - return false, nil - } - - if vectorizedCluster.Status.DecommissioningNode != nil { - log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode) - return false, nil - } - - return true, nil - }), ) if err := d.SetupWithManager(mgr); err != nil { diff --git a/operator/cmd/run/vectorized.go b/operator/cmd/run/vectorized.go new file mode 100644 index 000000000..c9ddf52f8 --- /dev/null +++ b/operator/cmd/run/vectorized.go @@ -0,0 +1,156 @@ +// Copyright 2025 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package run + +import ( + "context" + "fmt" + "slices" + + "github.com/cockroachdb/errors" + "github.com/redpanda-data/common-go/rpadmin" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1" + internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" + pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels" +) + +// vectorizedDecommissionerAdapter is a helper struct that implements various methods +// of mapping StatefulSets through Vectorized Clusters to arguments for the +// StatefulSetDecommissioner. +type vectorizedDecommissionerAdapter struct { + client client.Client + factory internalclient.ClientFactory +} + +func (b *vectorizedDecommissionerAdapter) desiredReplicas(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) { + // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas. + vectorizedCluster, err := b.getCluster(ctx, sts) + if err != nil { + return 0, err + } + + if vectorizedCluster == nil { + return 0, nil + } + + // We assume the cluster is fine and synced, checks have been performed in the filter already. + + // Get all nodepool-sts for this Cluster + var stsList appsv1.StatefulSetList + if err := b.client.List(ctx, &stsList, &client.ListOptions{ + LabelSelector: pkglabels.ForCluster(vectorizedCluster).AsClientSelector(), + Namespace: vectorizedCluster.Namespace, + }); err != nil { + return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err) + } + + if len(stsList.Items) == 0 { + return 0, errors.New("found 0 StatefulSets for this Cluster") + } + + var allReplicas int32 + for _, sts := range stsList.Items { + allReplicas += ptr.Deref(sts.Spec.Replicas, 0) + } + + // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner. + if allReplicas < 3 { + return 0, errors.Newf("found %d desiredReplicas, but want >= 3", allReplicas) + } + + if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas { + return 0, errors.Newf("replicas not synced. 
status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas) + } + + return allReplicas, nil +} + +func (b *vectorizedDecommissionerAdapter) filter(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) { + log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter") + + vectorizedCluster, err := b.getCluster(ctx, sts) + if err != nil { + return false, err + } + + if vectorizedCluster == nil { + return false, nil + } + + managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed" + if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" { + log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace) + return false, nil + } + + // Do some "manual" checks, as the ClusterQuiescent condition is always false if a ghost broker causes an unhealthy cluster + // (so we cannot use it to check whether the cluster is otherwise synced). + if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas { + log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace) + return false, nil + } + if vectorizedCluster.Status.Restarting { + log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace) + return false, nil + } + + if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation { + log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration) + return false, nil + } + + if vectorizedCluster.Status.DecommissioningNode != nil { + log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode) + return false, nil + } + + return true, nil +} + +func (b *vectorizedDecommissionerAdapter) getAdminClient(ctx context.Context, sts *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) { + cluster, err := b.getCluster(ctx, sts) + if err != nil { + return nil, err + } + + if cluster == nil { + return nil, errors.Newf("failed to resolve %s/%s to vectorized cluster", sts.Namespace, sts.Name) + } + + return b.factory.RedpandaAdminClient(ctx, cluster) +} + +func (b *vectorizedDecommissionerAdapter) getCluster(ctx context.Context, sts *appsv1.StatefulSet) (*vectorizedv1alpha1.Cluster, error) { + idx := slices.IndexFunc( + sts.OwnerReferences, + func(ownerRef metav1.OwnerReference) bool { + return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster" + }) + if idx == -1 { + return nil, nil + } + + var vectorizedCluster vectorizedv1alpha1.Cluster + if err := b.client.Get(ctx, types.NamespacedName{ + Name: sts.OwnerReferences[idx].Name, + Namespace: sts.Namespace, + }, &vectorizedCluster); err != nil { + return nil, errors.Wrap(err, "could not get Cluster") + } + + return &vectorizedCluster, nil +} diff --git a/operator/cmd/sidecar/sidecar.go b/operator/cmd/sidecar/sidecar.go index 1e9d8e63d..5b0b3918a 100644 --- a/operator/cmd/sidecar/sidecar.go +++ b/operator/cmd/sidecar/sidecar.go @@ -15,11 +15,18 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/redpanda-data/common-go/rpadmin" + rpkadminapi
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + rpkconfig "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/spf13/afero" "github.com/spf13/cobra" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "github.com/redpanda-data/redpanda-operator/operator/internal/configwatcher" @@ -27,6 +34,7 @@ import ( "github.com/redpanda-data/redpanda-operator/operator/internal/controller/pvcunbinder" "github.com/redpanda-data/redpanda-operator/operator/internal/probes" internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" + "github.com/redpanda-data/redpanda-operator/pkg/pflagutil" ) // +kubebuilder:rbac:groups=coordination.k8s.io,namespace=default,resources=leases,verbs=get;list;watch;create;update;patch;delete @@ -56,6 +64,7 @@ func Command() *cobra.Command { brokerProbeBrokerURL string runUnbinder bool unbinderTimeout time.Duration + selector pflagutil.LabelSelectorValue panicAfter time.Duration ) @@ -86,6 +95,7 @@ func Command() *cobra.Command { brokerProbeBrokerURL, runUnbinder, unbinderTimeout, + selector.Selector, panicAfter, ) }, @@ -102,6 +112,7 @@ func Command() *cobra.Command { // cluster flags cmd.Flags().StringVar(&clusterNamespace, "redpanda-cluster-namespace", "", "The namespace of the cluster that this sidecar manages.") cmd.Flags().StringVar(&clusterName, "redpanda-cluster-name", "", "The name of the cluster that this sidecar manages.") + cmd.Flags().Var(&selector, "selector", "Kubernetes label selector that will filter objects to be considered by the all controllers run by the sidecar.") // decommission flags cmd.Flags().BoolVar(&runDecommissioner, "run-decommissioner", false, "Specifies if the sidecar should run the broker decommissioner.") @@ -153,10 +164,14 @@ func Run( brokerProbeBrokerURL string, runUnbinder bool, unbinderTimeout time.Duration, + selector labels.Selector, panicAfter time.Duration, ) error { setupLog := ctrl.LoggerFrom(ctx).WithName("setup") + // Required arguments check, in sidecar mode these MUST be specified to + // ensure the sidecar only affects the helm deployment that's deployed it. + if clusterNamespace == "" { err := errors.New("must specify a cluster-namespace parameter") setupLog.Error(err, "no cluster namespace provided") @@ -169,6 +184,20 @@ func Run( return err } + if selector == nil || selector.Empty() { + // Use a sensible default that's about as correct than the previous + // hard coded values. Hardcoding of name=redpanda is incorrect when + // nameoverride is used. + var err error + selector, err = labels.Parse(fmt.Sprintf( + "apps.kubernetes.io/component,app.kubernetes.io/name=redpanda,app.kubernetes.io/instance=%s", + clusterName, + )) + if err != nil { + panic(err) + } + } + scheme := runtime.NewScheme() for _, fn := range schemes { @@ -183,6 +212,12 @@ func Run( LeaderElectionID: clusterName + "." + clusterNamespace + ".redpanda", Scheme: scheme, LeaderElectionNamespace: clusterNamespace, + Cache: cache.Options{ + // Only watch the specified namespace, we don't have permissions for watch at the ClusterScope. 
+ DefaultNamespaces: map[string]cache.Config{ + clusterNamespace: {}, + }, + }, }) if err != nil { setupLog.Error(err, "unable to initialize manager") @@ -190,28 +225,41 @@ func Run( } if runDecommissioner { - fetcher := decommissioning.NewChainedFetcher( - // prefer RPK profile first and then move on to fetch from helm values - decommissioning.NewRPKProfileFetcher(redpandaYAMLPath), - decommissioning.NewHelmFetcher(mgr), - ) + setupLog.Info("broker decommissioner enabled", "namespace", clusterNamespace, "cluster", clusterName, "selector", selector) - if err := decommissioning.NewStatefulSetDecommissioner(mgr, fetcher, []decommissioning.Option{ - decommissioning.WithFilter(decommissioning.FilterStatefulSetOwner(clusterNamespace, clusterName)), + fs := afero.NewOsFs() + + params := rpkconfig.Params{ConfigFlag: redpandaYAMLPath} + + config, err := params.Load(afero.NewOsFs()) + if err != nil { + return err + } + + if err := decommissioning.NewStatefulSetDecommissioner( + mgr, + func(ctx context.Context, _ *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) { + // Always use the config that's loaded from redpanda.yaml, in + // sidecar mode no other STS's should be watched. + return rpkadminapi.NewClient(ctx, fs, config.VirtualProfile()) + }, + decommissioning.WithSelector(selector), decommissioning.WithRequeueTimeout(decommissionRequeueTimeout), decommissioning.WithDelayedCacheInterval(decommissionVoteInterval), decommissioning.WithDelayedCacheMaxCount(decommissionMaxVoteCount), - }...).SetupWithManager(mgr); err != nil { + ).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner") return err } } if runUnbinder { + setupLog.Info("PVC unbinder enabled", "namespace", clusterNamespace, "selector", selector) + if err := (&pvcunbinder.Controller{ - Client: mgr.GetClient(), - Timeout: unbinderTimeout, - Filter: pvcunbinder.FilterPodOwner(clusterNamespace, clusterName), + Client: mgr.GetClient(), + Timeout: unbinderTimeout, + Selector: selector, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PVCUnbinder") return err diff --git a/operator/go.mod b/operator/go.mod index 95b8ffa75..7c0c28a4f 100644 --- a/operator/go.mod +++ b/operator/go.mod @@ -37,7 +37,6 @@ require ( github.com/scalalang2/golang-fifo v1.0.2 github.com/spf13/afero v1.12.0 github.com/spf13/cobra v1.9.1 - github.com/spf13/pflag v1.0.7 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.39.0 github.com/testcontainers/testcontainers-go/modules/redpanda v0.39.0 @@ -269,6 +268,7 @@ require ( github.com/shopspring/decimal v1.4.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/cast v1.7.0 // indirect + github.com/spf13/pflag v1.0.7 // indirect github.com/stoewer/go-strcase v1.3.1 // indirect github.com/texttheater/golang-levenshtein v1.0.1 // indirect github.com/tidwall/gjson v1.18.0 // indirect diff --git a/operator/internal/controller/decommissioning/chained_fetcher.go b/operator/internal/controller/decommissioning/chained_fetcher.go deleted file mode 100644 index a21764692..000000000 --- a/operator/internal/controller/decommissioning/chained_fetcher.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. 
-// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "context" - "fmt" - - "github.com/cockroachdb/errors" -) - -// ChainedFetcher delegates fetching behavior to a list of sub fetchers -// moving down the list if an error occurs in the previous fetcher. -type ChainedFetcher struct { - fetchers []Fetcher -} - -var _ Fetcher = (*ChainedFetcher)(nil) - -func NewChainedFetcher(fetchers ...Fetcher) *ChainedFetcher { - return &ChainedFetcher{fetchers: fetchers} -} - -func (c *ChainedFetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) { - if len(c.fetchers) == 0 { - return nil, errors.New("chained fetcher does not have any supplied sub-fetchers") - } - - errs := []error{} - - for _, fetcher := range c.fetchers { - object, err := fetcher.FetchLatest(ctx, name, namespace) - if err != nil { - errs = append(errs, err) - continue - } - - if object != nil { - return object, nil - } - } - - return nil, fmt.Errorf("all sub-fetchers failed: %w", errors.Join(errs...)) -} diff --git a/operator/internal/controller/decommissioning/fetcher.go b/operator/internal/controller/decommissioning/fetcher.go deleted file mode 100644 index 5a8618694..000000000 --- a/operator/internal/controller/decommissioning/fetcher.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "context" -) - -// Fetcher acts as a mechanism for fetching an object that can be used in initializing -// a connection to Redpanda. This can come in the form of a Redpanda CR or an RPK profile. -type Fetcher interface { - FetchLatest(ctx context.Context, name, namespace string) (any, error) -} diff --git a/operator/internal/controller/decommissioning/helm_fetcher.go b/operator/internal/controller/decommissioning/helm_fetcher.go deleted file mode 100644 index f303aaa75..000000000 --- a/operator/internal/controller/decommissioning/helm_fetcher.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. 
-// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "bytes" - "compress/gzip" - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - - "github.com/cockroachdb/errors" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2" -) - -// the format logic for helm releases can be found: -// https://github.com/helm/helm/blob/2cea1466d3c27491364eb44bafc7be1ca5461b2d/pkg/storage/driver/util.go#L58 - -var gzipHeader = []byte{0x1f, 0x8b, 0x08} - -// HelmFetcher fetches a Redpanda CR via initializing it virtually with a -// Helm values file stored in a secret. This is to maintain backwards -// compatibility with our current mechanism for decommissioning, but -// it should likely be dropped in the future with preference to using -// an RPK profile. -type HelmFetcher struct { - client client.Client -} - -var _ Fetcher = (*HelmFetcher)(nil) - -func NewHelmFetcher(mgr ctrl.Manager) *HelmFetcher { - return &HelmFetcher{client: mgr.GetClient()} -} - -func (f *HelmFetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) { - log := ctrl.LoggerFrom(ctx, "namespace", namespace, "name", name).WithName("HelmFetcher.FetchLatest") - - var secrets corev1.SecretList - - if err := f.client.List(ctx, &secrets, client.MatchingLabels{ - "name": name, - "owner": "helm", - }, client.InNamespace(namespace)); err != nil { - return nil, fmt.Errorf("fetching secrets list: %w", err) - } - - latestVersion := 0 - var latestValues map[string]any - for _, item := range secrets.Items { - values, version, err := f.decode(item.Data["release"]) - if err != nil { - // just log the error and move on rather than making it terminal - // in case there's some secret that's just badly formatted - log.Error(err, "decoding secret", "secret", item.Name) - continue - } - if version > latestVersion { - latestVersion = version - latestValues = values - } - } - - if latestValues != nil { - data, err := json.Marshal(latestValues) - if err != nil { - return nil, fmt.Errorf("marshaling values: %w", err) - } - - cluster := &redpandav1alpha2.Redpanda{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - Spec: redpandav1alpha2.RedpandaSpec{ClusterSpec: &redpandav1alpha2.RedpandaClusterSpec{}}, - } - - if err := json.Unmarshal(data, cluster.Spec.ClusterSpec); err != nil { - return nil, fmt.Errorf("unmarshaling values: %w", err) - } - - return cluster, nil - } - - err := errors.New("unable to find latest value") - log.Error(err, "no secrets were decodable") - return nil, err -} - -type partialChart struct { - Config map[string]any `json:"config"` - Version int `json:"version"` -} - -func (f *HelmFetcher) decode(data []byte) (map[string]any, int, error) { - decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data))) - n, err := base64.StdEncoding.Decode(decoded, data) - if err != nil { - return nil, 0, err - } - decoded = decoded[:n] - - if len(decoded) > 3 && bytes.Equal(decoded[0:3], gzipHeader) { - reader, err := gzip.NewReader(bytes.NewReader(decoded)) - if err != nil { - return nil, 0, err - } - defer 
reader.Close() - unzipped, err := io.ReadAll(reader) - if err != nil { - return nil, 0, err - } - decoded = unzipped - } - - var chart partialChart - if err := json.Unmarshal(decoded, &chart); err != nil { - return nil, 0, err - } - - // We only care about the chart.Config here and not the - // merged values with the chart values because our - // client initialization code already does the merging. - return chart.Config, chart.Version, nil -} diff --git a/operator/internal/controller/decommissioning/redpanda_fetcher.go b/operator/internal/controller/decommissioning/redpanda_fetcher.go deleted file mode 100644 index 0dff57ac7..000000000 --- a/operator/internal/controller/decommissioning/redpanda_fetcher.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "context" - "fmt" - - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2" -) - -// RedpandaFetcher fetches a Redpanda cluster via CR. -type RedpandaFetcher struct { - client client.Client -} - -var _ Fetcher = (*RedpandaFetcher)(nil) - -func NewRedpandaFetcher(mgr ctrl.Manager) *RedpandaFetcher { - return &RedpandaFetcher{client: mgr.GetClient()} -} - -func (f *RedpandaFetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) { - var cluster redpandav1alpha2.Redpanda - if err := f.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &cluster); err != nil { - return nil, fmt.Errorf("fetching cluster: %w", err) - } - - return cluster, nil -} diff --git a/operator/internal/controller/decommissioning/rpk_profile_fetcher.go b/operator/internal/controller/decommissioning/rpk_profile_fetcher.go deleted file mode 100644 index 2b17155bd..000000000 --- a/operator/internal/controller/decommissioning/rpk_profile_fetcher.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2025 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package decommissioning - -import ( - "context" - "sync" - - rpkconfig "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" - "github.com/spf13/afero" -) - -// RPKProfileFetcher loads up an RPK profile used in initializing a cluster -// connection. It should only ever be used when we only care about managing -// a single cluster, as it ignores the name and the namespace handed it and -// solely uses a profile found on disk. 
-type RPKProfileFetcher struct { - configPath string - fs afero.Fs - profile *rpkconfig.RpkProfile - mutex sync.Mutex -} - -var _ Fetcher = (*RPKProfileFetcher)(nil) - -func NewRPKProfileFetcher(configPath string) *RPKProfileFetcher { - return &RPKProfileFetcher{configPath: configPath, fs: afero.NewOsFs()} -} - -func (f *RPKProfileFetcher) FetchLatest(_ context.Context, _, _ string) (any, error) { - f.mutex.Lock() - defer f.mutex.Unlock() - - if f.profile != nil { - // returned the memoized profile so we don't have to keep reading it, if we need - // to handle the profile on disk changing, then we should implement something like - // an fsnotify watcher that clears the memoized profile when the file changes on disk - return f.profile, nil - } - - params := rpkconfig.Params{ConfigFlag: f.configPath} - - config, err := params.Load(f.fs) - if err != nil { - return nil, err - } - - f.profile = config.VirtualProfile() - - return f.profile, nil -} diff --git a/operator/internal/controller/decommissioning/statefulset_decomissioner.go b/operator/internal/controller/decommissioning/statefulset_decomissioner.go index a73da5e0c..041b4a8b1 100644 --- a/operator/internal/controller/decommissioning/statefulset_decomissioner.go +++ b/operator/internal/controller/decommissioning/statefulset_decomissioner.go @@ -12,17 +12,18 @@ package decommissioning import ( "context" "fmt" + "regexp" "sort" "strconv" "strings" "time" - "github.com/cockroachdb/errors" "github.com/redpanda-data/common-go/rpadmin" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" @@ -34,7 +35,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" - internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" "github.com/redpanda-data/redpanda-operator/operator/pkg/client/kubernetes" "github.com/redpanda-data/redpanda-operator/operator/pkg/collections" "github.com/redpanda-data/redpanda-operator/operator/pkg/functional" @@ -44,11 +44,7 @@ const ( eventReasonBroker = "DecommissioningBroker" eventReasonUnboundPersistentVolumeClaims = "DecommissioningUnboundPersistentVolumeClaims" - k8sManagedByLabelKey = "app.kubernetes.io/managed-by" - k8sInstanceLabelKey = "app.kubernetes.io/instance" - k8sComponentLabelKey = "app.kubernetes.io/component" - k8sNameLabelKey = "app.kubernetes.io/name" - datadirVolume = "datadir" + datadirVolume = "datadir" traceLevel = 2 debugLevel = 1 @@ -62,24 +58,9 @@ const ( defaultDelayedMaxCacheCount = 2 ) -type Option func(*StatefulSetDecomissioner) +var datadirRE = regexp.MustCompile(`^datadir-(.+)-\d+$`) -func FilterStatefulSetOwner(ownerNamespace, ownerName string) func(ctx context.Context, set *appsv1.StatefulSet) (bool, error) { - filter := filterOwner(ownerNamespace, ownerName) - return func(ctx context.Context, set *appsv1.StatefulSet) (bool, error) { - return filter(set), nil - } -} - -func filterOwner(ownerNamespace, ownerName string) func(o client.Object) bool { - return func(o client.Object) bool { - labels := o.GetLabels() - if o.GetNamespace() == ownerNamespace && labels != nil && labels[k8sInstanceLabelKey] == ownerName { - return true - } - return false - } -} +type Option func(*StatefulSetDecomissioner) func WithDecommisionOnTooHighOrdinal(val bool) Option { return 
func(decommissioner *StatefulSetDecomissioner) { @@ -93,15 +74,15 @@ func WithDesiredReplicasFetcher(desiredReplicasFetcher func(ctx context.Context, } } -func WithFilter(filter func(ctx context.Context, set *appsv1.StatefulSet) (bool, error)) Option { +func WithSelector(selector labels.Selector) Option { return func(decommissioner *StatefulSetDecomissioner) { - decommissioner.filter = filter + decommissioner.selector = selector } } -func WithFactory(factory internalclient.ClientFactory) Option { +func WithFilter(fn func(context.Context, *appsv1.StatefulSet) (bool, error)) Option { return func(decommissioner *StatefulSetDecomissioner) { - decommissioner.factory = factory + decommissioner.filter = fn } } @@ -135,38 +116,40 @@ func WithSyncPeriod(d time.Duration) Option { } } +type ClientGetter func(context.Context, *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) + type StatefulSetDecomissioner struct { client client.Client - factory internalclient.ClientFactory - fetcher Fetcher + getAdminClient ClientGetter recorder record.EventRecorder requeueTimeout time.Duration delayedCacheInterval time.Duration delayedCacheMaxCount int delayedBrokerIDCache *CategorizedDelayedCache[types.NamespacedName, int] delayedVolumeCache *CategorizedDelayedCache[types.NamespacedName, types.NamespacedName] - filter func(ctx context.Context, set *appsv1.StatefulSet) (bool, error) cleanupPVCs bool syncPeriod time.Duration + selector labels.Selector + filter func(context.Context, *appsv1.StatefulSet) (bool, error) // decommissionOnTooHighOrdinal allows the decommissioner to decommission a node, if it has an decommissionOnTooHighOrdinal bool desiredReplicasFetcher func(ctx context.Context, set *appsv1.StatefulSet) (int32, error) } -func NewStatefulSetDecommissioner(mgr ctrl.Manager, fetcher Fetcher, options ...Option) *StatefulSetDecomissioner { +func NewStatefulSetDecommissioner(mgr ctrl.Manager, getter ClientGetter, options ...Option) *StatefulSetDecomissioner { k8sClient := mgr.GetClient() decommissioner := &StatefulSetDecomissioner{ recorder: mgr.GetEventRecorderFor("broker-decommissioner"), client: k8sClient, - fetcher: fetcher, - factory: internalclient.NewFactory(mgr.GetConfig(), k8sClient), + getAdminClient: getter, requeueTimeout: defaultRequeueTimeout, delayedCacheInterval: defaultDelayedCacheInterval, delayedCacheMaxCount: defaultDelayedMaxCacheCount, - filter: func(ctx context.Context, set *appsv1.StatefulSet) (bool, error) { return true, nil }, cleanupPVCs: true, + // By default don't match anything. + selector: labels.Nothing(), // By default, just look at the single STS // If multiple nodePools are supported, we can use Cluster CR to the the total desired replicas. desiredReplicasFetcher: func(ctx context.Context, set *appsv1.StatefulSet) (int32, error) { @@ -194,58 +177,47 @@ func NewStatefulSetDecommissioner(mgr ctrl.Manager, fetcher Fetcher, options ... 
// +kubebuilder:rbac:groups=core,namespace=default,resources=secrets,verbs=get;list;watch func (s *StatefulSetDecomissioner) SetupWithManager(mgr ctrl.Manager) error { - pvcPredicate, err := predicate.LabelSelectorPredicate( - metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{{ - Key: k8sNameLabelKey, // make sure we have a name - Operator: metav1.LabelSelectorOpExists, - }, { - Key: k8sComponentLabelKey, // make sure we have a component label - Operator: metav1.LabelSelectorOpExists, - }, { - Key: k8sInstanceLabelKey, // make sure we have a cluster name - Operator: metav1.LabelSelectorOpExists, - }}, - }, - ) - if err != nil { - return err - } + selectorPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool { + lbls := object.GetLabels() + if lbls == nil { + lbls = map[string]string{} + } + return s.selector.Matches(labels.Set(lbls)) + }) return ctrl.NewControllerManagedBy(mgr). + Named("statefulset_decommissioner"). For(&appsv1.StatefulSet{}). Owns(&corev1.Pod{}). // PVCs don't have a "true" owner ref, so instead we attempt to map backwards via labels Watches(&corev1.PersistentVolumeClaim{}, handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []ctrl.Request { claim := o.(*corev1.PersistentVolumeClaim) - labels := claim.GetLabels() - - // a bit of defensive programming, but we should always have labels due to our use - // of a predicate - if labels == nil { - // we have no labels, so we can't map anything - return nil + lbls := claim.GetLabels() + if lbls == nil { + lbls = map[string]string{} } - release := labels[k8sInstanceLabelKey] - if release == "" { - // we have an invalid release name, so skip + // Validate that this PVC is within our purview by making sure it + // satisfies our label selector. (This selector MUST match both PVCs and + // STSs.) + if !s.selector.Matches(labels.Set(lbls)) { return nil } - if !strings.HasPrefix(claim.Name, datadirVolume+"-") { - // we only care about the datadir volume + // Extract the STS name by parsing the naming convention $VOL-$STS-$ORDINAL. + match := datadirRE.FindStringSubmatch(claim.Name) + if match == nil { return nil } // if we are here, it means we can map to a real stateful set return []ctrl.Request{ {NamespacedName: types.NamespacedName{ - Name: release, + Name: match[1], Namespace: claim.Namespace, }}, } - }), builder.WithPredicates(pvcPredicate)). + }), builder.WithPredicates(selectorPredicate)). Complete(s) } @@ -276,6 +248,18 @@ func (s *StatefulSetDecomissioner) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, nil } + // If a filter function has been provided, run it. 
+ if s.filter != nil { + inbounds, err := s.filter(ctx, set) + if err != nil { + return ctrl.Result{}, err + } + + if !inbounds { + return ctrl.Result{}, nil + } + } + requeue, err := s.Decommission(ctx, set) if err != nil { // we already logged any error, just requeue directly, delegating to the @@ -340,17 +324,6 @@ func (s *StatefulSetDecomissioner) Decommission(ctx context.Context, set *appsv1 log := ctrl.LoggerFrom(ctx, "namespace", set.Namespace, "name", set.Name).WithName("StatefulSetDecommissioner.Decomission") - keep, err := s.filter(ctx, set) - if err != nil { - log.Error(err, "error filtering StatefulSet") - return false, err - } - - if !keep { - log.V(traceLevel).Info("skipping decommission, StatefulSet filtered out") - return false, nil - } - setCacheKey := client.ObjectKeyFromObject(set) if s.cleanupPVCs { unboundVolumeClaims, err := s.findUnboundVolumeClaims(ctx, set) @@ -373,7 +346,7 @@ func (s *StatefulSetDecomissioner) Decommission(ctx context.Context, set *appsv1 s.delayedVolumeCache.Mark(setCacheKey, client.ObjectKeyFromObject(claim)) } - // now we attempt to clean up the first of the PVCs that meets the treshold of the cache, + // now we attempt to clean up the first of the PVCs that meets the threshold of the cache, // ensuring that their PVs have a retain policy, and short-circuiting the rest of reconciliation // if we actually delete a claim for _, claim := range unboundVolumeClaims { @@ -763,24 +736,6 @@ func (s *StatefulSetDecomissioner) findUnboundVolumeClaims(ctx context.Context, return unbound, nil } -// getAdminClient initializes an admin API client for a cluster that a statefulset manages. It does this by -// delegating to a "fetcher" which fetches the equivalent values.yaml map from either a Redpanda CR or an -// installed helm release. It then effectively turns this into a Redpanda CR that can be used for initializing -// clients based on existing factory code. -func (s *StatefulSetDecomissioner) getAdminClient(ctx context.Context, set *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) { - release, ok := set.Labels[k8sInstanceLabelKey] - if !ok { - return nil, errors.New("unable to get release name") - } - - fetched, err := s.fetcher.FetchLatest(ctx, release, set.Namespace) - if err != nil { - return nil, fmt.Errorf("fetching latest values: %w", err) - } - - return s.factory.RedpandaAdminClient(ctx, fetched) -} - // podNameFromFQDN takes a hostname and attempt to map the // name back to a stateful set pod ordinal based on the left // most DNS segment containing the form SETNAME-ORDINAL. 
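For reference, a minimal sketch of how a caller wires the reworked decommissioner: it now takes a ClientGetter instead of a Fetcher/Factory pair, and scoping is expressed via WithSelector. This mirrors the sidecar wiring in this change; the package name, the setupDecommissioner helper, and its parameters are illustrative assumptions, not part of the diff.

// Hypothetical wiring sketch; mirrors the sidecar path using an rpk profile on disk.
package example

import (
	"context"

	"github.com/redpanda-data/common-go/rpadmin"
	rpkadminapi "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
	rpkconfig "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
	"github.com/spf13/afero"
	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/labels"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/redpanda-data/redpanda-operator/operator/internal/controller/decommissioning"
)

func setupDecommissioner(mgr ctrl.Manager, redpandaYAMLPath, selectorString string) error {
	selector, err := labels.Parse(selectorString)
	if err != nil {
		return err
	}

	fs := afero.NewOsFs()
	params := rpkconfig.Params{ConfigFlag: redpandaYAMLPath}
	config, err := params.Load(fs)
	if err != nil {
		return err
	}

	// The decommissioner no longer takes a Fetcher; it takes a ClientGetter that
	// returns an admin API client for the StatefulSet being reconciled.
	getter := func(ctx context.Context, _ *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) {
		return rpkadminapi.NewClient(ctx, fs, config.VirtualProfile())
	}

	return decommissioning.NewStatefulSetDecommissioner(
		mgr,
		getter,
		// Scoping is now a label selector instead of an owner-name filter.
		decommissioning.WithSelector(selector),
	).SetupWithManager(mgr)
}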
diff --git a/operator/internal/controller/decommissioning/statefulset_decommissioner_test.go b/operator/internal/controller/decommissioning/statefulset_decommissioner_test.go index 46b1e4c78..4e654fbff 100644 --- a/operator/internal/controller/decommissioning/statefulset_decommissioner_test.go +++ b/operator/internal/controller/decommissioning/statefulset_decommissioner_test.go @@ -16,11 +16,12 @@ import ( "testing" "time" + "github.com/cockroachdb/errors" "github.com/go-logr/logr/testr" "github.com/redpanda-data/common-go/rpadmin" "github.com/stretchr/testify/suite" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -41,6 +42,8 @@ import ( "github.com/redpanda-data/redpanda-operator/pkg/testutil" ) +const redpandaChartPath = "../../../../charts/redpanda/chart" + //go:embed testdata/role.yaml var decommissionerRBAC []byte @@ -58,13 +61,13 @@ type StatefulSetDecommissionerSuite struct { client client.Client helm *helm.Client clientFactory internalclient.ClientFactory + releases map[string]*chart } var _ suite.SetupAllSuite = (*StatefulSetDecommissionerSuite)(nil) func (s *StatefulSetDecommissionerSuite) TestDecommission() { - s.T().Skip("we currently have issues with the eviction code in this test due to pod disruption budgets") - chart := s.installChart("basic", "", map[string]any{ + chart := s.installChart("basic", map[string]any{ "statefulset": map[string]any{ "replicas": 5, }, @@ -109,7 +112,10 @@ func (s *StatefulSetDecommissionerSuite) TestDecommission() { s.T().Cleanup(func() { s.untaintNode(firstBrokerNode) }) - s.Require().NoError(s.client.SubResource("eviction").Create(s.ctx, &firstBroker, &policyv1.Eviction{})) + // TODO(chrisseto): Evictions fail in CI with `Cannot evict pod as it would violate the pod's disruption budget.` but not locally. + // For now use a forced delete as that mimics node failure equally well. + // s.Require().NoError(s.client.SubResource("eviction").Create(s.ctx, &firstBroker, &policyv1.Eviction{})) + s.Require().NoError(s.client.Delete(s.ctx, &firstBroker, client.GracePeriodSeconds(0))) s.waitFor(func(ctx context.Context) (bool, error) { health, err := adminClient.GetHealthOverview(ctx) @@ -169,14 +175,16 @@ func (s *StatefulSetDecommissionerSuite) SetupSuite() { }) s.ctx = context.Background() + s.releases = map[string]*chart{} s.env = testenv.New(t, testenv.Options{ // We need our own cluster for these tests since we need additional // agents. Otherwise we can just turn up the default... but we'll // need a different cluster to manipulate for node cleanup anyway. 
- Name: "decommissioning", - Agents: 5, - Scheme: scheme, - Logger: log, + Name: "decommissioning", + Agents: 5, + SkipVCluster: true, + Scheme: scheme, + Logger: log, }) s.client = s.env.Client() @@ -188,25 +196,31 @@ func (s *StatefulSetDecommissionerSuite) SetupSuite() { if err != nil { return err } - if err := helmClient.RepoAdd(s.ctx, "redpandadata", "https://charts.redpanda.com"); err != nil { - return err - } s.helm = helmClient dialer := kube.NewPodDialer(mgr.GetConfig()) s.clientFactory = internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient()).WithDialer(dialer.DialContext) - options := []decommissioning.Option{ - // override this so we can dial directly to our Redpanda pods - decommissioning.WithFactory(s.clientFactory), + decommissioner := decommissioning.NewStatefulSetDecommissioner( + mgr, + func(ctx context.Context, sts *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) { + // NB: We expect to see some errors here. The release isn't + // populated until after the chart installation finishes. + name := sts.Labels["app.kubernetes.io/instance"] + release, ok := s.releases[name] + if !ok { + return nil, errors.Newf("no release: %q", name) + } + return s.adminClientFor(release), nil + }, // set these low so that we don't have to wait forever in the test // these settings should give about a 5-10 second window before // actually running a decommission - decommissioning.WithDelayedCacheInterval(5 * time.Second), + decommissioning.WithDelayedCacheInterval(5*time.Second), decommissioning.WithDelayedCacheMaxCount(2), - decommissioning.WithRequeueTimeout(2 * time.Second), - } - decommissioner := decommissioning.NewStatefulSetDecommissioner(mgr, decommissioning.NewHelmFetcher(mgr), options...) + decommissioning.WithRequeueTimeout(2*time.Second), + ) + if err := decommissioner.SetupWithManager(mgr); err != nil { return err } @@ -217,12 +231,11 @@ func (s *StatefulSetDecommissionerSuite) SetupSuite() { type chart struct { name string - version string release helm.Release values map[string]any } -func (s *StatefulSetDecommissionerSuite) installChart(name, version string, overrides map[string]any) *chart { +func (s *StatefulSetDecommissionerSuite) installChart(name string, overrides map[string]any) *chart { values := map[string]any{ "statefulset": map[string]any{ "replicas": 1, @@ -233,18 +246,13 @@ func (s *StatefulSetDecommissionerSuite) installChart(name, version string, over "external": map[string]any{ "enabled": false, }, - "image": map[string]any{ - "repository": "redpandadata/redpanda", - "tag": "v24.3.1", - }, } if overrides != nil { values = functional.MergeMaps(values, overrides) } - release, err := s.helm.Install(s.ctx, "redpandadata/redpanda", helm.InstallOptions{ - Version: version, + release, err := s.helm.Install(s.ctx, redpandaChartPath, helm.InstallOptions{ CreateNamespace: true, Name: name, Namespace: s.env.Namespace(), @@ -252,12 +260,15 @@ func (s *StatefulSetDecommissionerSuite) installChart(name, version string, over }) s.Require().NoError(err) - return &chart{ + c := &chart{ name: name, - version: version, values: values, release: release, } + + s.releases[name] = c + + return c } func (s *StatefulSetDecommissionerSuite) adminClientFor(chart *chart) *rpadmin.AdminAPI { @@ -283,8 +294,7 @@ func (s *StatefulSetDecommissionerSuite) adminClientFor(chart *chart) *rpadmin.A func (s *StatefulSetDecommissionerSuite) upgradeChart(chart *chart, overrides map[string]any) { values := functional.MergeMaps(chart.values, overrides) - release, err := s.helm.Upgrade(s.ctx, 
chart.release.Name, "redpandadata/redpanda", helm.UpgradeOptions{ - Version: chart.version, + release, err := s.helm.Upgrade(s.ctx, chart.release.Name, redpandaChartPath, helm.UpgradeOptions{ Namespace: s.env.Namespace(), Values: values, }) @@ -306,10 +316,14 @@ func (s *StatefulSetDecommissionerSuite) setupRBAC() string { clusterRole := roles[0].(*rbacv1.ClusterRole) // Inject additional permissions required for running in testenv. - role.Rules = append(role.Rules, rbacv1.PolicyRule{ + clusterRole.Rules = append(clusterRole.Rules, rbacv1.PolicyRule{ APIGroups: []string{""}, Resources: []string{"pods/portforward"}, Verbs: []string{"*"}, + }, rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list"}, }) name := "testenv-" + testenv.RandString(6) diff --git a/operator/internal/controller/pvcunbinder/pvcunbinder.go b/operator/internal/controller/pvcunbinder/pvcunbinder.go index a28315ecf..38d8d2b21 100644 --- a/operator/internal/controller/pvcunbinder/pvcunbinder.go +++ b/operator/internal/controller/pvcunbinder/pvcunbinder.go @@ -58,10 +58,6 @@ type Controller struct { // Selector, if specified, will narrow the scope of Pods that this // Reconciler will consider for remediation. Selector labels.Selector - // Filter, if specified, will narrow the scope of Pods that this - // Reconciler will consider for remediation via some sort of filtering - // function. - Filter func(ctx context.Context, pod *corev1.Pod) (bool, error) // AllowRebinding optionally enables clearing of the unbound PV's ClaimRef // which effectively makes the PVs "re-bindable" if the underlying Node // become capable of scheduling Pods once again. @@ -73,20 +69,26 @@ type Controller struct { AllowRebinding bool } -func FilterPodOwner(ownerNamespace, ownerName string) func(ctx context.Context, pod *corev1.Pod) (bool, error) { - filter := filterOwner(ownerNamespace, ownerName) - return func(ctx context.Context, pod *corev1.Pod) (bool, error) { - return filter(pod), nil - } -} - // +kubebuilder:rbac:groups=core,resources=persistentvolumes,verbs=get;list;watch;patch // +kubebuilder:rbac:groups=core,namespace=default,resources=pods,verbs=get;list;watch;delete // +kubebuilder:rbac:groups=core,namespace=default,resources=persistentvolumeclaims,verbs=get;list;watch;delete; func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr).For(&corev1.Pod{}, builder.WithPredicates(predicate.NewPredicateFuncs(pvcUnbinderPredicate))).Complete(r) + selectorPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool { + if r.Selector == nil { + return true + } + + lbls := object.GetLabels() + if lbls == nil { + lbls = map[string]string{} + } + return r.Selector.Matches(labels.Set(lbls)) + }) + unbinderPredicate := predicate.NewPredicateFuncs(pvcUnbinderPredicate) + + return ctrl.NewControllerManagedBy(mgr).For(&corev1.Pod{}, builder.WithPredicates(selectorPredicate, unbinderPredicate)).Complete(r) } // Reconcile implements the algorithm described in the docs of [Reconciler]. 
To @@ -297,19 +299,6 @@ func (r *Controller) ShouldRemediate(ctx context.Context, pod *corev1.Pod) (bool return false, 0 } - if r.Filter != nil { - keep, err := r.Filter(ctx, pod) - if err != nil { - log.FromContext(ctx).Error(err, "error filtering pod", "name", pod.Name, "labels", pod.Labels) - return false, 0 - } - - if !keep { - log.FromContext(ctx).Info("filter not satisfied; skipping", "name", pod.Name, "labels", pod.Labels) - return false, 0 - } - } - idx := slices.IndexFunc(pod.Status.Conditions, func(cond corev1.PodCondition) bool { return cond.Type == corev1.PodScheduled && cond.Status == corev1.ConditionFalse && cond.Reason == "Unschedulable" }) @@ -379,16 +368,3 @@ func StsPVCs(pod *corev1.Pod) []client.ObjectKey { } return found } - -// TODO extract to wellknown labels package? -const k8sInstanceLabelKey = "app.kubernetes.io/instance" - -func filterOwner(ownerNamespace, ownerName string) func(o client.Object) bool { - return func(o client.Object) bool { - labels := o.GetLabels() - if o.GetNamespace() == ownerNamespace && labels != nil && labels[k8sInstanceLabelKey] == ownerName { - return true - } - return false - } -} diff --git a/pkg/go.mod b/pkg/go.mod index a19ca573a..b10ba91b3 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -18,6 +18,7 @@ require ( github.com/redpanda-data/common-go/secrets v0.1.3 github.com/redpanda-data/redpanda/src/go/rpk v0.0.0-20250716004441-6e1647296ad6 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 + github.com/spf13/pflag v1.0.7 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go/modules/k3s v0.39.0 github.com/wk8/go-ordered-map/v2 v2.1.8 @@ -234,7 +235,6 @@ require ( github.com/spf13/afero v1.12.0 // indirect github.com/spf13/cast v1.7.0 // indirect github.com/spf13/cobra v1.9.1 // indirect - github.com/spf13/pflag v1.0.7 // indirect github.com/stoewer/go-strcase v1.3.1 // indirect github.com/testcontainers/testcontainers-go v0.39.0 // indirect github.com/texttheater/golang-levenshtein v1.0.1 // indirect diff --git a/pkg/pflagutil/flags.go b/pkg/pflagutil/flags.go new file mode 100644 index 000000000..18ee01569 --- /dev/null +++ b/pkg/pflagutil/flags.go @@ -0,0 +1,48 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +// package pflagutil contains custom implementations of [pflag.Value] for +// commonly used CLI options. +package pflagutil + +import ( + "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/labels" +) + +// LabelSelectorValue is a [pflag.Value] implementation for Kubernetes label selectors. +// Usage: +// +// var selector LabelSelectorValue +// cmd.Flags().Var(&selector, ...) +type LabelSelectorValue struct { + Selector labels.Selector +} + +var _ pflag.Value = ((*LabelSelectorValue)(nil)) + +func (s *LabelSelectorValue) Set(value string) error { + if value == "" { + return nil + } + var err error + s.Selector, err = labels.Parse(value) + return err +} + +func (s *LabelSelectorValue) String() string { + if s.Selector == nil { + return "" + } + return s.Selector.String() +} + +func (s *LabelSelectorValue) Type() string { + return "label-selector" +}
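A short usage sketch of the new pflagutil.LabelSelectorValue with cobra, similar to the sidecar's --selector flag above; the example package, command, and flag description here are hypothetical.

package example

import (
	"fmt"

	"github.com/spf13/cobra"
	"k8s.io/apimachinery/pkg/labels"

	"github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
)

func newCommand() *cobra.Command {
	var selector pflagutil.LabelSelectorValue

	cmd := &cobra.Command{
		Use: "example",
		RunE: func(cmd *cobra.Command, args []string) error {
			// An unset flag leaves Selector nil; fall back to matching everything.
			sel := selector.Selector
			if sel == nil {
				sel = labels.Everything()
			}
			fmt.Println("using selector:", sel.String())
			return nil
		},
	}

	// e.g. --selector 'app.kubernetes.io/instance=redpanda'
	cmd.Flags().Var(&selector, "selector", "Kubernetes label selector used to filter watched objects.")

	return cmd
}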