@@ -16,25 +16,19 @@ import (
1616 "crypto/tls"
1717 "fmt"
1818 "path/filepath"
19- "slices"
2019 "strings"
2120 "time"
2221
2322 "github.com/cockroachdb/errors"
2423 "github.com/spf13/cobra"
2524 "github.com/spf13/pflag"
26- appsv1 "k8s.io/api/apps/v1"
2725 corev1 "k8s.io/api/core/v1"
28- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2926 "k8s.io/apimachinery/pkg/labels"
30- "k8s.io/apimachinery/pkg/types"
3127 _ "k8s.io/client-go/plugin/pkg/client/auth"
32- "k8s.io/utils/ptr"
3328 ctrl "sigs.k8s.io/controller-runtime"
3429 "sigs.k8s.io/controller-runtime/pkg/cache"
3530 "sigs.k8s.io/controller-runtime/pkg/certwatcher"
3631 "sigs.k8s.io/controller-runtime/pkg/client"
37- kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
3832 "sigs.k8s.io/controller-runtime/pkg/healthz"
3933 "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
4034 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -53,7 +47,6 @@ import (
5347 "github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle"
5448 adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin"
5549 internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
56- pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
5750 "github.com/redpanda-data/redpanda-operator/operator/pkg/resources"
5851 "github.com/redpanda-data/redpanda-operator/pkg/kube"
5952 "github.com/redpanda-data/redpanda-operator/pkg/otelutil/log"
@@ -506,7 +499,7 @@ func Run(
506499
507500 if v1Controllers {
508501 setupLog .Info ("setting up vectorized controllers" )
509- if err := setupVectorizedControllers (ctx , mgr , cloudExpander , opts ); err != nil {
502+ if err := setupVectorizedControllers (ctx , mgr , factory , cloudExpander , opts ); err != nil {
510503 return err
511504 }
512505 }
@@ -584,25 +577,10 @@ func Run(
584577 return nil
585578}
586579
587- type v1Fetcher struct {
588- client kubeClient.Client
589- }
590-
591- func (f * v1Fetcher ) FetchLatest (ctx context.Context , name , namespace string ) (any , error ) {
592- var vectorizedCluster vectorizedv1alpha1.Cluster
593- if err := f .client .Get (ctx , types.NamespacedName {
594- Name : name ,
595- Namespace : namespace ,
596- }, & vectorizedCluster ); err != nil {
597- return nil , err
598- }
599- return & vectorizedCluster , nil
600- }
601-
602580// setupVectorizedControllers configures and registers controllers and
603581// runnables for the custom resources in the vectorized group, AKA the V1
604582// operator.
605- func setupVectorizedControllers (ctx context.Context , mgr ctrl.Manager , cloudExpander * pkgsecrets.CloudExpander , opts * RunOptions ) error {
583+ func setupVectorizedControllers (ctx context.Context , mgr ctrl.Manager , factory internalclient. ClientFactory , cloudExpander * pkgsecrets.CloudExpander , opts * RunOptions ) error {
606584 log .Info (ctx , "Starting Vectorized (V1) Controllers" )
607585
608586 configurator := resources.ConfiguratorSettings {
@@ -650,116 +628,22 @@ func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpa
650628 }
651629 }
652630
653- if opts .enableGhostBrokerDecommissioner {
654- d := decommissioning .NewStatefulSetDecommissioner (mgr , & v1Fetcher {client : mgr .GetClient ()},
631+ if opts .enableGhostBrokerDecommissioner && opts .enableVectorizedControllers {
632+ adapter := vectorizedDecommissionerAdapter {factory : factory , client : mgr .GetClient ()}
633+ d := decommissioning .NewStatefulSetDecommissioner (
634+ mgr ,
635+ adapter .getAdminClient ,
636+ decommissioning .WithFilter (adapter .filter ),
637+ // Operator v1 supports multiple NodePools, and therefore multiple STS.
638+ // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
639+ decommissioning .WithDesiredReplicasFetcher (adapter .desiredReplicas ),
655640 decommissioning .WithSyncPeriod (opts .ghostBrokerDecommissionerSyncPeriod ),
656641 decommissioning .WithCleanupPVCs (false ),
657642 // In Operator v1, decommissioning based on pod ordinal is not correct because
658643 // it has controller code that manages decommissioning. If something else decommissions the node, it cannot deal with this under all circumstances for various reasons, e.g. because of a protection against stale status reads of status.currentReplicas
659644 // (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139)
660645 // In addition to this situation where it cannot (always) recover, it is simply not desirable for it to interfere with graceful, "standard" decommissions (at least, in Operator v1 mode)
661646 decommissioning .WithDecommisionOnTooHighOrdinal (false ),
662- // Operator v1 supports multiple NodePools, and therefore multiple STS.
663- // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
664- decommissioning .WithDesiredReplicasFetcher (func (ctx context.Context , sts * appsv1.StatefulSet ) (int32 , error ) {
665- // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
666- idx := slices .IndexFunc (
667- sts .OwnerReferences ,
668- func (ownerRef metav1.OwnerReference ) bool {
669- return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
670- })
671- if idx == - 1 {
672- return 0 , nil
673- }
674-
675- var vectorizedCluster vectorizedv1alpha1.Cluster
676- if err := mgr .GetClient ().Get (ctx , types.NamespacedName {
677- Name : sts .OwnerReferences [idx ].Name ,
678- Namespace : sts .Namespace ,
679- }, & vectorizedCluster ); err != nil {
680- return 0 , fmt .Errorf ("could not get Cluster: %w" , err )
681- }
682-
683- // We assume the cluster is fine and synced, checks have been performed in the filter already.
684-
685- // Get all nodepool-sts for this Cluster
686- var stsList appsv1.StatefulSetList
687- err := mgr .GetClient ().List (ctx , & stsList , & client.ListOptions {
688- LabelSelector : pkglabels .ForCluster (& vectorizedCluster ).AsClientSelector (),
689- })
690- if err != nil {
691- return 0 , fmt .Errorf ("failed to list statefulsets of Cluster: %w" , err )
692- }
693-
694- if len (stsList .Items ) == 0 {
695- return 0 , errors .New ("found 0 StatefulSets for this Cluster" )
696- }
697-
698- var allReplicas int32
699- for _ , sts := range stsList .Items {
700- allReplicas += ptr .Deref (sts .Spec .Replicas , 0 )
701- }
702-
703- // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
704- if allReplicas < 3 {
705- return 0 , fmt .Errorf ("found %d desiredReplicas, but want >= 3" , allReplicas )
706- }
707-
708- if allReplicas != vectorizedCluster .Status .CurrentReplicas || allReplicas != vectorizedCluster .Status .Replicas {
709- return 0 , fmt .Errorf ("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d" , vectorizedCluster .Status .CurrentReplicas , vectorizedCluster .Status .Replicas , allReplicas )
710- }
711-
712- return allReplicas , nil
713- }),
714- decommissioning .WithFactory (internalclient .NewFactory (mgr .GetConfig (), mgr .GetClient ())),
715- decommissioning .WithFilter (func (ctx context.Context , sts * appsv1.StatefulSet ) (bool , error ) {
716- log := ctrl .LoggerFrom (ctx , "namespace" , sts .Namespace ).WithName ("StatefulSetDecomissioner.Filter" )
717- idx := slices .IndexFunc (
718- sts .OwnerReferences ,
719- func (ownerRef metav1.OwnerReference ) bool {
720- return ownerRef .APIVersion == vectorizedv1alpha1 .GroupVersion .String () && ownerRef .Kind == "Cluster"
721- })
722- if idx == - 1 {
723- return false , nil
724- }
725-
726- var vectorizedCluster vectorizedv1alpha1.Cluster
727- if err := mgr .GetClient ().Get (ctx , types.NamespacedName {
728- Name : sts .OwnerReferences [idx ].Name ,
729- Namespace : sts .Namespace ,
730- }, & vectorizedCluster ); err != nil {
731- return false , fmt .Errorf ("could not get Cluster: %w" , err )
732- }
733-
734- managedAnnotationKey := vectorizedv1alpha1 .GroupVersion .Group + "/managed"
735- if managed , exists := vectorizedCluster .Annotations [managedAnnotationKey ]; exists && managed == "false" {
736- log .V (1 ).Info ("ignoring StatefulSet of unmanaged V1 Cluster" , "sts" , sts .Name , "namespace" , sts .Namespace )
737- return false , nil
738- }
739-
740- // Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
741- // (and we can therefore not use it to check if the cluster is synced otherwise)
742- if vectorizedCluster .Status .CurrentReplicas != vectorizedCluster .Status .Replicas {
743- log .V (1 ).Info ("replicas are not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
744- return false , nil
745- }
746- if vectorizedCluster .Status .Restarting {
747- log .V (1 ).Info ("cluster is restarting" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace )
748- return false , nil
749- }
750-
751- if vectorizedCluster .Status .ObservedGeneration != vectorizedCluster .Generation {
752- log .V (1 ).Info ("generation not synced" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "generation" , vectorizedCluster .Generation , "observedGeneration" , vectorizedCluster .Status .ObservedGeneration )
753- return false , nil
754- }
755-
756- if vectorizedCluster .Status .DecommissioningNode != nil {
757- log .V (1 ).Info ("decommission in progress" , "cluster" , vectorizedCluster .Name , "namespace" , vectorizedCluster .Namespace , "node" , * vectorizedCluster .Status .DecommissioningNode )
758- return false , nil
759- }
760-
761- return true , nil
762- }),
763647 )
764648
765649 if err := d .SetupWithManager (mgr ); err != nil {
0 commit comments