8 changes: 8 additions & 0 deletions Taskfile.yml
@@ -146,6 +146,12 @@ tasks:
vars:
CLI_ARGS: '--load {{.CLI_ARGS}}'

build:charts:
desc: "Run helm dep build for all charts"
cmds:
- helm repo add redpanda https://charts.redpanda.com
- helm dep build ./charts/redpanda

test:unit:
desc: "Run all unit tests (~5m)"
vars:
@@ -161,6 +167,7 @@ tasks:
# The operator image is required to test the configurator and sidecar.
# In integration tests, the operator itself will be run from the go test process.
- build:image
- build:charts
cmds:
- task: charts:kind-cluster
- kind load docker-image localhost/redpanda-operator:dev
@@ -174,6 +181,7 @@
desc: "Run all acceptance tests (~90m)"
deps:
- build:image
- build:charts
vars:
GO_TEST_RUNNER: '{{default "go test" .GO_TEST_RUNNER}}'
CLI_ARGS: '{{.CLI_ARGS}} -tags=acceptance -run "^TestAcceptance" -timeout 20m -v'
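With these additions, chart dependencies can be prepared explicitly by running task build:charts, and the new deps entries run it automatically (alongside build:image) before the integration and acceptance suites, so helm dep build for ./charts/redpanda has already completed when the tests start. This assumes the task and helm CLIs are available locally.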
164 changes: 11 additions & 153 deletions operator/cmd/run/run.go
@@ -17,27 +17,19 @@ import (
"fmt"
"os"
"path/filepath"
"slices"
"strings"
"time"

"github.com/cockroachdb/errors"
fluxclient "github.com/fluxcd/pkg/runtime/client"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
helmkube "helm.sh/helm/v3/pkg/kube"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/client"
kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -58,10 +50,10 @@ import (
adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin"
internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
consolepkg "github.com/redpanda-data/redpanda-operator/operator/pkg/console"
pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
"github.com/redpanda-data/redpanda-operator/operator/pkg/resources"
pkgsecrets "github.com/redpanda-data/redpanda-operator/operator/pkg/secrets"
redpandawebhooks "github.com/redpanda-data/redpanda-operator/operator/webhooks/redpanda"
"github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
)

type RedpandaController string
@@ -99,32 +91,6 @@ var (
}
)

type LabelSelectorValue struct {
Selector labels.Selector
}

var _ pflag.Value = ((*LabelSelectorValue)(nil))

func (s *LabelSelectorValue) Set(value string) error {
if value == "" {
return nil
}
var err error
s.Selector, err = labels.Parse(value)
return err
}

func (s *LabelSelectorValue) String() string {
if s.Selector == nil {
return ""
}
return s.Selector.String()
}

func (s *LabelSelectorValue) Type() string {
return "label selector"
}

// Metrics RBAC permissions
// +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
// +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
@@ -157,7 +123,7 @@ func Command() *cobra.Command {
enableHelmControllers bool
ghostbuster bool
unbindPVCsAfter time.Duration
unbinderSelector LabelSelectorValue
unbinderSelector pflagutil.LabelSelectorValue
allowPVRebinding bool
autoDeletePVCs bool
forceDefluxedMode bool
@@ -313,21 +279,6 @@ func Command() *cobra.Command {
return cmd
}

type v1Fetcher struct {
client kubeClient.Client
}

func (f *v1Fetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) {
var vectorizedCluster vectorizedv1alpha1.Cluster
if err := f.client.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &vectorizedCluster); err != nil {
return nil, err
}
return &vectorizedCluster, nil
}

//nolint:funlen,gocyclo // length looks good
func Run(
ctx context.Context,
Expand Down Expand Up @@ -723,115 +674,22 @@ func Run(
}

if enableGhostBrokerDecommissioner {
d := decommissioning.NewStatefulSetDecommissioner(mgr, &v1Fetcher{client: mgr.GetClient()},
factory := internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())
adapter := vectorizedDecommissionerAdapter{factory: factory, client: mgr.GetClient()}
d := decommissioning.NewStatefulSetDecommissioner(
mgr,
adapter.getAdminClient,
decommissioning.WithFilter(adapter.filter),
// Operator v1 supports multiple NodePools, and therefore multiple STS.
// This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
decommissioning.WithDesiredReplicasFetcher(adapter.desiredReplicas),
decommissioning.WithSyncPeriod(ghostBrokerDecommissionerSyncPeriod),
decommissioning.WithCleanupPVCs(false),
// In Operator v1, decommissioning based on pod ordinal is not correct because
// the operator has its own controller code that manages decommissioning. If something else decommissions a node, it cannot always recover, e.g. because of a protection against stale status reads of status.currentReplicas
// (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139).
// Beyond not always being able to recover, it is also undesirable for it to interfere with graceful, "standard" decommissions (at least in Operator v1 mode).
decommissioning.WithDecommisionOnTooHighOrdinal(false),
// Operator v1 supports multiple NodePools, and therefore multiple STS.
// This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
decommissioning.WithDesiredReplicasFetcher(func(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
// Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
idx := slices.IndexFunc(
sts.OwnerReferences,
func(ownerRef metav1.OwnerReference) bool {
return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
})
if idx == -1 {
return 0, nil
}

var vectorizedCluster vectorizedv1alpha1.Cluster
if err := mgr.GetClient().Get(ctx, types.NamespacedName{
Name: sts.OwnerReferences[idx].Name,
Namespace: sts.Namespace,
}, &vectorizedCluster); err != nil {
return 0, fmt.Errorf("could not get Cluster: %w", err)
}

// We assume the cluster is fine and synced, checks have been performed in the filter already.

// Get all nodepool-sts for this Cluster
var stsList appsv1.StatefulSetList
err := mgr.GetClient().List(ctx, &stsList, &client.ListOptions{
LabelSelector: pkglabels.ForCluster(&vectorizedCluster).AsClientSelector(),
})
if err != nil {
return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
}

if len(stsList.Items) == 0 {
return 0, errors.New("found 0 StatefulSets for this Cluster")
}

var allReplicas int32
for _, sts := range stsList.Items {
allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
}

// Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
if allReplicas < 3 {
return 0, fmt.Errorf("found %d desiredReplicas, but want >= 3", allReplicas)
}

if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas {
return 0, fmt.Errorf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas)
}

return allReplicas, nil
}),
decommissioning.WithFactory(internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())),
decommissioning.WithFilter(func(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter")
idx := slices.IndexFunc(
sts.OwnerReferences,
func(ownerRef metav1.OwnerReference) bool {
return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
})
if idx == -1 {
return false, nil
}

var vectorizedCluster vectorizedv1alpha1.Cluster
if err := mgr.GetClient().Get(ctx, types.NamespacedName{
Name: sts.OwnerReferences[idx].Name,
Namespace: sts.Namespace,
}, &vectorizedCluster); err != nil {
return false, fmt.Errorf("could not get Cluster: %w", err)
}

managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed"
if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace)
return false, nil
}

// Do some "manual" checks, as the ClusterQuiescent condition is always false if a ghost broker causes an unhealthy cluster
// (so we cannot use it to check whether the cluster is otherwise synced).
if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas {
log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
return false, nil
}
if vectorizedCluster.Status.Restarting {
log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
return false, nil
}

if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation {
log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration)
return false, nil
}

if vectorizedCluster.Status.DecommissioningNode != nil {
log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode)
return false, nil
}

return true, nil
}),
)
if err := d.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner")
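The inline closures removed above are replaced by method values on a vectorizedDecommissionerAdapter, which this hunk references but does not define (it is added elsewhere in the PR). Purely as an illustration, the sketch below shows the shape implied by the call sites: the field and method names are taken from the diff, the filter and desiredReplicas signatures are assumed to mirror the closures they replace, the bodies are stubs, and getAdminClient is omitted because its signature is not visible here.

package run

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// factoryT stands in for the return type of internalclient.NewFactory
// (defined in operator/pkg/client, not shown in this hunk).
type factoryT = any

// vectorizedDecommissionerAdapter is a hypothetical sketch of the adapter
// constructed above from internalclient.NewFactory and mgr.GetClient().
type vectorizedDecommissionerAdapter struct {
    factory factoryT
    client  client.Client
}

// filter mirrors the removed WithFilter closure: only StatefulSets owned by a
// managed, fully synced v1 Cluster (not restarting, generation observed, no
// decommission already in progress) should be handed to the decommissioner.
func (a *vectorizedDecommissionerAdapter) filter(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
    // ... ownership, managed-annotation, and status checks as in the old closure ...
    return true, nil
}

// desiredReplicas mirrors the removed WithDesiredReplicasFetcher closure: it
// resolves the owning Cluster and sums spec.replicas across all of its
// NodePool StatefulSets instead of reading a single StatefulSet.
func (a *vectorizedDecommissionerAdapter) desiredReplicas(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
    // ... list the Cluster's StatefulSets and aggregate their replica counts ...
    return 0, nil
}

Extracting the closures into named methods keeps Run considerably shorter and makes the filtering and replica-counting logic easier to exercise in isolation.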