@@ -17,26 +17,18 @@ import (
1717 "fmt"
1818 "os"
1919 "path/filepath"
20- "slices"
2120 "strings"
2221 "time"
2322
2423 "github.com/cockroachdb/errors"
2524 "github.com/spf13/cobra"
26- "github.com/spf13/pflag"
2725 helmkube "helm.sh/helm/v3/pkg/kube"
28- appsv1 "k8s.io/api/apps/v1"
2926 corev1 "k8s.io/api/core/v1"
30- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3127 "k8s.io/apimachinery/pkg/labels"
32- "k8s.io/apimachinery/pkg/types"
3328 _ "k8s.io/client-go/plugin/pkg/client/auth"
34- "k8s.io/utils/ptr"
3529 ctrl "sigs.k8s.io/controller-runtime"
3630 "sigs.k8s.io/controller-runtime/pkg/cache"
3731 "sigs.k8s.io/controller-runtime/pkg/certwatcher"
38- "sigs.k8s.io/controller-runtime/pkg/client"
39- kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
4032 "sigs.k8s.io/controller-runtime/pkg/healthz"
4133 "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
4234 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -57,10 +49,10 @@ import (
5749 adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin"
5850 internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
5951 consolepkg "github.com/redpanda-data/redpanda-operator/operator/pkg/console"
60- pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
6152 "github.com/redpanda-data/redpanda-operator/operator/pkg/resources"
6253 pkgsecrets "github.com/redpanda-data/redpanda-operator/operator/pkg/secrets"
6354 redpandawebhooks "github.com/redpanda-data/redpanda-operator/operator/webhooks/redpanda"
55+ "github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
6456)
6557
6658type RedpandaController string
@@ -90,32 +82,6 @@ var availableControllers = []string{
9082 DecommissionController .toString (),
9183}
9284
93- type LabelSelectorValue struct {
94- Selector labels.Selector
95- }
96-
97- var _ pflag.Value = ((* LabelSelectorValue )(nil ))
98-
99- func (s * LabelSelectorValue ) Set (value string ) error {
100- if value == "" {
101- return nil
102- }
103- var err error
104- s .Selector , err = labels .Parse (value )
105- return err
106- }
107-
108- func (s * LabelSelectorValue ) String () string {
109- if s .Selector == nil {
110- return ""
111- }
112- return s .Selector .String ()
113- }
114-
115- func (s * LabelSelectorValue ) Type () string {
116- return "label selector"
117- }
118-
11985// Metrics RBAC permissions
12086// +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
12187// +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
@@ -148,7 +114,7 @@ func Command() *cobra.Command {
148114 operatorMode bool
149115 ghostbuster bool
150116 unbindPVCsAfter time.Duration
151- unbinderSelector LabelSelectorValue
117+ unbinderSelector pflagutil. LabelSelectorValue
152118 allowPVRebinding bool
153119 autoDeletePVCs bool
154120 webhookCertPath string
@@ -295,21 +261,6 @@ func Command() *cobra.Command {
295261 return cmd
296262}
297263
298- type v1Fetcher struct {
299- client kubeClient.Client
300- }
301-
302- func (f * v1Fetcher ) FetchLatest (ctx context.Context , name , namespace string ) (any , error ) {
303- var vectorizedCluster vectorizedv1alpha1.Cluster
304- if err := f .client .Get (ctx , types.NamespacedName {
305- Name : name ,
306- Namespace : namespace ,
307- }, & vectorizedCluster ); err != nil {
308- return nil , err
309- }
310- return & vectorizedCluster , nil
311- }
312-
313264//nolint:funlen,gocyclo // length looks good
314265func Run (
315266 ctx context.Context ,
@@ -704,7 +655,11 @@ func Run(
704655 }
705656
706657 if enableGhostBrokerDecommissioner {
707- d := decommissioning .NewStatefulSetDecommissioner (mgr , & v1Fetcher {client : mgr .GetClient ()},
658+ factory := internalclient .NewFactory (mgr .GetConfig (), mgr .GetClient ()).WithAdminClientTimeout (rpClientTimeout )
659+ adapter := vectorizedDecommissionerAdapter {factory : factory , client : mgr .GetClient ()}
660+ d := decommissioning .NewStatefulSetDecommissioner (
661+ mgr ,
662+ adapter .getAdminClient ,
708663 decommissioning .WithSyncPeriod (ghostBrokerDecommissionerSyncPeriod ),
709664 decommissioning .WithCleanupPVCs (false ),
710665 // In Operator v1, decommissioning based on pod ordinal is not correct because
@@ -714,105 +669,8 @@ func Run(
714669 decommissioning .WithDecommisionOnTooHighOrdinal (false ),
715670 // Operator v1 supports multiple NodePools, and therefore multiple STS.
716671 // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
717- decommissioning .WithDesiredReplicasFetcher (func (ctx context.Context , sts * appsv1.StatefulSet ) (int32 , error ) {
718- // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
719- idx := slices .IndexFunc (
720- sts .OwnerReferences ,
721- func (ownerRef metav1.OwnerReference ) bool {
722- return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
723- })
724- if idx == - 1 {
725- return 0 , nil
726- }
727-
728- var vectorizedCluster vectorizedv1alpha1.Cluster
729- if err := mgr .GetClient ().Get (ctx , types.NamespacedName {
730- Name : sts .OwnerReferences [idx ].Name ,
731- Namespace : sts .Namespace ,
732- }, & vectorizedCluster ); err != nil {
733- return 0 , fmt .Errorf ("could not get Cluster: %w" , err )
734- }
735-
736- // We assume the cluster is fine and synced, checks have been performed in the filter already.
737-
738- // Get all nodepool-sts for this Cluster
739- var stsList appsv1.StatefulSetList
740- err := mgr .GetClient ().List (ctx , & stsList , & client.ListOptions {
741- LabelSelector : pkglabels .ForCluster (& vectorizedCluster ).AsClientSelector (),
742- })
743- if err != nil {
744- return 0 , fmt .Errorf ("failed to list statefulsets of Cluster: %w" , err )
745- }
746-
747- if len (stsList .Items ) == 0 {
748- return 0 , errors .New ("found 0 StatefulSets for this Cluster" )
749- }
750-
751- var allReplicas int32
752- for _ , sts := range stsList .Items {
753- allReplicas += ptr .Deref (sts .Spec .Replicas , 0 )
754- }
755-
756- // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
757- if allReplicas < 3 {
758- return 0 , fmt .Errorf ("found %d desiredReplicas, but want >= 3" , allReplicas )
759- }
760-
761- if allReplicas != vectorizedCluster .Status .CurrentReplicas || allReplicas != vectorizedCluster .Status .Replicas {
762- return 0 , fmt .Errorf ("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d" , vectorizedCluster .Status .CurrentReplicas , vectorizedCluster .Status .Replicas , allReplicas )
763- }
764-
765- return allReplicas , nil
766- }),
767- decommissioning .WithFactory (internalclient .NewFactory (mgr .GetConfig (), mgr .GetClient ())),
768- decommissioning .WithFilter (func (ctx context.Context , sts * appsv1.StatefulSet ) (bool , error ) {
769- log := ctrl .LoggerFrom (ctx , "namespace" , sts .Namespace ).WithName ("StatefulSetDecomissioner.Filter" )
770- idx := slices .IndexFunc (
771- sts .OwnerReferences ,
772- func (ownerRef metav1.OwnerReference ) bool {
773- return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
774- })
775- if idx == - 1 {
776- return false , nil
777- }
778-
779- var vectorizedCluster vectorizedv1alpha1.Cluster
780- if err := mgr .GetClient ().Get (ctx , types.NamespacedName {
781- Name : sts .OwnerReferences [idx ].Name ,
782- Namespace : sts .Namespace ,
783- }, & vectorizedCluster ); err != nil {
784- return false , fmt .Errorf ("could not get Cluster: %w" , err )
785- }
786-
787- managedAnnotationKey := vectorizedv1alpha1 .GroupVersion .Group + "/managed"
788- if managed , exists := vectorizedCluster .Annotations [managedAnnotationKey ]; exists && managed == "false" {
789- log .V (1 ).Info ("ignoring StatefulSet of unmanaged V1 Cluster" , "sts" , sts .Name , "namespace" , sts .Namespace )
790- return false , nil
791- }
792-
793- // Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
794- // (and we can therefore not use it to check if the cluster is synced otherwise)
795- if vectorizedCluster .Status .CurrentReplicas != vectorizedCluster .Status .Replicas {
796- log .V (1 ).Info ("replicas are not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
797- return false , nil
798- }
799- if vectorizedCluster .Status .Restarting {
800- log .V (1 ).Info ("cluster is restarting" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
801- return false , nil
802- }
803-
804- if vectorizedCluster .Status .ObservedGeneration != vectorizedCluster .Generation {
805- log .V (1 ).Info ("generation not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "generation" , vectorizedCluster .Generation , "observedGeneration" , vectorizedCluster .Status .ObservedGeneration )
806- return false , nil
807- }
808-
809- if vectorizedCluster .Status .DecommissioningNode != nil {
810- log .V (1 ).Info ("decommission in progress" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "node" , * vectorizedCluster .Status .DecommissioningNode )
811- return false , nil
812- }
813-
814- return true , nil
815- }),
672+ decommissioning .WithDesiredReplicasFetcher (adapter .desiredReplicas ),
673+ decommissioning .WithFilter (adapter .filter ),
816674 )
817675 if err := d .SetupWithManager (mgr ); err != nil {
818676 setupLog .Error (err , "unable to create controller" , "controller" , "StatefulSetDecommissioner" )
0 commit comments