Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions operator/cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlClient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -252,8 +253,15 @@ func Run(

adminAPIClientFactory := adminutils.CachedNodePoolAdminAPIClientFactory(adminutils.NewNodePoolInternalAdminAPI)

restConfig := mgr.GetConfig()
uncachedClient, err := ctrlClient.New(restConfig, ctrlClient.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
if err != nil {
return fmt.Errorf("failed to create uncached client: %w", err)
}

if err = (&vectorizedcontrollers.ClusterReconciler{
Client: mgr.GetClient(),
UncachedClient: uncachedClient,
Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"),
Scheme: mgr.GetScheme(),
AdminAPIClientFactory: adminutils.NewNodePoolInternalAdminAPI,
Expand Down
35 changes: 22 additions & 13 deletions operator/internal/controller/vectorized/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ var (
// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
client.Client
UncachedClient client.Client
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Expand Down Expand Up @@ -124,17 +125,25 @@ func (r *ClusterReconciler) Reconcile(

var vectorizedCluster vectorizedv1alpha1.Cluster
ar := newAttachedResources(ctx, r, log, &vectorizedCluster)
if err := r.Get(ctx, req.NamespacedName, &vectorizedCluster); err != nil {
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
if apierrors.IsNotFound(err) {
if removeError := ar.getClusterRoleBinding().RemoveSubject(ctx, req.NamespacedName); removeError != nil {
return ctrl.Result{}, fmt.Errorf("unable to remove subject in ClusterroleBinding: %w", removeError)
{
// Perform Get with uncached client, to avoid cache races, where we don't
// see previous changes performed via Patch/Update.
getClient := r.Client
if r.UncachedClient != nil {
getClient = r.UncachedClient
}
if err := getClient.Get(ctx, req.NamespacedName, &vectorizedCluster); err != nil {
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
if apierrors.IsNotFound(err) {
if removeError := ar.getClusterRoleBinding().RemoveSubject(ctx, req.NamespacedName); removeError != nil {
return ctrl.Result{}, fmt.Errorf("unable to remove subject in ClusterroleBinding: %w", removeError)
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
return ctrl.Result{}, fmt.Errorf("unable to retrieve Cluster resource: %w", err)
}
return ctrl.Result{}, fmt.Errorf("unable to retrieve Cluster resource: %w", err)
}

// After every reconciliation, update status:
Expand Down Expand Up @@ -233,11 +242,11 @@ func (r *ClusterReconciler) Reconcile(
}

result, errs := ar.Ensure()
if !result.IsZero() && errs == nil {
return result, nil
}
if errs != nil {
return result, errs
return ctrl.Result{}, errs
}
if !result.IsZero() {
return result, nil
}

adminAPI, err := r.AdminAPIClientFactory(ctx, r.Client, &vectorizedCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,15 @@ func (a *attachedResources) Ensure() (ctrl.Result, error) {
errs = errors.Join(errs, err)
}
}
return result, errs
// Controller-runtime does not allow returning a result and an error at the same time.
// We choose to prioritize the error - if there is an error, this is more important
// to us to return than the reconcile.
// Downstream functions are expected to only return errors, if there is an
// actual error condition, not generally to cause a requeue (must use result for this purpose).
if errs != nil {
return ctrl.Result{}, errs
}
return result, nil
}

func (a *attachedResources) bootstrapService() {
Expand Down
12 changes: 8 additions & 4 deletions operator/pkg/resources/statefulset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,12 @@ func (r *StatefulSetResource) handleDecommissionInProgress(ctx context.Context,

// Decom Pod is from another NodePool. Super important to early exit here, as it's not our business (in this StatefulSet handler).
if brokerPod == nil {
log.Info("decom on other in progress")
return fmt.Errorf("decommission on other NodePool in progress, can't handle decom for this one yet")
log.Info("decom on other NodePool in progress. asking for requeue so this nodepool can get reconciled afterwards.")
return &RequeueAfterError{
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: "decommission on other NodePool in progress, can't handle decom for this one yet",
}

}

if !r.nodePool.Deleted && *r.nodePool.Replicas >= r.pandaCluster.Status.NodePools[r.nodePool.Name].CurrentReplicas {
Expand Down Expand Up @@ -287,12 +291,12 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context, l logr.Log
if err != nil {
return err
}
cluster.Status.DecommissioningNode = nil
cluster.SetDecommissionBrokerID(nil)
err = r.Status().Update(ctx, cluster)
if err == nil {
log.Info("Cleared decomm broker ID from status")
// sync original cluster variable to avoid conflicts on subsequent operations
r.pandaCluster.Status = cluster.Status
r.pandaCluster.SetDecommissionBrokerID(nil)
}
return err
})
Expand Down
15 changes: 8 additions & 7 deletions operator/tests/e2e/decommission/05-assert.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
commands:
- timeout: 300
script: |
kubectl wait --for=condition=ClusterConfigured=True cluster/decommission --timeout 300s --namespace $NAMESPACE
kubectl wait --for=condition=OperatorQuiescent=True cluster/decommission --timeout 300s --namespace $NAMESPACE
---
apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
Expand All @@ -8,12 +16,5 @@ status:
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
commands:
- timeout: 300
script: |
kubectl wait --for=condition=ClusterConfigured=True cluster/decommission --timeout 300s --namespace $NAMESPACE
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
collectors:
- command: ../../../hack/get-redpanda-info.sh