Commit c685e83

operator: various sidecar fixes
Prior to this commit, the operator sidecar's decommissioner and pvcunbinder controllers did not work. This was due to:

- RBAC issues: the sidecar did not correctly scope itself to a single namespace.
- Incorrect label selectors hidden within the controllers in question.

Additionally, the statefulset decommissioner's sole test case has been disabled for quite some time, leaving this functionality with zero test coverage.

This commit:

- Restores the decommissioner's tests to a working state.
- Strips out the "fetcher" to reduce duplication and remove the reliance on fetching live helm values.
- Replaces baked-in filtering with a label selector argument that will be constructed by the helm chart.

A follow-up commit with chart changes and acceptance tests will be submitted. It has been kept separate to ease backporting to the v2.x.x branches.

(cherry picked from commit 03dd394)

# Conflicts:
#	operator/cmd/run/run.go
#	operator/go.mod
#	pkg/go.mod
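
As context for the label-selector argument described above: this commit moves the LabelSelectorValue pflag.Value implementation out of run.go and into the shared pkg/pflagutil package (see the run.go diff below). The following is a minimal, self-contained sketch of how a chart-constructed selector could be registered and parsed; the flag name, the type name selectorValue, and the example selector are illustrative assumptions, not taken from this commit.

// Sketch only: mirrors the LabelSelectorValue type removed from run.go
// (now provided by pkg/pflagutil); flag name and selector value are hypothetical.
package main

import (
	"fmt"
	"os"

	"github.com/spf13/pflag"
	"k8s.io/apimachinery/pkg/labels"
)

// selectorValue parses a Kubernetes label selector string into a
// labels.Selector via pflag's Value interface.
type selectorValue struct {
	Selector labels.Selector
}

var _ pflag.Value = (*selectorValue)(nil)

func (s *selectorValue) Set(value string) error {
	if value == "" {
		return nil
	}
	var err error
	s.Selector, err = labels.Parse(value)
	return err
}

func (s *selectorValue) String() string {
	if s.Selector == nil {
		return ""
	}
	return s.Selector.String()
}

func (s *selectorValue) Type() string { return "label selector" }

func main() {
	var sel selectorValue
	fs := pflag.NewFlagSet("sidecar", pflag.ExitOnError)
	// Hypothetical flag name; the chart would render a selector such as
	// "app.kubernetes.io/instance=<release>,app.kubernetes.io/name=redpanda".
	fs.Var(&sel, "unbinder-label-selector", "label selector for objects the sidecar may act on")
	if err := fs.Parse(os.Args[1:]); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	if sel.Selector != nil {
		fmt.Println("acting only on objects matching:", sel.Selector.String())
	}
}
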
1 parent b61a93a commit c685e83

15 files changed: +389 -636 lines

Taskfile.yml

Lines changed: 8 additions & 0 deletions
@@ -146,6 +146,12 @@ tasks:
     vars:
       CLI_ARGS: '--load {{.CLI_ARGS}}'
 
+  build:charts:
+    desc: "Run helm dep build for all charts"
+    cmds:
+      - helm repo add redpanda https://charts.redpanda.com
+      - helm dep build ./charts/redpanda
+
   test:unit:
     desc: "Run all unit tests (~5m)"
     vars:
@@ -161,6 +167,7 @@ tasks:
       # The operator image is required to test the configurator and sidecar.
       # In integration tests, the operator itself will be run from the go test process.
       - build:image
+      - build:charts
     cmds:
       - task: charts:kind-cluster
       - kind load docker-image localhost/redpanda-operator:dev
@@ -174,6 +181,7 @@
     desc: "Run all acceptance tests (~90m)"
     deps:
       - build:image
+      - build:charts
     vars:
       GO_TEST_RUNNER: '{{default "go test" .GO_TEST_RUNNER}}'
      CLI_ARGS: '{{.CLI_ARGS}} -tags=acceptance -run "^TestAcceptance" -timeout 20m -v'

operator/cmd/run/run.go

Lines changed: 11 additions & 153 deletions
@@ -17,27 +17,19 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"slices"
 	"strings"
 	"time"
 
 	"github.com/cockroachdb/errors"
 	fluxclient "github.com/fluxcd/pkg/runtime/client"
 	"github.com/spf13/cobra"
-	"github.com/spf13/pflag"
 	helmkube "helm.sh/helm/v3/pkg/kube"
-	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/types"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
-	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/certwatcher"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-	kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
 	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -58,10 +50,10 @@ import (
 	adminutils "github.com/redpanda-data/redpanda-operator/operator/pkg/admin"
 	internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
 	consolepkg "github.com/redpanda-data/redpanda-operator/operator/pkg/console"
-	pkglabels "github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
 	"github.com/redpanda-data/redpanda-operator/operator/pkg/resources"
 	pkgsecrets "github.com/redpanda-data/redpanda-operator/operator/pkg/secrets"
 	redpandawebhooks "github.com/redpanda-data/redpanda-operator/operator/webhooks/redpanda"
+	"github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
 )
 
 type RedpandaController string
@@ -99,32 +91,6 @@ var (
 	}
 )
 
-type LabelSelectorValue struct {
-	Selector labels.Selector
-}
-
-var _ pflag.Value = ((*LabelSelectorValue)(nil))
-
-func (s *LabelSelectorValue) Set(value string) error {
-	if value == "" {
-		return nil
-	}
-	var err error
-	s.Selector, err = labels.Parse(value)
-	return err
-}
-
-func (s *LabelSelectorValue) String() string {
-	if s.Selector == nil {
-		return ""
-	}
-	return s.Selector.String()
-}
-
-func (s *LabelSelectorValue) Type() string {
-	return "label selector"
-}
-
 // Metrics RBAC permissions
 // +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
 // +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
@@ -157,7 +123,7 @@ func Command() *cobra.Command {
 		enableHelmControllers bool
 		ghostbuster           bool
 		unbindPVCsAfter       time.Duration
-		unbinderSelector      LabelSelectorValue
+		unbinderSelector      pflagutil.LabelSelectorValue
 		allowPVRebinding      bool
 		autoDeletePVCs        bool
 		forceDefluxedMode     bool
@@ -313,21 +279,6 @@ func Command() *cobra.Command {
 	return cmd
 }
 
-type v1Fetcher struct {
-	client kubeClient.Client
-}
-
-func (f *v1Fetcher) FetchLatest(ctx context.Context, name, namespace string) (any, error) {
-	var vectorizedCluster vectorizedv1alpha1.Cluster
-	if err := f.client.Get(ctx, types.NamespacedName{
-		Name:      name,
-		Namespace: namespace,
-	}, &vectorizedCluster); err != nil {
-		return nil, err
-	}
-	return &vectorizedCluster, nil
-}
-
 //nolint:funlen,gocyclo // length looks good
 func Run(
 	ctx context.Context,
@@ -723,115 +674,22 @@ func Run(
 	}
 
 	if enableGhostBrokerDecommissioner {
-		d := decommissioning.NewStatefulSetDecommissioner(mgr, &v1Fetcher{client: mgr.GetClient()},
+		factory := internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())
+		adapter := vectorizedDecommissionerAdapter{factory: factory, client: mgr.GetClient()}
+		d := decommissioning.NewStatefulSetDecommissioner(
+			mgr,
+			adapter.getAdminClient,
+			decommissioning.WithFilter(adapter.filter),
+			// Operator v1 supports multiple NodePools, and therefore multiple STS.
+			// This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
+			decommissioning.WithDesiredReplicasFetcher(adapter.desiredReplicas),
 			decommissioning.WithSyncPeriod(ghostBrokerDecommissionerSyncPeriod),
 			decommissioning.WithCleanupPVCs(false),
 			// In Operator v1, decommissioning based on pod ordinal is not correct because
 			// it has controller code that manages decommissioning. If something else decommissions the node, it can not deal with this under all circumstances because of various reasons, eg. bercause of a protection against stale status reads of status.currentReplicas
 			// (http://github.com/redpanda-data/redpanda-operator/blob/main/operator/pkg/resources/statefulset_scale.go#L139)
 			// In addition to this situation where it can not (always) recover, it is just not desired that it interferes with graceful, "standard" decommissions (at least, in Operator v1 mode)
 			decommissioning.WithDecommisionOnTooHighOrdinal(false),
-			// Operator v1 supports multiple NodePools, and therefore multiple STS.
-			// This function provides a custom replica count: the desired replicas of all STS, instead of a single STS.
-			decommissioning.WithDesiredReplicasFetcher(func(ctx context.Context, sts *appsv1.StatefulSet) (int32, error) {
-				// Get Cluster CR, so we can then find its StatefulSets for a full count of desired replicas.
-				idx := slices.IndexFunc(
-					sts.OwnerReferences,
-					func(ownerRef metav1.OwnerReference) bool {
-						return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
-					})
-				if idx == -1 {
-					return 0, nil
-				}
-
-				var vectorizedCluster vectorizedv1alpha1.Cluster
-				if err := mgr.GetClient().Get(ctx, types.NamespacedName{
-					Name:      sts.OwnerReferences[idx].Name,
-					Namespace: sts.Namespace,
-				}, &vectorizedCluster); err != nil {
-					return 0, fmt.Errorf("could not get Cluster: %w", err)
-				}
-
-				// We assume the cluster is fine and synced, checks have been performed in the filter already.
-
-				// Get all nodepool-sts for this Cluster
-				var stsList appsv1.StatefulSetList
-				err := mgr.GetClient().List(ctx, &stsList, &client.ListOptions{
-					LabelSelector: pkglabels.ForCluster(&vectorizedCluster).AsClientSelector(),
-				})
-				if err != nil {
-					return 0, fmt.Errorf("failed to list statefulsets of Cluster: %w", err)
-				}
-
-				if len(stsList.Items) == 0 {
-					return 0, errors.New("found 0 StatefulSets for this Cluster")
-				}
-
-				var allReplicas int32
-				for _, sts := range stsList.Items {
-					allReplicas += ptr.Deref(sts.Spec.Replicas, 0)
-				}
-
-				// Should not happen, but if it actually happens, we don't want to run ghost broker decommissioner.
-				if allReplicas < 3 {
-					return 0, fmt.Errorf("found %d desiredReplicas, but want >= 3", allReplicas)
-				}
-
-				if allReplicas != vectorizedCluster.Status.CurrentReplicas || allReplicas != vectorizedCluster.Status.Replicas {
-					return 0, fmt.Errorf("replicas not synced. status.currentReplicas=%d,status.replicas=%d,allReplicas=%d", vectorizedCluster.Status.CurrentReplicas, vectorizedCluster.Status.Replicas, allReplicas)
-				}
-
-				return allReplicas, nil
-			}),
-			decommissioning.WithFactory(internalclient.NewFactory(mgr.GetConfig(), mgr.GetClient())),
-			decommissioning.WithFilter(func(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
-				log := ctrl.LoggerFrom(ctx, "namespace", sts.Namespace).WithName("StatefulSetDecomissioner.Filter")
-				idx := slices.IndexFunc(
-					sts.OwnerReferences,
-					func(ownerRef metav1.OwnerReference) bool {
-						return ownerRef.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ownerRef.Kind == "Cluster"
-					})
-				if idx == -1 {
-					return false, nil
-				}
-
-				var vectorizedCluster vectorizedv1alpha1.Cluster
-				if err := mgr.GetClient().Get(ctx, types.NamespacedName{
-					Name:      sts.OwnerReferences[idx].Name,
-					Namespace: sts.Namespace,
-				}, &vectorizedCluster); err != nil {
-					return false, fmt.Errorf("could not get Cluster: %w", err)
-				}
-
-				managedAnnotationKey := vectorizedv1alpha1.GroupVersion.Group + "/managed"
-				if managed, exists := vectorizedCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
-					log.V(1).Info("ignoring StatefulSet of unmanaged V1 Cluster", "sts", sts.Name, "namespace", sts.Namespace)
-					return false, nil
-				}
-
-				// Do some "manual" checks, as ClusterlQuiescent condition is always false if a ghost broker causes unhealthy cluster
-				// (and we can therefore not use it to check if the cluster is synced otherwise)
-				if vectorizedCluster.Status.CurrentReplicas != vectorizedCluster.Status.Replicas {
-					log.V(1).Info("replicas are not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
-					return false, nil
-				}
-				if vectorizedCluster.Status.Restarting {
-					log.V(1).Info("cluster is restarting", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace)
-					return false, nil
-				}
-
-				if vectorizedCluster.Status.ObservedGeneration != vectorizedCluster.Generation {
-					log.V(1).Info("generation not synced", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "generation", vectorizedCluster.Generation, "observedGeneration", vectorizedCluster.Status.ObservedGeneration)
-					return false, nil
-				}
-
-				if vectorizedCluster.Status.DecommissioningNode != nil {
-					log.V(1).Info("decommission in progress", "cluster", vectorizedCluster.Name, "namespace", vectorizedCluster.Namespace, "node", *vectorizedCluster.Status.DecommissioningNode)
-					return false, nil
-				}
-
-				return true, nil
-			}),
 		)
 		if err := d.SetupWithManager(mgr); err != nil {
 			setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner")
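
For reference, the WithFilter and WithDesiredReplicasFetcher closures removed above now live behind a vectorizedDecommissionerAdapter whose definition is not part of this hunk. Below is a rough, condensed sketch of the filter logic in standalone form; the function name, the explicit client parameter, and the vectorizedv1alpha1 import path are illustrative assumptions, not the adapter's actual code.

// Sketch only: condenses the checks from the removed WithFilter closure.
package sketch

import (
	"context"
	"slices"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	kubeclient "sigs.k8s.io/controller-runtime/pkg/client"

	// Import path assumed; use the repository's actual vectorized v1alpha1 package.
	vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
)

// filterStatefulSet reports whether the ghost-broker decommissioner should act
// on the given StatefulSet: it must be owned by a managed V1 Cluster whose
// status is fully synced and which is neither restarting nor already
// decommissioning a node.
func filterStatefulSet(ctx context.Context, c kubeclient.Client, sts *appsv1.StatefulSet) (bool, error) {
	// Find the owning V1 Cluster, if any.
	idx := slices.IndexFunc(sts.OwnerReferences, func(ref metav1.OwnerReference) bool {
		return ref.APIVersion == vectorizedv1alpha1.GroupVersion.String() && ref.Kind == "Cluster"
	})
	if idx == -1 {
		return false, nil
	}

	var cluster vectorizedv1alpha1.Cluster
	key := types.NamespacedName{Name: sts.OwnerReferences[idx].Name, Namespace: sts.Namespace}
	if err := c.Get(ctx, key, &cluster); err != nil {
		return false, err
	}

	// Skip Clusters explicitly marked as unmanaged.
	if cluster.Annotations[vectorizedv1alpha1.GroupVersion.Group+"/managed"] == "false" {
		return false, nil
	}

	// Manual quiescence checks, mirroring the removed closure: replicas synced,
	// generation observed, not restarting, and no decommission already in flight.
	quiescent := cluster.Status.CurrentReplicas == cluster.Status.Replicas &&
		cluster.Status.ObservedGeneration == cluster.Generation &&
		!cluster.Status.Restarting &&
		cluster.Status.DecommissioningNode == nil
	return quiescent, nil
}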
