Commit 12f4fb2
operator: various sidecar fixes
Prior to this commit, the operator sidecar's decommissioner and pvcunbinder controllers did not work. This was due to:

- RBAC issues: the sidecar did not correctly scope itself to a single namespace.
- Incorrect label selectors hidden within the controllers in question.

Additionally, the statefulset decommissioner's sole test case has been disabled for quite some time, leaving zero test coverage of this functionality.

This commit:

- Restores the decommissioner's tests to a working state.
- Strips out the "fetcher" to reduce duplication and remove reliance on fetching live helm values.
- Replaces baked-in filtering with a label selector argument that will be constructed by the helm chart.

A follow-up commit with chart changes and acceptance tests will be submitted. It's been kept separate to ease the process of backporting to the v2.x.x branches.
1 parent 2ebaa24 commit 12f4fb2

File tree: 14 files changed, +388 −643 lines
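The RBAC fix named in the commit message (scoping the sidecar to a single namespace) largely lives in chart manifests outside this file view. For context, a minimal sketch of how a controller-runtime manager is typically confined to one namespace; the REDPANDA_NAMESPACE source and the error handling are illustrative assumptions, not code from this commit:

// Sketch: scoping a controller-runtime manager's cache (and therefore all
// controllers it runs) to a single namespace. The REDPANDA_NAMESPACE env var
// is a hypothetical source of the namespace, not taken from this commit.
package main

import (
    "os"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/cache"
)

func main() {
    namespace := os.Getenv("REDPANDA_NAMESPACE") // hypothetical

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Cache: cache.Options{
            // Restrict every informer to one namespace; List/Watch then only
            // needs namespaced RBAC (Role/RoleBinding) rather than
            // cluster-wide permissions.
            DefaultNamespaces: map[string]cache.Config{namespace: {}},
        },
    })
    if err != nil {
        panic(err)
    }
    _ = mgr // controllers would be registered here before mgr.Start(ctx)
}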

operator/cmd/run/run.go

Lines changed: 13 additions & 156 deletions
@@ -16,25 +16,17 @@ import (
 	"crypto/tls"
 	"fmt"
 	"path/filepath"
-	"slices"
 	"strings"
 	"time"

 	"github.com/cockroachdb/errors"
 	"github.com/spf13/cobra"
-	"github.com/spf13/pflag"
-	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/types"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
-	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/certwatcher"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
 	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -53,10 +45,10 @@ import (
 	"github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle"
 	adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin"
 	internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
-	pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
 	"github.com/redpanda-data/redpanda-operator/operator/pkg/resources"
 	"github.com/redpanda-data/redpanda-operator/pkg/kube"
 	"github.com/redpanda-data/redpanda-operator/pkg/otelutil/log"
+	"github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
 	pkgsecrets "github.com/redpanda-data/redpanda-operator/pkg/secrets"
 )
@@ -110,7 +102,7 @@ type RunOptions struct {
 	restrictToRedpandaVersion string
 	ghostbuster               bool
 	unbindPVCsAfter           time.Duration
-	unbinderSelector          LabelSelectorValue
+	unbinderSelector          pflagutil.LabelSelectorValue
 	allowPVRebinding          bool
 	autoDeletePVCs            bool
 	webhookCertPath           string
@@ -204,32 +196,6 @@ func (o *RunOptions) ControllerEnabled(controller Controller) bool {
 	return false
 }

-type LabelSelectorValue struct {
-	Selector labels.Selector
-}
-
-var _ pflag.Value = ((*LabelSelectorValue)(nil))
-
-func (s *LabelSelectorValue) Set(value string) error {
-	if value == "" {
-		return nil
-	}
-	var err error
-	s.Selector, err = labels.Parse(value)
-	return err
-}
-
-func (s *LabelSelectorValue) String() string {
-	if s.Selector == nil {
-		return ""
-	}
-	return s.Selector.String()
-}
-
-func (s *LabelSelectorValue) Type() string {
-	return "label selector"
-}
-
 // Metrics RBAC permissions
 // +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
 // +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
@@ -506,7 +472,7 @@ func Run(

 	if v1Controllers {
 		setupLog.Info("setting up vectorized controllers")
-		if err := setupVectorizedControllers(ctx, mgr, cloudExpander, opts); err != nil {
+		if err := setupVectorizedControllers(ctx, mgr, factory, cloudExpander, opts); err != nil {
 			return err
 		}
 	}
@@ -584,25 +550,10 @@ func Run(
 	return nil
 }

-type v1Fetcher struct {
-	client kubeClient.Client
-}
-
-func (f *v1Fetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) {
-	var vectorizedCluster vectorizedv1alpha1.Cluster
-	if err := f.client.Get(ctx, types.NamespacedName{
-		Name:      name,
-		Namespace: namespace,
-	}, &vectorizedCluster); err != nil {
-		return nil, err
-	}
-	return &vectorizedCluster, nil
-}
-
 // setupVectorizedControllers configures and registers controllers and
 // runnables for the custom resources in the vectorized group, AKA the V1
 // operator.
-func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpander *pkgsecrets.CloudExpander, opts *RunOptions) error {
+func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, factory internalclient.ClientFactory, cloudExpander *pkgsecrets.CloudExpander, opts *RunOptions) error {
 	log.Info(ctx, "Starting Vectorized (V1) Controllers")

 	configurator := resources.ConfiguratorSettings{
@@ -650,116 +601,22 @@ func setupVectorizedControllers(ctx context.Context, mgr ctrl.Manager, cloudExpa
 		}
 	}

-	if opts.enableGhostBrokerDecommissioner {
-		d := decommissioning.NewStatefulSetDecommissioner(mgr, &v1Fetcher{client: mgr.GetClient()},
+	if opts.enableGhostBrokerDecommissioner && opts.enableVectorizedControllers {
+		adapter := vectorizedDecommissionerAdapter{factory: factory, client: mgr.GetClient()}
+		d := decommissioning.NewStatefulSetDecommissioner(
+			mgr,
+			adapter.getAdminClient,
+			decommissioning.WithFilter(adapter.filter),
+			// Operator v1 supports multiple NodePools, and therefore multiple STS.
+			// This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
+			decommissioning.WithDesiredReplicasFetcher(adapter.desiredReplicas),
 			decommissioning.WithSyncPeriod(opts.ghostBrokerDecommissionerSyncPeriod),
 			decommissioning.WithCleanupPVCs(false),
 			// In Operator v1, decommissioning based on pod ordinal is not correct because
 			// it has controller code that manages decommissioning. If something else decommissions the node, it can not deal with this under all circumstances because of various reasons, e.g. because of a protection against stale status reads of status.currentReplicas
 			// (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139)
 			// In addition to this situation where it can not (always) recover, it is just not desired that it interferes with graceful, "standard" decommissions (at least, in Operator v1 mode)
 			decommissioning.WithDecommisionOnTooHighOrdinal(false),
-			// Operator v1 supports multiple NodePools, and therefore multiple STS.
-			// This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
-			decommissioning.WithDesiredReplicasFetcher(func(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
-				// Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
-				idx := slices.IndexFunc(
-					sts.OwnerReferences,
-					func(ownerRef metav1.OwnerReference) bool {
-						return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
-					})
-				if idx == -1 {
-					return 0, nil
-				}
-
-				var vectorizedCluster vectorizedv1alpha1.Cluster
-				if err := mgr.GetClient().Get(ctx, types.NamespacedName{
-					Name:      sts.OwnerReferences[idx].Name,
-					Namespace: sts.Namespace,
-				}, &vectorizedCluster); err != nil {
-					return 0, fmt.Errorf("could not get Cluster: %w", err)
-				}
-
-				// We assume the cluster is fine and synced, checks have been performed in the filter already.
-
-				// Get all nodepool-sts for this Cluster
-				var stsList appsv1.StatefulSetList
-				err := mgr.GetClient().List(ctx, &stsList, &client.ListOptions{
-					LabelSelector: pkglabels.ForCluster(&vectorizedCluster).AsClientSelector(),
-				})
-				if err != nil {
-					return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
-				}
-
-				if len(stsList.Items) == 0 {
-					return 0, errors.New("found 0 StatefulSets for this Cluster")
-				}
-
-				var allReplicas int32
-				for _, sts := range stsList.Items {
-					allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
-				}
-
-				// Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
-				if allReplicas < 3 {
-					return 0, fmt.Errorf("found %d desiredReplicas, but want >= 3", allReplicas)
-				}
-
-				if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas {
-					return 0, fmt.Errorf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas)
-				}
-
-				return allReplicas, nil
-			}),
-			decommissioning.WithFactory(internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())),
-			decommissioning.WithFilter(func(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
-				log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter")
-				idx := slices.IndexFunc(
-					sts.OwnerReferences,
-					func(ownerRef metav1.OwnerReference) bool {
-						return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
-					})
-				if idx == -1 {
-					return false, nil
-				}
-
-				var vectorizedCluster vectorizedv1alpha1.Cluster
-				if err := mgr.GetClient().Get(ctx, types.NamespacedName{
-					Name:      sts.OwnerReferences[idx].Name,
-					Namespace: sts.Namespace,
-				}, &vectorizedCluster); err != nil {
-					return false, fmt.Errorf("could not get Cluster: %w", err)
-				}
-
-				managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed"
-				if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
-					log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace)
-					return false, nil
-				}
-
-				// Do some "manual" checks, as the ClusterQuiescent condition is always false if a ghost broker causes an unhealthy cluster
-				// (and we can therefore not use it to check if the cluster is synced otherwise)
-				if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas {
-					log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
-					return false, nil
-				}
-				if vectorizedCluster.Status.Restarting {
-					log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
-					return false, nil
-				}
-
-				if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation {
-					log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration)
-					return false, nil
-				}
-
-				if vectorizedCluster.Status.DecommissioningNode != nil {
-					log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode)
-					return false, nil
-				}
-
-				return true, nil
-			}),
 		)

 		if err := d.SetupWithManager(mgr); err != nil {

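The LabelSelectorValue type deleted above reappears as pflagutil.LabelSelectorValue in the RunOptions hunk. Assuming the moved implementation mirrors the deleted one, here is a minimal sketch of binding such a value to a flag whose value the helm chart would construct; the flag name is hypothetical:

// Sketch only: assumes pflagutil.LabelSelectorValue keeps the pflag.Value
// implementation that was deleted from run.go above.
package main

import (
    "fmt"

    "github.com/spf13/pflag"
    "k8s.io/apimachinery/pkg/labels"

    "github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
)

func main() {
    var selector pflagutil.LabelSelectorValue

    fs := pflag.NewFlagSet("sidecar", pflag.ExitOnError)
    // "--pvc-unbinder-selector" is a hypothetical flag name; per the commit
    // message, the helm chart constructs the selector value.
    fs.Var(&selector, "pvc-unbinder-selector", "label selector scoping the controllers")

    _ = fs.Parse([]string{"--pvc-unbinder-selector", "app.kubernetes.io/instance=my-release"})

    // The parsed labels.Selector can be handed to client.ListOptions or used directly.
    matched := selector.Selector.Matches(labels.Set{"app.kubernetes.io/instance": "my-release"})
    fmt.Println(selector.String(), matched) // app.kubernetes.io/instance=my-release true
}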
operator/cmd/run/vectorized.go

Lines changed: 156 additions & 0 deletions
@@ -0,0 +1,156 @@
+// Copyright 2025 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+package run
+
+import (
+	"context"
+	"fmt"
+	"slices"
+
+	"github.com/cockroachdb/errors"
+	"github.com/redpanda-data/common-go/rpadmin"
+	appsv1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
+	internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
+	pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
+)
+
+// vectorizedDecommissionerAdapter is a helper struct that implements various methods
+// of mapping StatefulSets through Vectorized Clusters to arguments for the
+// StatefulSetDecommissioner.
+type vectorizedDecommissionerAdapter struct {
+	client  client.Client
+	factory internalclient.ClientFactory
+}
+
+func (b *vectorizedDecommissionerAdapter) desiredReplicas(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
+	// Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
+	vectorizedCluster, err := b.getCluster(ctx, sts)
+	if err != nil {
+		return 0, err
+	}
+
+	if vectorizedCluster == nil {
+		return 0, nil
+	}
+
+	// We assume the cluster is fine and synced, checks have been performed in the filter already.
+
+	// Get all nodepool-sts for this Cluster
+	var stsList appsv1.StatefulSetList
+	if err := b.client.List(ctx, &stsList, &client.ListOptions{
+		LabelSelector: pkglabels.ForCluster(vectorizedCluster).AsClientSelector(),
+		Namespace:     vectorizedCluster.Namespace,
+	}); err != nil {
+		return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
+	}
+
+	if len(stsList.Items) == 0 {
+		return 0, errors.New("found 0 StatefulSets for this Cluster")
+	}
+
+	var allReplicas int32
+	for _, sts := range stsList.Items {
+		allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
+	}
+
+	// Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
+	if allReplicas < 3 {
+		return 0, errors.Newf("found %d desiredReplicas, but want >= 3", allReplicas)
+	}
+
+	if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas {
+		return 0, errors.Newf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas)
+	}
+
+	return allReplicas, nil
+}
+
+func (b *vectorizedDecommissionerAdapter) filter(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
+	log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter")
+
+	vectorizedCluster, err := b.getCluster(ctx, sts)
+	if err != nil {
+		return false, err
+	}
+
+	if vectorizedCluster == nil {
+		return false, nil
+	}
+
+	managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed"
+	if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
+		log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace)
+		return false, nil
+	}
+
+	// Do some "manual" checks, as the ClusterQuiescent condition is always false if a ghost broker causes an unhealthy cluster
+	// (and we can therefore not use it to check if the cluster is synced otherwise)
+	if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas {
+		log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
+		return false, nil
+	}
+	if vectorizedCluster.Status.Restarting {
+		log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
+		return false, nil
+	}
+
+	if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation {
+		log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration)
+		return false, nil
+	}
+
+	if vectorizedCluster.Status.DecommissioningNode != nil {
+		log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode)
+		return false, nil
+	}
+
+	return true, nil
+}
+
+func (b *vectorizedDecommissionerAdapter) getAdminClient(ctx context.Context, sts *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) {
+	cluster, err := b.getCluster(ctx, sts)
+	if err != nil {
+		return nil, err
+	}
+
+	if cluster == nil {
+		return nil, errors.Newf("failed to resolve %s/%s to vectorized cluster", sts.Namespace, sts.Name)
+	}
+
+	return b.factory.RedpandaAdminClient(ctx, cluster)
+}
+
+func (b *vectorizedDecommissionerAdapter) getCluster(ctx context.Context, sts *appsv1.StatefulSet) (*vectorizedv1alpha1.Cluster, error) {
+	idx := slices.IndexFunc(
+		sts.OwnerReferences,
+		func(ownerRef metav1.OwnerReference) bool {
+			return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
+		})
+	if idx == -1 {
+		return nil, nil
+	}
+
+	var vectorizedCluster vectorizedv1alpha1.Cluster
+	if err := b.client.Get(ctx, types.NamespacedName{
+		Name:      sts.OwnerReferences[idx].Name,
+		Namespace: sts.Namespace,
+	}, &vectorizedCluster); err != nil {
+		return nil, errors.Wrap(err, "could not get Cluster")
+	}
+
+	return &vectorizedCluster, nil
+}
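The restored decommissioner tests live in files not shown in this view. Purely as illustration, a hedged sketch of how getCluster's owner-reference resolution could be exercised with controller-runtime's fake client; the test itself is hypothetical, and it assumes the usual kubebuilder-generated AddToScheme helper exists on the vectorized API package:

// Hypothetical test sketch (not from this commit): resolving a StatefulSet to
// its owning vectorized Cluster via a fake client.
package run

import (
    "context"
    "testing"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "sigs.k8s.io/controller-runtime/pkg/client/fake"

    vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
)

func TestGetClusterResolvesOwner(t *testing.T) {
    scheme := runtime.NewScheme()
    // Assumes the standard kubebuilder-generated AddToScheme helper.
    if err := vectorizedv1alpha1.AddToScheme(scheme); err != nil {
        t.Fatal(err)
    }
    if err := appsv1.AddToScheme(scheme); err != nil {
        t.Fatal(err)
    }

    cluster := &vectorizedv1alpha1.Cluster{
        ObjectMeta: metav1.ObjectMeta{Name: "rp", Namespace: "redpanda"},
    }
    sts := &appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "rp-sts",
            Namespace: "redpanda",
            // The owner reference is what getCluster keys off of.
            OwnerReferences: []metav1.OwnerReference{{
                APIVersion: vectorizedv1alpha1.GroupVersion.String(),
                Kind:       "Cluster",
                Name:       "rp",
                UID:        "uid",
            }},
        },
    }

    adapter := vectorizedDecommissionerAdapter{
        client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(cluster, sts).Build(),
    }

    got, err := adapter.getCluster(context.Background(), sts)
    if err != nil || got == nil || got.Name != "rp" {
        t.Fatalf("expected cluster rp, got %v (err=%v)", got, err)
    }
}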
