Skip to content

Commit 0bc1b40

Browse files
committed
operator v1: do not decom broker if it's missing in RP
avoid setting decommissionBrokerID for an already-decommissioned-broker by also checking in redpanda if that broker exists.
1 parent 585a918 commit 0bc1b40

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

operator/pkg/resources/statefulset_scale.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,23 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
138138
podName := fmt.Sprintf("%s-%d", r.LastObservedState.Name, targetOrdinal)
139139
log.WithValues("pod", podName, "ordinal", targetOrdinal, "node_id", targetBroker).Info("start decommission broker")
140140

141+
// Make sure the broker we want to decommission actually exists in redpanda itself.
142+
// If not, error out, it is not a supported condition, and could be result of a stale read of status.nodePools[*].currentReplicas.
143+
adminClient, err := r.getAdminAPIClient(ctx)
144+
if err != nil {
145+
return fmt.Errorf("failed to create admin API client: %w", err)
146+
}
147+
broker, err := getBrokerByBrokerID(ctx, *targetBroker, adminClient)
148+
if err != nil {
149+
return fmt.Errorf("failed to get broker by broker ID: %w", err)
150+
}
151+
if broker == nil {
152+
return &RequeueAfterError{
153+
RequeueAfter: RequeueDuration,
154+
Msg: fmt.Sprintf("attempting to decommission broker %d, but redpanda reports that it does not exist", *targetBroker),
155+
}
156+
}
157+
141158
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
142159
cluster := &vectorizedv1alpha1.Cluster{}
143160
err := r.Get(ctx, types.NamespacedName{

0 commit comments

Comments
 (0)