
Commit bc23238

operator: various sidecar fixes
Prior to this commit, the operator sidecar's decommissioner and pvcunbinder controllers did not work. This was due to:

- RBAC issues: the sidecar did not correctly scope itself to a single namespace.
- Incorrect label selectors hidden within the controllers in question.

Additionally, the statefulset decommissioner's sole test case has been disabled for quite some time, leaving this functionality with zero test coverage.

This commit:

- Restores the decommissioner's tests to a working state.
- Strips out the "fetcher" to reduce duplication and remove the reliance on fetching live helm values.
- Replaces baked-in filtering with a label selector argument that will be constructed by the helm chart.

A follow-up commit with chart changes and acceptance tests will be submitted. It has been made separate to ease the process of backporting to the v2.x.x branches.

(cherry picked from commit 03dd394)

# Conflicts:
#	operator/cmd/run/run.go
#	operator/go.mod
#	pkg/go.mod
1 parent 0d1eed3 commit bc23238
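The fixes above revolve around two things: scoping the sidecar to a single namespace and filtering via a chart-constructed label selector. Purely as an illustration (the `buildManagerOptions` helper, its arguments, and the wiring below are assumptions, not the operator's actual code), a sidecar scoped this way might build its controller-runtime manager options like so:

```go
package sidecarscope

import (
	"k8s.io/apimachinery/pkg/labels"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// buildManagerOptions is a hypothetical helper showing how a namespace-scoped
// sidecar could honor a label selector handed to it by the helm chart
// (e.g. "app.kubernetes.io/instance=<release>").
func buildManagerOptions(namespace, rawSelector string) (ctrl.Options, error) {
	// An empty selector means "match everything in the namespace".
	selector := labels.Everything()
	if rawSelector != "" {
		parsed, err := labels.Parse(rawSelector)
		if err != nil {
			return ctrl.Options{}, err
		}
		selector = parsed
	}

	return ctrl.Options{
		Cache: cache.Options{
			// Restrict all watches to a single namespace so the sidecar only
			// needs namespaced (Role/RoleBinding) RBAC rather than cluster-wide rules.
			DefaultNamespaces: map[string]cache.Config{
				namespace: {},
			},
			// Only cache objects matching the chart-constructed selector.
			DefaultLabelSelector: selector,
		},
	}, nil
}
```

In the actual change, the selector is exposed as a `pflagutil.LabelSelectorValue` flag (see the run.go diff below), and the chart is expected to supply its value in the follow-up commit mentioned above.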

File tree

16 files changed: +394, -639 lines changed


Taskfile.yml

Lines changed: 9 additions & 0 deletions
```diff
@@ -161,6 +161,11 @@ tasks:
     vars:
       CLI_ARGS: '--load {{.CLI_ARGS}}'
 
+  build:charts:
+    desc: "Run helm dep build for all charts"
+    cmds:
+      - helm dep build ./charts/redpanda
+
   test:unit:
     desc: "Run all unit tests (~5m)"
     vars:
@@ -174,6 +179,9 @@ tasks:
   test:integration:
     desc: "Run all integration tests (~90m)"
     deps:
+      # Helm deps for redpanda need to be built for anything attempting to run it
+      # from source.
+      - task: build:charts
      # The operator image is required to test the configurator and sidecar.
      # In integration tests, the operator itself will be run from the go test process.
      - task: build:image
@@ -193,6 +201,7 @@ tasks:
     deps:
       - task: test:pull-images
       - task: build:image
+      - task: build:charts
        vars:
          CLI_ARGS: '' # Don't forward CLI args to build:image
     vars:
```

operator/cmd/run/run.go

Lines changed: 9 additions & 151 deletions
```diff
@@ -17,26 +17,18 @@ import (
     "fmt"
     "os"
     "path/filepath"
-    "slices"
     "strings"
     "time"
 
     "github.com/cockroachdb/errors"
     "github.com/spf13/cobra"
-    "github.com/spf13/pflag"
     helmkube "helm.sh/helm/v3/pkg/kube"
-    appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
-    "k8s.io/apimachinery/pkg/types"
     _ "k8s.io/client-go/plugin/pkg/client/auth"
-    "k8s.io/utils/ptr"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/cache"
     "sigs.k8s.io/controller-runtime/pkg/certwatcher"
-    "sigs.k8s.io/controller-runtime/pkg/client"
-    kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/healthz"
     "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
     metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -57,10 +49,10 @@ import (
     adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin"
     internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
     consolepkg "github.com/redpanda-data/redpanda-operator/operator/pkg/console"
-    pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
     "github.com/redpanda-data/redpanda-operator/operator/pkg/resources"
     pkgsecrets "github.com/redpanda-data/redpanda-operator/operator/pkg/secrets"
     redpandawebhooks "github.com/redpanda-data/redpanda-operator/operator/webhooks/redpanda"
+    "github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
 )
 
 type RedpandaController string
@@ -90,32 +82,6 @@ var availableControllers = []string{
     DecommissionController.toString(),
 }
 
-type LabelSelectorValue struct {
-    Selector labels.Selector
-}
-
-var _ pflag.Value = ((*LabelSelectorValue)(nil))
-
-func (s *LabelSelectorValue) Set(value string) error {
-    if value == "" {
-        return nil
-    }
-    var err error
-    s.Selector, err = labels.Parse(value)
-    return err
-}
-
-func (s *LabelSelectorValue) String() string {
-    if s.Selector == nil {
-        return ""
-    }
-    return s.Selector.String()
-}
-
-func (s *LabelSelectorValue) Type() string {
-    return "label selector"
-}
-
 // Metrics RBAC permissions
 // +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
 // +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
@@ -148,7 +114,7 @@ func Command() *cobra.Command {
         operatorMode     bool
         ghostbuster      bool
         unbindPVCsAfter  time.Duration
-        unbinderSelector LabelSelectorValue
+        unbinderSelector pflagutil.LabelSelectorValue
         allowPVRebinding bool
         autoDeletePVCs   bool
         webhookCertPath  string
@@ -295,21 +261,6 @@ func Command() *cobra.Command {
     return cmd
 }
 
-type v1Fetcher struct {
-    client kubeClient.Client
-}
-
-func (f *v1Fetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) {
-    var vectorizedCluster vectorizedv1alpha1.Cluster
-    if err := f.client.Get(ctx, types.NamespacedName{
-        Name:      name,
-        Namespace: namespace,
-    }, &vectorizedCluster); err != nil {
-        return nil, err
-    }
-    return &vectorizedCluster, nil
-}
-
 //nolint:funlen,gocyclo // length looks good
 func Run(
     ctx context.Context,
@@ -704,7 +655,11 @@ func Run(
     }
 
     if enableGhostBrokerDecommissioner {
-        d := decommissioning.NewStatefulSetDecommissioner(mgr, &v1Fetcher{client: mgr.GetClient()},
+        factory := internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient()).WithAdminClientTimeout(rpClientTimeout)
+        adapter := vectorizedDecommissionerAdapter{factory: factory, client: mgr.GetClient()}
+        d := decommissioning.NewStatefulSetDecommissioner(
+            mgr,
+            adapter.getAdminClient,
             decommissioning.WithSyncPeriod(ghostBrokerDecommissionerSyncPeriod),
             decommissioning.WithCleanupPVCs(false),
             // In Operator v1, decommissioning based on pod ordinal is not correct because
@@ -714,105 +669,8 @@ func Run(
             decommissioning.WithDecommisionOnTooHighOrdinal(false),
             // Operator v1 supports multiple NodePools, and therefore multiple STS.
             // This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
-            decommissioning.WithDesiredReplicasFetcher(func(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
-                // Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
-                idx := slices.IndexFunc(
-                    sts.OwnerReferences,
-                    func(ownerRef metav1.OwnerReference) bool {
-                        return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
-                    })
-                if idx == -1 {
-                    return 0, nil
-                }
-
-                var vectorizedCluster vectorizedv1alpha1.Cluster
-                if err := mgr.GetClient().Get(ctx, types.NamespacedName{
-                    Name:      sts.OwnerReferences[idx].Name,
-                    Namespace: sts.Namespace,
-                }, &vectorizedCluster); err != nil {
-                    return 0, fmt.Errorf("could not get Cluster: %w", err)
-                }
-
-                // We assume the cluster is fine and synced, checks have been performed in the filter already.
-
-                // Get all nodepool-sts for this Cluster
-                var stsList appsv1.StatefulSetList
-                err := mgr.GetClient().List(ctx, &stsList, &client.ListOptions{
-                    LabelSelector: pkglabels.ForCluster(&vectorizedCluster).AsClientSelector(),
-                })
-                if err != nil {
-                    return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
-                }
-
-                if len(stsList.Items) == 0 {
-                    return 0, errors.New("found 0 StatefulSets for this Cluster")
-                }
-
-                var allReplicas int32
-                for _, sts := range stsList.Items {
-                    allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
-                }
-
-                // Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
-                if allReplicas < 3 {
-                    return 0, fmt.Errorf("found %d desiredReplicas, but want >= 3", allReplicas)
-                }
-
-                if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas {
-                    return 0, fmt.Errorf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas)
-                }
-
-                return allReplicas, nil
-            }),
-            decommissioning.WithFactory(internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())),
-            decommissioning.WithFilter(func(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
-                log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter")
-                idx := slices.IndexFunc(
-                    sts.OwnerReferences,
-                    func(ownerRef metav1.OwnerReference) bool {
-                        return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
-                    })
-                if idx == -1 {
-                    return false, nil
-                }
-
-                var vectorizedCluster vectorizedv1alpha1.Cluster
-                if err := mgr.GetClient().Get(ctx, types.NamespacedName{
-                    Name:      sts.OwnerReferences[idx].Name,
-                    Namespace: sts.Namespace,
-                }, &vectorizedCluster); err != nil {
-                    return false, fmt.Errorf("could not get Cluster: %w", err)
-                }
-
-                managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed"
-                if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
-                    log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace)
-                    return false, nil
-                }
-
-                // Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
-                // (and we can therefore not use it to check if the cluster is synced otherwise)
-                if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas {
-                    log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
-                    return false, nil
-                }
-                if vectorizedCluster.Status.Restarting {
-                    log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
-                    return false, nil
-                }
-
-                if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation {
-                    log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration)
-                    return false, nil
-                }
-
-                if vectorizedCluster.Status.DecommissioningNode != nil {
-                    log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode)
-                    return false, nil
-                }
-
-                return true, nil
-            }),
+            decommissioning.WithDesiredReplicasFetcher(adapter.desiredReplicas),
+            decommissioning.WithFilter(adapter.filter),
         )
         if err := d.SetupWithManager(mgr); err != nil {
             setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner")
```
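The `vectorizedDecommissionerAdapter` used in the new wiring is defined elsewhere in this commit and is not visible in this hunk. Purely as an illustration of the replica-counting behaviour it carries over (the function name and signature below are assumptions; the logic mirrors the removed `WithDesiredReplicasFetcher` closure), summing desired replicas across all NodePool StatefulSets could look like this:

```go
package decommissionsketch

import (
	"context"
	"fmt"

	"github.com/cockroachdb/errors"
	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// sumDesiredReplicas mirrors the removed WithDesiredReplicasFetcher closure:
// Operator v1 supports multiple NodePools (and therefore multiple StatefulSets
// per Cluster), so the desired replica count is the sum over every StatefulSet
// selected by the Cluster's labels rather than a single STS's replicas. The
// selector argument stands in for pkglabels.ForCluster(...).AsClientSelector().
func sumDesiredReplicas(ctx context.Context, c client.Client, namespace string, selector labels.Selector) (int32, error) {
	var stsList appsv1.StatefulSetList
	if err := c.List(ctx, &stsList, &client.ListOptions{
		Namespace:     namespace,
		LabelSelector: selector,
	}); err != nil {
		return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
	}
	if len(stsList.Items) == 0 {
		return 0, errors.New("found 0 StatefulSets for this Cluster")
	}

	var allReplicas int32
	for _, sts := range stsList.Items {
		allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
	}

	// Refuse to run the ghost broker decommissioner against implausibly small
	// clusters; fewer than 3 desired replicas indicates something is wrong.
	if allReplicas < 3 {
		return 0, fmt.Errorf("found %d desiredReplicas, but want >= 3", allReplicas)
	}
	return allReplicas, nil
}
```

The real adapter also carries the admin-client factory (built via `internalclient.NewFactory(...).WithAdminClientTimeout(rpClientTimeout)` in the hunk above), replacing the removed `decommissioning.WithFactory` option and the deleted `v1Fetcher`.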
