@@ -17,27 +17,19 @@ import (
1717 "fmt"
1818 "os"
1919 "path/filepath"
20- "slices"
2120 "strings"
2221 "time"
2322
2423 "github.com/cockroachdb/errors"
2524 fluxclient "github.com/fluxcd/pkg/runtime/client"
2625 "github.com/spf13/cobra"
27- "github.com/spf13/pflag"
2826 helmkube "helm.sh/helm/v3/pkg/kube"
29- appsv1 "k8s.io/api/apps/v1"
3027 corev1 "k8s.io/api/core/v1"
31- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3228 "k8s.io/apimachinery/pkg/labels"
33- "k8s.io/apimachinery/pkg/types"
3429 _ "k8s.io/client-go/plugin/pkg/client/auth"
35- "k8s.io/utils/ptr"
3630 ctrl "sigs.k8s.io/controller-runtime"
3731 "sigs.k8s.io/controller-runtime/pkg/cache"
3832 "sigs.k8s.io/controller-runtime/pkg/certwatcher"
39- "sigs.k8s.io/controller-runtime/pkg/client"
40- kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
4133 "sigs.k8s.io/controller-runtime/pkg/healthz"
4234 "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
4335 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -58,10 +50,10 @@ import (
5850 adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin"
5951 internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
6052 consolepkg "github.com/redpanda-data/redpanda-operator/operator/pkg/console"
61- pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
6253 "github.com/redpanda-data/redpanda-operator/operator/pkg/resources"
6354 pkgsecrets "github.com/redpanda-data/redpanda-operator/operator/pkg/secrets"
6455 redpandawebhooks "github.com/redpanda-data/redpanda-operator/operator/webhooks/redpanda"
56+ "github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
6557)
6658
6759type RedpandaController string
9991 }
10092)
10193
102- type LabelSelectorValue struct {
103- Selector labels.Selector
104- }
105-
106- var _ pflag.Value = ((* LabelSelectorValue )(nil ))
107-
108- func (s * LabelSelectorValue ) Set (value string ) error {
109- if value == "" {
110- return nil
111- }
112- var err error
113- s .Selector , err = labels .Parse (value )
114- return err
115- }
116-
117- func (s * LabelSelectorValue ) String () string {
118- if s .Selector == nil {
119- return ""
120- }
121- return s .Selector .String ()
122- }
123-
124- func (s * LabelSelectorValue ) Type () string {
125- return "label selector"
126- }
127-
12894// Metrics RBAC permissions
12995// +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
13096// +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
@@ -157,7 +123,7 @@ func Command() *cobra.Command {
157123 enableHelmControllers bool
158124 ghostbuster bool
159125 unbindPVCsAfter time.Duration
160- unbinderSelector LabelSelectorValue
126+ unbinderSelector pflagutil. LabelSelectorValue
161127 allowPVRebinding bool
162128 autoDeletePVCs bool
163129 forceDefluxedMode bool
@@ -313,21 +279,6 @@ func Command() *cobra.Command {
313279 return cmd
314280}
315281
316- type v1Fetcher struct {
317- client kubeClient.Client
318- }
319-
320- func (f * v1Fetcher ) FetchLatest (ctx context.Context , name , namespace string ) (any , error ) {
321- var vectorizedCluster vectorizedv1alpha1.Cluster
322- if err := f .client .Get (ctx , types.NamespacedName {
323- Name : name ,
324- Namespace : namespace ,
325- }, & vectorizedCluster ); err != nil {
326- return nil , err
327- }
328- return & vectorizedCluster , nil
329- }
330-
331282//nolint:funlen,gocyclo // length looks good
332283func Run (
333284 ctx context.Context ,
@@ -723,115 +674,22 @@ func Run(
723674 }
724675
725676 if enableGhostBrokerDecommissioner {
726- d := decommissioning .NewStatefulSetDecommissioner (mgr , & v1Fetcher {client : mgr .GetClient ()},
677+ factory := internalclient .NewFactory (mgr .GetConfig (), mgr .GetClient ())
678+ adapter := vectorizedDecommissionerAdapter {factory : factory , client : mgr .GetClient ()}
679+ d := decommissioning .NewStatefulSetDecommissioner (
680+ mgr ,
681+ adapter .getAdminClient ,
682+ decommissioning .WithFilter (adapter .filter ),
683+ // Operator v1 supports multiple NodePools, and therefore multiple STS.
684+ // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
685+ decommissioning .WithDesiredReplicasFetcher (adapter .desiredReplicas ),
727686 decommissioning .WithSyncPeriod (ghostBrokerDecommissionerSyncPeriod ),
728687 decommissioning .WithCleanupPVCs (false ),
729688 // In Operator v1, decommissioning based on pod ordinal is not correct because
730689 // it has controller code that manages decommissioning. If something else decommissions the node, it can not deal with this under all circumstances because of various reasons, eg. bercause of a protection against stale status reads of status.currentReplicas
731690 // (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139)
732691 // In addition to this situation where it can not (always) recover, it is just not desired that it interferes with graceful, "standard" decommissions (at least, in Operator v1 mode)
733692 decommissioning .WithDecommisionOnTooHighOrdinal (false ),
734- // Operator v1 supports multiple NodePools, and therefore multiple STS.
735- // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
736- decommissioning .WithDesiredReplicasFetcher (func (ctx context.Context , sts * appsv1.StatefulSet ) (int32 , error ) {
737- // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
738- idx := slices .IndexFunc (
739- sts .OwnerReferences ,
740- func (ownerRef metav1.OwnerReference ) bool {
741- return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
742- })
743- if idx == - 1 {
744- return 0 , nil
745- }
746-
747- var vectorizedCluster vectorizedv1alpha1.Cluster
748- if err := mgr .GetClient ().Get (ctx , types.NamespacedName {
749- Name : sts .OwnerReferences [idx ].Name ,
750- Namespace : sts .Namespace ,
751- }, & vectorizedCluster ); err != nil {
752- return 0 , fmt .Errorf ("could not get Cluster: %w" , err )
753- }
754-
755- // We assume the cluster is fine and synced, checks have been performed in the filter already.
756-
757- // Get all nodepool-sts for this Cluster
758- var stsList appsv1.StatefulSetList
759- err := mgr .GetClient ().List (ctx , & stsList , & client.ListOptions {
760- LabelSelector : pkglabels .ForCluster (& vectorizedCluster ).AsClientSelector (),
761- })
762- if err != nil {
763- return 0 , fmt .Errorf ("failed to list statefulsets of Cluster: %w" , err )
764- }
765-
766- if len (stsList .Items ) == 0 {
767- return 0 , errors .New ("found 0 StatefulSets for this Cluster" )
768- }
769-
770- var allReplicas int32
771- for _ , sts := range stsList .Items {
772- allReplicas += ptr .Deref (sts .Spec .Replicas , 0 )
773- }
774-
775- // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
776- if allReplicas < 3 {
777- return 0 , fmt .Errorf ("found %d desiredReplicas, but want >= 3" , allReplicas )
778- }
779-
780- if allReplicas != vectorizedCluster .Status .CurrentReplicas || allReplicas != vectorizedCluster .Status .Replicas {
781- return 0 , fmt .Errorf ("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d" , vectorizedCluster .Status .CurrentReplicas , vectorizedCluster .Status .Replicas , allReplicas )
782- }
783-
784- return allReplicas , nil
785- }),
786- decommissioning .WithFactory (internalclient .NewFactory (mgr .GetConfig (), mgr .GetClient ())),
787- decommissioning .WithFilter (func (ctx context.Context , sts * appsv1.StatefulSet ) (bool , error ) {
788- log := ctrl .LoggerFrom (ctx , "namespace" , sts .Namespace ).WithName ("StatefulSetDecomissioner.Filter" )
789- idx := slices .IndexFunc (
790- sts .OwnerReferences ,
791- func (ownerRef metav1.OwnerReference ) bool {
792- return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
793- })
794- if idx == - 1 {
795- return false , nil
796- }
797-
798- var vectorizedCluster vectorizedv1alpha1.Cluster
799- if err := mgr .GetClient ().Get (ctx , types.NamespacedName {
800- Name : sts .OwnerReferences [idx ].Name ,
801- Namespace : sts .Namespace ,
802- }, & vectorizedCluster ); err != nil {
803- return false , fmt .Errorf ("could not get Cluster: %w" , err )
804- }
805-
806- managedAnnotationKey := vectorizedv1alpha1 .GroupVersion .Group + "/managed"
807- if managed , exists := vectorizedCluster .Annotations [managedAnnotationKey ]; exists && managed == "false" {
808- log .V (1 ).Info ("ignoring StatefulSet of unmanaged V1 Cluster" , "sts" , sts .Name , "namespace" , sts .Namespace )
809- return false , nil
810- }
811-
812- // Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
813- // (and we can therefore not use it to check if the cluster is synced otherwise)
814- if vectorizedCluster .Status .CurrentReplicas != vectorizedCluster .Status .Replicas {
815- log .V (1 ).Info ("replicas are not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
816- return false , nil
817- }
818- if vectorizedCluster .Status .Restarting {
819- log .V (1 ).Info ("cluster is restarting" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
820- return false , nil
821- }
822-
823- if vectorizedCluster .Status .ObservedGeneration != vectorizedCluster .Generation {
824- log .V (1 ).Info ("generation not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "generation" , vectorizedCluster .Generation , "observedGeneration" , vectorizedCluster .Status .ObservedGeneration )
825- return false , nil
826- }
827-
828- if vectorizedCluster .Status .DecommissioningNode != nil {
829- log .V (1 ).Info ("decommission in progress" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "node" , * vectorizedCluster .Status .DecommissioningNode )
830- return false , nil
831- }
832-
833- return true , nil
834- }),
835693 )
836694 if err := d .SetupWithManager (mgr ); err != nil {
837695 setupLog .Error (err , "unable to create controller" , "controller" , "StatefulSetDecommissioner" )
0 commit comments