169 changes: 13 additions & 156 deletions operator/cmd/run/run.go
@@ -16,25 +16,17 @@ import (
"crypto/tls"
"fmt"
"path/filepath"
"slices"
"strings"
"time"

"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/client"
kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -53,10 +45,10 @@ import (
"github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle"
adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin"
internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
"github.com/redpanda-data/redpanda-operator/operator/pkg/resources"
"github.com/redpanda-data/redpanda-operator/pkg/kube"
"github.com/redpanda-data/redpanda-operator/pkg/otelutil/log"
"github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
pkgsecrets "github.com/redpanda-data/redpanda-operator/pkg/secrets"
)

@@ -110,7 +102,7 @@ type RunOptions struct {
restrictToRedpandaVersion string
ghostbuster bool
unbindPVCsAfter time.Duration
unbinderSelector LabelSelectorValue
unbinderSelector pflagutil.LabelSelectorValue
allowPVRebinding bool
autoDeletePVCs bool
webhookCertPath string
@@ -204,32 +196,6 @@ func (o *RunOptions) ControllerEnabled(controller Controller) bool {
return false
}

type LabelSelectorValue struct {
Selector labels.Selector
}

var _ pflag.Value = ((*LabelSelectorValue)(nil))

func (s *LabelSelectorValue) Set(value string) error {
if value == "" {
return nil
}
var err error
s.Selector, err = labels.Parse(value)
return err
}

func (s *LabelSelectorValue) String() string {
if s.Selector == nil {
return ""
}
return s.Selector.String()
}

func (s *LabelSelectorValue) Type() string {
return "label selector"
}

// Metrics RBAC permissions
// +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
// +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
@@ -506,7 +472,7 @@ func Run(

if v1Controllers {
setupLog.Info("setting up vectorized controllers")
if err := setupVectorizedControllers(ctx, mgr, cloudExpander, opts); err != nil {
if err := setupVectorizedControllers(ctx, mgr, factory, cloudExpander, opts); err != nil {
return err
}
}
@@ -584,25 +550,10 @@ func Run(
return nil
}

type v1Fetcher struct {
client kubeClient.Client
}

func (f *v1Fetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) {
var vectorizedCluster vectorizedv1alpha1.Cluster
if err := f.client.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &vectorizedCluster); err != nil {
return nil, err
}
return &vectorizedCluster, nil
}

// setupVectorizedControllers configures and registers controllers and
// runnables for the custom resources in the vectorized group, AKA the V1
// operator.
func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpander *pkgsecrets.CloudExpander, opts *RunOptions) error {
func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, factory internalclient.ClientFactory, cloudExpander *pkgsecrets.CloudExpander, opts *RunOptions) error {
log.Info(ctx, "Starting Vectorized (V1) Controllers")

configurator := resources.ConfiguratorSettings{
@@ -650,116 +601,22 @@ func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpa
}
}

if opts.enableGhostBrokerDecommissioner {
d := decommissioning.NewStatefulSetDecommissioner(mgr, &v1Fetcher{client: mgr.GetClient()},
if opts.enableGhostBrokerDecommissioner && opts.enableVectorizedControllers {
adapter := vectorizedDecommissionerAdapter{factory: factory, client: mgr.GetClient()}
d := decommissioning.NewStatefulSetDecommissioner(
mgr,
adapter.getAdminClient,
decommissioning.WithFilter(adapter.filter),
// Operator v1 supports multiple NodePools, and therefore multiple STS.
// This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
decommissioning.WithDesiredReplicasFetcher(adapter.desiredReplicas),
decommissioning.WithSyncPeriod(opts.ghostBrokerDecommissionerSyncPeriod),
decommissioning.WithCleanupPVCs(false),
// In Operator v1, decommissioning based on pod ordinal is not correct because
// the operator has its own controller code that manages decommissioning. If something else decommissions the node, the operator cannot always recover, e.g. because of a protection against stale status reads of status.currentReplicas
// (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139).
// Beyond that unrecoverable situation, it is also not desired that this interferes with graceful, "standard" decommissions (at least in Operator v1 mode).
decommissioning.WithDecommisionOnTooHighOrdinal(false),
// Operator v1 supports multiple NodePools, and therefore multiple STS.
// This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
decommissioning.WithDesiredReplicasFetcher(func(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
// Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
idx := slices.IndexFunc(
sts.OwnerReferences,
func(ownerRef metav1.OwnerReference) bool {
return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
})
if idx == -1 {
return 0, nil
}

var vectorizedCluster vectorizedv1alpha1.Cluster
if err := mgr.GetClient().Get(ctx, types.NamespacedName{
Name: sts.OwnerReferences[idx].Name,
Namespace: sts.Namespace,
}, &vectorizedCluster); err != nil {
return 0, fmt.Errorf("could not get Cluster: %w", err)
}

// We assume the cluster is fine and synced, checks have been performed in the filter already.

// Get all nodepool-sts for this Cluster
var stsList appsv1.StatefulSetList
err := mgr.GetClient().List(ctx, &stsList, &client.ListOptions{
LabelSelector: pkglabels.ForCluster(&vectorizedCluster).AsClientSelector(),
})
if err != nil {
return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
}

if len(stsList.Items) == 0 {
return 0, errors.New("found 0 StatefulSets for this Cluster")
}

var allReplicas int32
for _, sts := range stsList.Items {
allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
}

// Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
if allReplicas < 3 {
return 0, fmt.Errorf("found %d desiredReplicas, but want >= 3", allReplicas)
}

if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas {
return 0, fmt.Errorf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas)
}

return allReplicas, nil
}),
decommissioning.WithFactory(internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())),
decommissioning.WithFilter(func(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter")
idx := slices.IndexFunc(
sts.OwnerReferences,
func(ownerRef metav1.OwnerReference) bool {
return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
})
if idx == -1 {
return false, nil
}

var vectorizedCluster vectorizedv1alpha1.Cluster
if err := mgr.GetClient().Get(ctx, types.NamespacedName{
Name: sts.OwnerReferences[idx].Name,
Namespace: sts.Namespace,
}, &vectorizedCluster); err != nil {
return false, fmt.Errorf("could not get Cluster: %w", err)
}

managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed"
if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace)
return false, nil
}

// Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
// (and we can therefore not use it to check if the cluster is synced otherwise)
if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas {
log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
return false, nil
}
if vectorizedCluster.Status.Restarting {
log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
return false, nil
}

if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation {
log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration)
return false, nil
}

if vectorizedCluster.Status.DecommissioningNode != nil {
log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode)
return false, nil
}

return true, nil
}),
)

if err := d.SetupWithManager(mgr); err != nil {
156 changes: 156 additions & 0 deletions operator/cmd/run/vectorized.go
@@ -0,0 +1,156 @@
// Copyright 2025 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package run

import (
"context"
"fmt"
"slices"

"github.com/cockroachdb/errors"
"github.com/redpanda-data/common-go/rpadmin"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
)

// vectorizedDecommissionerAdapter is a helper struct whose methods map
// StatefulSets, via their owning Vectorized Clusters, to the arguments
// expected by the StatefulSetDecommissioner.
type vectorizedDecommissionerAdapter struct {
PR author comment: The bulk of this code is the same. I've moved it into its own file because the diff would otherwise be interleaved and abysmal to read.

client client.Client
factory internalclient.ClientFactory
}

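// desiredReplicas returns the total desired replica count for the Vectorized
// Cluster that owns the given StatefulSet, summing spec.replicas across all of
// the Cluster's NodePool StatefulSets. It returns 0 when the StatefulSet is
// not owned by a Cluster, and an error if fewer than three replicas are
// desired or the Cluster status has not yet caught up with that count.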
func (b *vectorizedDecommissionerAdapter) desiredReplicas(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
// Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
vectorizedCluster, err := b.getCluster(ctx, sts)
if err != nil {
return 0, err
}

if vectorizedCluster == nil {
return 0, nil
}

// We assume the cluster is fine and synced; those checks have already been performed in the filter.

// Get all nodepool-sts for this Cluster
var stsList appsv1.StatefulSetList
if err := b.client.List(ctx, &stsList, &client.ListOptions{
LabelSelector: pkglabels.ForCluster(vectorizedCluster).AsClientSelector(),
Namespace: vectorizedCluster.Namespace,
}); err != nil {
return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
}

if len(stsList.Items) == 0 {
return 0, errors.New("found 0 StatefulSets for this Cluster")
}

var allReplicas int32
for _, sts := range stsList.Items {
allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
}

// This should not happen, but if it does, we don't want to run the ghost broker decommissioner.
if allReplicas < 3 {
return 0, errors.Newf("found %d desiredReplicas, but want >= 3", allReplicas)
}

if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas {
return 0, errors.Newf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas)
}

return allReplicas, nil
}

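// filter reports whether the ghost broker decommissioner should act on the
// given StatefulSet. It returns true only for StatefulSets owned by a managed
// Vectorized Cluster whose status is fully synced and which is neither
// restarting nor already decommissioning a node.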
func (b *vectorizedDecommissionerAdapter) filter(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter")

vectorizedCluster, err := b.getCluster(ctx, sts)
if err != nil {
return false, err
}

if vectorizedCluster == nil {
return false, nil
}

managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed"
if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace)
return false, nil
}

// Do some "manual" checks, as the ClusterQuiescent condition is always false if a ghost broker causes an unhealthy cluster
// (and we can therefore not use it to check whether the cluster is otherwise synced).
if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas {
log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
return false, nil
}
if vectorizedCluster.Status.Restarting {
log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
return false, nil
}

if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation {
log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration)
return false, nil
}

if vectorizedCluster.Status.DecommissioningNode != nil {
log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode)
return false, nil
}

return true, nil
}

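// getAdminClient resolves the StatefulSet to its owning Vectorized Cluster and
// builds a Redpanda Admin API client for it using the shared client factory.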
func (b *vectorizedDecommissionerAdapter) getAdminClient(ctx context.Context, sts *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) {
cluster, err := b.getCluster(ctx, sts)
if err != nil {
return nil, err
}

if cluster == nil {
return nil, errors.Newf("failed to resolve %s/%s to vectorized cluster", sts.Namespace, sts.Name)
}

return b.factory.RedpandaAdminClient(ctx, cluster)
}

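// getCluster looks up the Vectorized Cluster referenced by the StatefulSet's
// owner references. It returns (nil, nil) when the StatefulSet is not owned by
// a Vectorized Cluster.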
func (b *vectorizedDecommissionerAdapter) getCluster(ctx context.Context, sts *appsv1.StatefulSet) (*vectorizedv1alpha1.Cluster, error) {
idx := slices.IndexFunc(
sts.OwnerReferences,
func(ownerRef metav1.OwnerReference) bool {
return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
})
if idx == -1 {
return nil, nil
}

var vectorizedCluster vectorizedv1alpha1.Cluster
if err := b.client.Get(ctx, types.NamespacedName{
Name: sts.OwnerReferences[idx].Name,
Namespace: sts.Namespace,
}, &vectorizedCluster); err != nil {
return nil, errors.Wrap(err, "could not get Cluster")
}

return &vectorizedCluster, nil
}