@@ -21,6 +21,7 @@ import (
2121 "time"
2222
2323 "github.com/cockroachdb/errors"
24+ "github.com/redpanda-data/common-go/rpadmin"
2425 "github.com/spf13/cobra"
2526 "github.com/spf13/pflag"
2627 appsv1 "k8s.io/api/apps/v1"
@@ -34,7 +35,6 @@ import (
3435 "sigs.k8s.io/controller-runtime/pkg/cache"
3536 "sigs.k8s.io/controller-runtime/pkg/certwatcher"
3637 "sigs.k8s.io/controller-runtime/pkg/client"
37- kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
3838 "sigs.k8s.io/controller-runtime/pkg/healthz"
3939 "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
4040 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -566,21 +566,6 @@ func Run(
566566 return nil
567567}
568568
569- type v1Fetcher struct {
570- client kubeClient.Client
571- }
572-
573- func (f * v1Fetcher ) FetchLatest (ctx context.Context , name , namespace string ) (any , error ) {
574- var vectorizedCluster vectorizedv1alpha1.Cluster
575- if err := f .client .Get (ctx , types.NamespacedName {
576- Name : name ,
577- Namespace : namespace ,
578- }, & vectorizedCluster ); err != nil {
579- return nil , err
580- }
581- return & vectorizedCluster , nil
582- }
583-
584569// setupVectorizedControllers configures and registers controllers and
585570// runnables for the custom resources in the vectorized group, AKA the V1
586571// operator.
@@ -632,123 +617,155 @@ func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpa
632617 }
633618 }
634619
635- if opts .enableGhostBrokerDecommissioner {
636- d := decommissioning .NewStatefulSetDecommissioner (mgr , & v1Fetcher {client : mgr .GetClient ()},
620+ if opts .enableGhostBrokerDecommissioner && opts .enableVectorizedControllers {
621+ b := vectorizedDecommissioner {client : mgr .GetClient ()}
622+ d := decommissioning .NewStatefulSetDecommissioner (
623+ mgr ,
624+ b .getAdminClient ,
625+ decommissioning .WithFilter (b .filter ),
626+ // Operator v1 supports multiple NodePools, and therefore multiple STS.
627+ // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
628+ decommissioning .WithDesiredReplicasFetcher (b .desiredReplicas ),
637629 decommissioning .WithSyncPeriod (opts .ghostBrokerDecommissionerSyncPeriod ),
638630 decommissioning .WithCleanupPVCs (false ),
639631 // In Operator v1, decommissioning based on pod ordinal is not correct because
640632 // it has controller code that manages decommissioning. If something else decommissions the node, it can not deal with this under all circumstances because of various reasons, eg. bercause of a protection against stale status reads of status.currentReplicas
641633 // (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139)
642634 // In addition to this situation where it can not (always) recover, it is just not desired that it interferes with graceful, "standard" decommissions (at least, in Operator v1 mode)
643635 decommissioning .WithDecommisionOnTooHighOrdinal (false ),
644- // Operator v1 supports multiple NodePools, and therefore multiple STS.
645- // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
646- decommissioning .WithDesiredReplicasFetcher (func (ctx context.Context , sts * appsv1.StatefulSet ) (int32 , error ) {
647- // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
648- idx := slices .IndexFunc (
649- sts .OwnerReferences ,
650- func (ownerRef metav1.OwnerReference ) bool {
651- return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
652- })
653- if idx == - 1 {
654- return 0 , nil
655- }
636+ )
656637
657- var vectorizedCluster vectorizedv1alpha1.Cluster
658- if err := mgr .GetClient ().Get (ctx , types.NamespacedName {
659- Name : sts .OwnerReferences [idx ].Name ,
660- Namespace : sts .Namespace ,
661- }, & vectorizedCluster ); err != nil {
662- return 0 , fmt .Errorf ("could not get Cluster: %w" , err )
663- }
638+ if err := d .SetupWithManager (mgr ); err != nil {
639+ log .Error (ctx , err , "unable to create controller" , "controller" , "StatefulSetDecommissioner" )
640+ return err
641+ }
642+ }
664643
665- // We assume the cluster is fine and synced, checks have been performed in the filter already.
644+ return nil
645+ }
666646
667- // Get all nodepool-sts for this Cluster
668- var stsList appsv1.StatefulSetList
669- err := mgr .GetClient ().List (ctx , & stsList , & client.ListOptions {
670- LabelSelector : pkglabels .ForCluster (& vectorizedCluster ).AsClientSelector (),
671- })
672- if err != nil {
673- return 0 , fmt .Errorf ("failed to list statefulsets of Cluster: %w" , err )
674- }
647+ // vectorizedDecommissioner is a helper struct that implements various methods
648+ // of mapping StatefulSets through Vectorized Clusters to arguments for the
649+ // StatefulSetDecommissioner.
650+ type vectorizedDecommissioner struct {
651+ client client.Client
652+ factory internalclient.ClientFactory
653+ }
675654
676- if len (stsList .Items ) == 0 {
677- return 0 , errors .New ("found 0 StatefulSets for this Cluster" )
678- }
655+ func (b * vectorizedDecommissioner ) desiredReplicas (ctx context.Context , sts * appsv1.StatefulSet ) (int32 , error ) {
656+ // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
657+ vectorizedCluster , err := b .getCluster (ctx , sts )
658+ if err != nil {
659+ return 0 , err
660+ }
679661
680- var allReplicas int32
681- for _ , sts := range stsList .Items {
682- allReplicas += ptr .Deref (sts .Spec .Replicas , 0 )
683- }
662+ if vectorizedCluster == nil {
663+ return 0 , nil
664+ }
684665
685- // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
686- if allReplicas < 3 {
687- return 0 , fmt .Errorf ("found %d desiredReplicas, but want >= 3" , allReplicas )
688- }
666+ // We assume the cluster is fine and synced, checks have been performed in the filter already.
689667
690- if allReplicas != vectorizedCluster .Status .CurrentReplicas || allReplicas != vectorizedCluster .Status .Replicas {
691- return 0 , fmt .Errorf ("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d" , vectorizedCluster .Status .CurrentReplicas , vectorizedCluster .Status .Replicas , allReplicas )
692- }
668+ // Get all nodepool-sts for this Cluster
669+ var stsList appsv1.StatefulSetList
670+ if err := b .client .List (ctx , & stsList , & client.ListOptions {
671+ LabelSelector : pkglabels .ForCluster (vectorizedCluster ).AsClientSelector (),
672+ }); err != nil {
673+ return 0 , fmt .Errorf ("failed to list statefulsets of Cluster: %w" , err )
674+ }
693675
694- return allReplicas , nil
695- }),
696- decommissioning .WithFactory (internalclient .NewFactory (mgr .GetConfig (), mgr .GetClient ())),
697- decommissioning .WithFilter (func (ctx context.Context , sts * appsv1.StatefulSet ) (bool , error ) {
698- log := ctrl .LoggerFrom (ctx , "namespace" , sts .Namespace ).WithName ("StatefulSetDecomissioner.Filter" )
699- idx := slices .IndexFunc (
700- sts .OwnerReferences ,
701- func (ownerRef metav1.OwnerReference ) bool {
702- return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
703- })
704- if idx == - 1 {
705- return false , nil
706- }
676+ if len (stsList .Items ) == 0 {
677+ return 0 , errors .New ("found 0 StatefulSets for this Cluster" )
678+ }
707679
708- var vectorizedCluster vectorizedv1alpha1.Cluster
709- if err := mgr .GetClient ().Get (ctx , types.NamespacedName {
710- Name : sts .OwnerReferences [idx ].Name ,
711- Namespace : sts .Namespace ,
712- }, & vectorizedCluster ); err != nil {
713- return false , fmt .Errorf ("could not get Cluster: %w" , err )
714- }
680+ var allReplicas int32
681+ for _ , sts := range stsList .Items {
682+ allReplicas += ptr .Deref (sts .Spec .Replicas , 0 )
683+ }
715684
716- managedAnnotationKey := vectorizedv1alpha1 .GroupVersion .Group + "/managed"
717- if managed , exists := vectorizedCluster .Annotations [managedAnnotationKey ]; exists && managed == "false" {
718- log .V (1 ).Info ("ignoring StatefulSet of unmanaged V1 Cluster" , "sts" , sts .Name , "namespace" , sts .Namespace )
719- return false , nil
720- }
685+ // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
686+ if allReplicas < 3 {
687+ return 0 , errors .Newf ("found %d desiredReplicas, but want >= 3" , allReplicas )
688+ }
721689
722- // Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
723- // (and we can therefore not use it to check if the cluster is synced otherwise)
724- if vectorizedCluster .Status .CurrentReplicas != vectorizedCluster .Status .Replicas {
725- log .V (1 ).Info ("replicas are not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
726- return false , nil
727- }
728- if vectorizedCluster .Status .Restarting {
729- log .V (1 ).Info ("cluster is restarting" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
730- return false , nil
731- }
690+ if allReplicas != vectorizedCluster .Status .CurrentReplicas || allReplicas != vectorizedCluster .Status .Replicas {
691+ return 0 , errors .Newf ("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d" , vectorizedCluster .Status .CurrentReplicas , vectorizedCluster .Status .Replicas , allReplicas )
692+ }
732693
733- if vectorizedCluster .Status .ObservedGeneration != vectorizedCluster .Generation {
734- log .V (1 ).Info ("generation not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "generation" , vectorizedCluster .Generation , "observedGeneration" , vectorizedCluster .Status .ObservedGeneration )
735- return false , nil
736- }
694+ return allReplicas , nil
695+ }
737696
738- if vectorizedCluster .Status .DecommissioningNode != nil {
739- log .V (1 ).Info ("decommission in progress" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "node" , * vectorizedCluster .Status .DecommissioningNode )
740- return false , nil
741- }
697+ func (b * vectorizedDecommissioner ) filter (ctx context.Context , sts * appsv1.StatefulSet ) (bool , error ) {
698+ log := ctrl .LoggerFrom (ctx , "namespace" , sts .Namespace ).WithName ("StatefulSetDecomissioner.Filter" )
742699
743- return true , nil
744- }),
745- )
700+ vectorizedCluster , err := b .getCluster (ctx , sts )
701+ if err != nil {
702+ return false , err
703+ }
746704
747- if err := d .SetupWithManager (mgr ); err != nil {
748- log .Error (ctx , err , "unable to create controller" , "controller" , "StatefulSetDecommissioner" )
749- return err
750- }
705+ if vectorizedCluster == nil {
706+ return false , nil
751707 }
752708
753- return nil
709+ managedAnnotationKey := vectorizedv1alpha1 .GroupVersion .Group + "/managed"
710+ if managed , exists := vectorizedCluster .Annotations [managedAnnotationKey ]; exists && managed == "false" {
711+ log .V (1 ).Info ("ignoring StatefulSet of unmanaged V1 Cluster" , "sts" , sts .Name , "namespace" , sts .Namespace )
712+ return false , nil
713+ }
714+
715+ // Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
716+ // (and we can therefore not use it to check if the cluster is synced otherwise)
717+ if vectorizedCluster .Status .CurrentReplicas != vectorizedCluster .Status .Replicas {
718+ log .V (1 ).Info ("replicas are not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
719+ return false , nil
720+ }
721+ if vectorizedCluster .Status .Restarting {
722+ log .V (1 ).Info ("cluster is restarting" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
723+ return false , nil
724+ }
725+
726+ if vectorizedCluster .Status .ObservedGeneration != vectorizedCluster .Generation {
727+ log .V (1 ).Info ("generation not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "generation" , vectorizedCluster .Generation , "observedGeneration" , vectorizedCluster .Status .ObservedGeneration )
728+ return false , nil
729+ }
730+
731+ if vectorizedCluster .Status .DecommissioningNode != nil {
732+ log .V (1 ).Info ("decommission in progress" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "node" , * vectorizedCluster .Status .DecommissioningNode )
733+ return false , nil
734+ }
735+
736+ return true , nil
737+ }
738+
739+ func (b * vectorizedDecommissioner ) getAdminClient (ctx context.Context , sts * appsv1.StatefulSet ) (* rpadmin.AdminAPI , error ) {
740+ cluster , err := b .getCluster (ctx , sts )
741+ if err != nil {
742+ return nil , err
743+ }
744+
745+ if cluster == nil {
746+ return nil , errors .Newf ("failed to resolve %s/%s to vectorized cluster" , sts .Namespace , sts .Name )
747+ }
748+
749+ return b .factory .RedpandaAdminClient (ctx , cluster )
750+ }
751+
752+ func (b * vectorizedDecommissioner ) getCluster (ctx context.Context , sts * appsv1.StatefulSet ) (* vectorizedv1alpha1.Cluster , error ) {
753+ idx := slices .IndexFunc (
754+ sts .OwnerReferences ,
755+ func (ownerRef metav1.OwnerReference ) bool {
756+ return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
757+ })
758+ if idx == - 1 {
759+ return nil , nil
760+ }
761+
762+ var vectorizedCluster vectorizedv1alpha1.Cluster
763+ if err := b .client .Get (ctx , types.NamespacedName {
764+ Name : sts .OwnerReferences [idx ].Name ,
765+ Namespace : sts .Namespace ,
766+ }, & vectorizedCluster ); err != nil {
767+ return nil , errors .Wrap (err , "could not get Cluster" )
768+ }
769+
770+ return & vectorizedCluster , nil
754771}
0 commit comments