Skip to content

Commit bda9c64

Browse files
committed
operator: various sidecar fixes
Prior to this commit the operator sidecar's decommissioner and pvcunbinder controllers did not work. This was due to: - RBAC issues, the sidecar did not correctly scope itself to a single namespace. - Incorrect label selectors hidden within the controllers in question. Additionally, the statefulset decommissioner's sole test case has been disabled for quite sometime. There's been zero test coverage of this functionality. This commit: - Restores the decommissioner's tests to a working state - Strips out the "fetcher" to reduce duplication and remove reliance on fetching live helm values. - Replaces baked in filtering with a label selector argument that will be constructed by the helm chart. A follow up commit with chart changes and acceptance tests will be submitted. It's been made separate to ease the process of backporting to the v2.x.x branches. (cherry picked from commit 03dd394) # Conflicts: # operator/cmd/run/run.go # operator/go.mod # pkg/go.mod
1 parent b61a93a commit bda9c64

File tree

14 files changed

+695
-607
lines changed

14 files changed

+695
-607
lines changed

operator/cmd/run/run.go

Lines changed: 305 additions & 122 deletions
Large diffs are not rendered by default.

operator/cmd/run/vectorized.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
package run
11+
12+
import (
13+
"context"
14+
"fmt"
15+
"slices"
16+
17+
"github.com/cockroachdb/errors"
18+
"github.com/redpanda-data/common-go/rpadmin"
19+
appsv1 "k8s.io/api/apps/v1"
20+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21+
"k8s.io/apimachinery/pkg/types"
22+
"k8s.io/utils/ptr"
23+
ctrl "sigs.k8s.io/controller-runtime"
24+
"sigs.k8s.io/controller-runtime/pkg/client"
25+
26+
vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
27+
internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
28+
pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
29+
)
30+
31+
// vectorizedDecommissionerAdapter is a helper struct that implements various methods
32+
// of mapping StatefulSets through Vectorized Clusters to arguments for the
33+
// StatefulSetDecommissioner.
34+
type vectorizedDecommissionerAdapter struct {
35+
client client.Client
36+
factory internalclient.ClientFactory
37+
}
38+
39+
func (b *vectorizedDecommissionerAdapter) desiredReplicas(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
40+
// Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
41+
vectorizedCluster, err := b.getCluster(ctx, sts)
42+
if err != nil {
43+
return 0, err
44+
}
45+
46+
if vectorizedCluster == nil {
47+
return 0, nil
48+
}
49+
50+
// We assume the cluster is fine and synced, checks have been performed in the filter already.
51+
52+
// Get all nodepool-sts for this Cluster
53+
var stsList appsv1.StatefulSetList
54+
if err := b.client.List(ctx, &stsList, &client.ListOptions{
55+
LabelSelector: pkglabels.ForCluster(vectorizedCluster).AsClientSelector(),
56+
Namespace: vectorizedCluster.Namespace,
57+
}); err != nil {
58+
return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
59+
}
60+
61+
if len(stsList.Items) == 0 {
62+
return 0, errors.New("found 0 StatefulSets for this Cluster")
63+
}
64+
65+
var allReplicas int32
66+
for _, sts := range stsList.Items {
67+
allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
68+
}
69+
70+
// Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
71+
if allReplicas < 3 {
72+
return 0, errors.Newf("found %d desiredReplicas, but want >= 3", allReplicas)
73+
}
74+
75+
if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas {
76+
return 0, errors.Newf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas)
77+
}
78+
79+
return allReplicas, nil
80+
}
81+
82+
func (b *vectorizedDecommissionerAdapter) filter(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
83+
log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter")
84+
85+
vectorizedCluster, err := b.getCluster(ctx, sts)
86+
if err != nil {
87+
return false, err
88+
}
89+
90+
if vectorizedCluster == nil {
91+
return false, nil
92+
}
93+
94+
managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed"
95+
if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
96+
log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace)
97+
return false, nil
98+
}
99+
100+
// Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
101+
// (and we can therefore not use it to check if the cluster is synced otherwise)
102+
if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas {
103+
log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
104+
return false, nil
105+
}
106+
if vectorizedCluster.Status.Restarting {
107+
log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
108+
return false, nil
109+
}
110+
111+
if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation {
112+
log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration)
113+
return false, nil
114+
}
115+
116+
if vectorizedCluster.Status.DecommissioningNode != nil {
117+
log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode)
118+
return false, nil
119+
}
120+
121+
return true, nil
122+
}
123+
124+
func (b *vectorizedDecommissionerAdapter) getAdminClient(ctx context.Context, sts *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) {
125+
cluster, err := b.getCluster(ctx, sts)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
if cluster == nil {
131+
return nil, errors.Newf("failed to resolve %s/%s to vectorized cluster", sts.Namespace, sts.Name)
132+
}
133+
134+
return b.factory.RedpandaAdminClient(ctx, cluster)
135+
}
136+
137+
func (b *vectorizedDecommissionerAdapter) getCluster(ctx context.Context, sts *appsv1.StatefulSet) (*vectorizedv1alpha1.Cluster, error) {
138+
idx := slices.IndexFunc(
139+
sts.OwnerReferences,
140+
func(ownerRef metav1.OwnerReference) bool {
141+
return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
142+
})
143+
if idx == -1 {
144+
return nil, nil
145+
}
146+
147+
var vectorizedCluster vectorizedv1alpha1.Cluster
148+
if err := b.client.Get(ctx, types.NamespacedName{
149+
Name: sts.OwnerReferences[idx].Name,
150+
Namespace: sts.Namespace,
151+
}, &vectorizedCluster); err != nil {
152+
return nil, errors.Wrap(err, "could not get Cluster")
153+
}
154+
155+
return &vectorizedCluster, nil
156+
}

operator/cmd/sidecar/sidecar.go

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,26 @@ import (
1515
"time"
1616

1717
"github.com/cockroachdb/errors"
18+
"github.com/redpanda-data/common-go/rpadmin"
19+
rpkadminapi "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
20+
rpkconfig "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
21+
"github.com/spf13/afero"
1822
"github.com/spf13/cobra"
23+
appsv1 "k8s.io/api/apps/v1"
24+
"k8s.io/apimachinery/pkg/labels"
1925
"k8s.io/apimachinery/pkg/runtime"
2026
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2127
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2228
ctrl "sigs.k8s.io/controller-runtime"
29+
"sigs.k8s.io/controller-runtime/pkg/cache"
2330
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
2431

2532
"github.com/redpanda-data/redpanda-operator/operator/internal/configwatcher"
2633
"github.com/redpanda-data/redpanda-operator/operator/internal/controller/decommissioning"
2734
"github.com/redpanda-data/redpanda-operator/operator/internal/controller/pvcunbinder"
2835
"github.com/redpanda-data/redpanda-operator/operator/internal/probes"
2936
internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
37+
"github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
3038
)
3139

3240
// +kubebuilder:rbac:groups=coordination.k8s.io,namespace=default,resources=leases,verbs=get;list;watch;create;update;patch;delete
@@ -56,6 +64,7 @@ func Command() *cobra.Command {
5664
brokerProbeBrokerURL string
5765
runUnbinder bool
5866
unbinderTimeout time.Duration
67+
selector pflagutil.LabelSelectorValue
5968
panicAfter time.Duration
6069
)
6170

@@ -86,6 +95,7 @@ func Command() *cobra.Command {
8695
brokerProbeBrokerURL,
8796
runUnbinder,
8897
unbinderTimeout,
98+
selector.Selector,
8999
panicAfter,
90100
)
91101
},
@@ -102,6 +112,7 @@ func Command() *cobra.Command {
102112
// cluster flags
103113
cmd.Flags().StringVar(&clusterNamespace, "redpanda-cluster-namespace", "", "The namespace of the cluster that this sidecar manages.")
104114
cmd.Flags().StringVar(&clusterName, "redpanda-cluster-name", "", "The name of the cluster that this sidecar manages.")
115+
cmd.Flags().Var(&selector, "selector", "Kubernetes label selector that will filter objects to be considered by the all controllers run by the sidecar.")
105116

106117
// decommission flags
107118
cmd.Flags().BoolVar(&runDecommissioner, "run-decommissioner", false, "Specifies if the sidecar should run the broker decommissioner.")
@@ -153,10 +164,14 @@ func Run(
153164
brokerProbeBrokerURL string,
154165
runUnbinder bool,
155166
unbinderTimeout time.Duration,
167+
selector labels.Selector,
156168
panicAfter time.Duration,
157169
) error {
158170
setupLog := ctrl.LoggerFrom(ctx).WithName("setup")
159171

172+
// Required arguments check, in sidecar mode these MUST be specified to
173+
// ensure the sidecar only affects the helm deployment that's deployed it.
174+
160175
if clusterNamespace == "" {
161176
err := errors.New("must specify a cluster-namespace parameter")
162177
setupLog.Error(err, "no cluster namespace provided")
@@ -169,6 +184,20 @@ func Run(
169184
return err
170185
}
171186

187+
if selector == nil || selector.Empty() {
188+
// Use a sensible default that's about as correct than the previous
189+
// hard coded values. Hardcoding of name=redpanda is incorrect when
190+
// nameoverride is used.
191+
var err error
192+
selector, err = labels.Parse(fmt.Sprintf(
193+
"apps.kubernetes.io/component,app.kubernetes.io/name=redpanda,app.kubernetes.io/instance=%s",
194+
clusterName,
195+
))
196+
if err != nil {
197+
panic(err)
198+
}
199+
}
200+
172201
scheme := runtime.NewScheme()
173202

174203
for _, fn := range schemes {
@@ -183,35 +212,54 @@ func Run(
183212
LeaderElectionID: clusterName + "." + clusterNamespace + ".redpanda",
184213
Scheme: scheme,
185214
LeaderElectionNamespace: clusterNamespace,
215+
Cache: cache.Options{
216+
// Only watch the specified namespace, we don't have permissions for watch at the ClusterScope.
217+
DefaultNamespaces: map[string]cache.Config{
218+
clusterNamespace: {},
219+
},
220+
},
186221
})
187222
if err != nil {
188223
setupLog.Error(err, "unable to initialize manager")
189224
return err
190225
}
191226

192227
if runDecommissioner {
193-
fetcher := decommissioning.NewChainedFetcher(
194-
// prefer RPK profile first and then move on to fetch from helm values
195-
decommissioning.NewRPKProfileFetcher(redpandaYAMLPath),
196-
decommissioning.NewHelmFetcher(mgr),
197-
)
228+
setupLog.Info("broker decommissioner enabled", "namespace", clusterNamespace, "cluster", clusterName, "selector", selector)
198229

199-
if err := decommissioning.NewStatefulSetDecommissioner(mgr, fetcher, []decommissioning.Option{
200-
decommissioning.WithFilter(decommissioning.FilterStatefulSetOwner(clusterNamespace, clusterName)),
230+
fs := afero.NewOsFs()
231+
232+
params := rpkconfig.Params{ConfigFlag: redpandaYAMLPath}
233+
234+
config, err := params.Load(afero.NewOsFs())
235+
if err != nil {
236+
return err
237+
}
238+
239+
if err := decommissioning.NewStatefulSetDecommissioner(
240+
mgr,
241+
func(ctx context.Context, _ *appsv1.StatefulSet) (*rpadmin.AdminAPI, error) {
242+
// Always use the config that's loaded from redpanda.yaml, in
243+
// sidecar mode no other STS's should be watched.
244+
return rpkadminapi.NewClient(ctx, fs, config.VirtualProfile())
245+
},
246+
decommissioning.WithSelector(selector),
201247
decommissioning.WithRequeueTimeout(decommissionRequeueTimeout),
202248
decommissioning.WithDelayedCacheInterval(decommissionVoteInterval),
203249
decommissioning.WithDelayedCacheMaxCount(decommissionMaxVoteCount),
204-
}...).SetupWithManager(mgr); err != nil {
250+
).SetupWithManager(mgr); err != nil {
205251
setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner")
206252
return err
207253
}
208254
}
209255

210256
if runUnbinder {
257+
setupLog.Info("PVC unbinder enabled", "namespace", clusterNamespace, "selector", selector)
258+
211259
if err := (&pvcunbinder.Controller{
212-
Client: mgr.GetClient(),
213-
Timeout: unbinderTimeout,
214-
Filter: pvcunbinder.FilterPodOwner(clusterNamespace, clusterName),
260+
Client: mgr.GetClient(),
261+
Timeout: unbinderTimeout,
262+
Selector: selector,
215263
}).SetupWithManager(mgr); err != nil {
216264
setupLog.Error(err, "unable to create controller", "controller", "PVCUnbinder")
217265
return err

operator/go.mod

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,14 @@ require (
4242
github.com/redpanda-data/redpanda/src/go/rpk v0.0.0-20240827155712-244863ea0ae8
4343
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
4444
github.com/scalalang2/golang-fifo v1.0.2
45+
<<<<<<< HEAD
4546
github.com/spf13/afero v1.11.0
4647
github.com/spf13/cobra v1.8.1
4748
github.com/spf13/pflag v1.0.5
49+
=======
50+
github.com/spf13/afero v1.12.0
51+
github.com/spf13/cobra v1.9.1
52+
>>>>>>> 03dd3942 (operator: various sidecar fixes)
4853
github.com/stretchr/testify v1.10.0
4954
github.com/testcontainers/testcontainers-go v0.33.0
5055
github.com/testcontainers/testcontainers-go/modules/redpanda v0.32.0
@@ -384,10 +389,15 @@ require (
384389
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 // indirect
385390
github.com/sourcegraph/conc v0.3.0 // indirect
386391
github.com/spf13/cast v1.7.0 // indirect
392+
<<<<<<< HEAD
387393
github.com/spf13/viper v1.18.1 // indirect
388394
github.com/stoewer/go-strcase v1.3.0 // indirect
389395
github.com/subosito/gotenv v1.6.0 // indirect
390396
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
397+
=======
398+
github.com/spf13/pflag v1.0.7 // indirect
399+
github.com/stoewer/go-strcase v1.3.1 // indirect
400+
>>>>>>> 03dd3942 (operator: various sidecar fixes)
391401
github.com/texttheater/golang-levenshtein v1.0.1 // indirect
392402
github.com/thales-e-security/pool v0.0.2 // indirect
393403
github.com/theupdateframework/go-tuf v0.7.0 // indirect

operator/internal/controller/decommissioning/chained_fetcher.go

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)