Skip to content

Commit d9aa68e

Browse files
committed
Do not decommission NodesDown that are starting
During node decommission, the Redpanda health response can return nodes that are starting, and they are included in the NodesDown part of the response. The logic should check all node IDs in each Redpanda cluster to filter out nodes that may still be starting and should not be decommissioned.
1 parent 6c288ea commit d9aa68e

File tree

1 file changed

+43
-23
lines changed

1 file changed

+43
-23
lines changed

src/go/k8s/internal/controller/redpanda/redpanda_decommission_controller.go

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (r *DecommissionReconciler) Reconcile(c context.Context, req ctrl.Request)
122122
case corev1.ConditionTrue:
123123
// condition updated to true, so we proceed to decommission
124124
log.Info("decommission started")
125-
result, err = r.reconcileDecommission(ctx, sts)
125+
result, err = r.reconcileDecommission(ctx, log, sts)
126126
}
127127

128128
// Decommission status must be reconciled constantly
@@ -222,7 +222,7 @@ func (r *DecommissionReconciler) verifyIfNeedDecommission(ctx context.Context, s
222222
}
223223
}
224224

225-
adminAPI, err := buildAdminAPI(releaseName, namespace, requestedReplicas, valuesMap)
225+
adminAPI, err := buildAdminAPI(releaseName, namespace, requestedReplicas, nil, valuesMap)
226226
if err != nil {
227227
return ctrl.Result{}, fmt.Errorf("could not reconcile, error creating adminapi: %w", err)
228228
}
@@ -287,15 +287,16 @@ func (r *DecommissionReconciler) verifyIfNeedDecommission(ctx context.Context, s
287287
// 11. Finally, reset condition state to unknown if we have been successful so far.
288288
//
289289
//nolint:funlen // length looks good
290-
func (r *DecommissionReconciler) reconcileDecommission(ctx context.Context, sts *appsv1.StatefulSet) (ctrl.Result, error) {
291-
log := ctrl.LoggerFrom(ctx).WithName("DecommissionReconciler.reconcileDecommission")
292-
Infof(log, "reconciling: %s/%s", sts.Namespace, sts.Name)
290+
func (r *DecommissionReconciler) reconcileDecommission(ctx context.Context, log logr.Logger, sts *appsv1.StatefulSet) (ctrl.Result, error) {
291+
log = log.WithName("reconcileDecommission")
292+
293+
log.Info("reconciling", "statefulset-namespace", sts.Namespace, "statefulset-name", sts.Name)
293294

294295
namespace := sts.Namespace
295296

296297
releaseName, ok := sts.Labels[K8sInstanceLabelKey]
297298
if !ok {
298-
log.Info("could not find instance label to retrieve releaseName")
299+
log.Info("could not find instance label to retrieve releaseName", "label", K8sInstanceLabelKey)
299300
return ctrl.Result{}, nil
300301
}
301302

@@ -306,7 +307,7 @@ func (r *DecommissionReconciler) reconcileDecommission(ctx context.Context, sts
306307
// we have started decommission, but we want to requeue if we have not transitioned here. This should
307308
// avoid decommissioning the wrong node (broker) id
308309
if statusReplicas != requestedReplicas && sts.Status.UpdatedReplicas == 0 {
309-
log.Info("have not finished terminating and restarted largest ordinal, requeue here")
310+
log.Info("have not finished terminating and restarted largest ordinal, requeue here", "statusReplicas", statusReplicas, "availableReplicas", availableReplicas)
310311
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, nil
311312
}
312313

@@ -322,7 +323,7 @@ func (r *DecommissionReconciler) reconcileDecommission(ctx context.Context, sts
322323
return ctrl.Result{}, fmt.Errorf("could not retrieve values, probably not a valid managed helm release: %w", err)
323324
}
324325

325-
adminAPI, err := buildAdminAPI(releaseName, namespace, requestedReplicas, valuesMap)
326+
adminAPI, err := buildAdminAPI(releaseName, namespace, requestedReplicas, nil, valuesMap)
326327
if err != nil {
327328
return ctrl.Result{}, fmt.Errorf("could not reconcile, error creating adminAPI: %w", err)
328329
}
@@ -353,34 +354,49 @@ func (r *DecommissionReconciler) reconcileDecommission(ctx context.Context, sts
353354
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, nil
354355
}
355356

357+
nodesDownMap := map[int]any{}
358+
for _, node := range health.NodesDown {
359+
nodesDownMap[node] = struct{}{}
360+
}
361+
356362
// perform decommission on down nodes but only if down nodes match count of all-nodes-replicas
357363
// the greater case takes care of the situation where we may also have additional ids here.
358364
if len(health.NodesDown) >= (len(health.AllNodes) - int(requestedReplicas)) {
359-
// TODO guard against intermittent situations where a node is coming up after it being brought down
360-
// how do we get a signal of this, it would be easy if we can compare previous situation
361-
for i := range health.NodesDown {
362-
item := health.NodesDown[i]
365+
for podOrdinal := 0; podOrdinal < int(requestedReplicas); podOrdinal++ {
366+
singleNodeAdminAPI, buildErr := buildAdminAPI(releaseName, namespace, requestedReplicas, &podOrdinal, valuesMap)
367+
if buildErr != nil {
368+
log.Error(buildErr, "creating single node AdminAPI", "pod-ordinal", podOrdinal)
369+
continue
370+
}
371+
nodeCfg, nodeErr := singleNodeAdminAPI.GetNodeConfig(ctx)
372+
if nodeErr != nil {
373+
log.Error(nodeErr, "getting node configuration", "pod-ordinal", podOrdinal)
374+
return ctrl.Result{}, fmt.Errorf("getting node configuration from pod (%d): %w", podOrdinal, nodeErr)
375+
}
376+
delete(nodesDownMap, nodeCfg.NodeID)
377+
}
363378

379+
for nodeID := range nodesDownMap {
364380
// Now we check the decommission status before continuing
365381
doDecommission := false
366-
status, decommStatusError := adminAPI.DecommissionBrokerStatus(ctx, item)
382+
status, decommStatusError := adminAPI.DecommissionBrokerStatus(ctx, nodeID)
367383
if decommStatusError != nil {
368-
Infof(log, "error found for decommission status: %s", decommStatusError.Error())
384+
log.Info("decommission status returned error", "decommStatusError", decommStatusError)
369385
// nolint:gocritic // no need for a switch, this is ok
370386
if strings.Contains(decommStatusError.Error(), "is not decommissioning") {
371387
doDecommission = true
372388
} else if strings.Contains(decommStatusError.Error(), "does not exists") {
373-
Infof(log, "nodeID %d does not exist, skipping: %s", item, decommStatusError.Error())
389+
log.Info("nodeID does not exist, skipping", "nodeID", nodeID, "decommStatusError", decommStatusError)
374390
} else {
375391
errList = errors.Join(errList, fmt.Errorf("could not get decommission status of broker: %w", decommStatusError))
376392
}
377393
}
378-
Debugf(log, "decommission status: %v", status)
394+
log.V(logger.DebugLevel).Info("decommission status", "status", status)
379395

380396
if doDecommission {
381-
Infof(log, "all checks pass, attempting to decommission: %d", item)
397+
log.Info("all checks pass, attempting to decommission", "nodeID", nodeID)
382398
// we want a clear signal to avoid 400s here, the suspicion here is an invalid transitional state
383-
decomErr := adminAPI.DecommissionBroker(ctx, item)
399+
decomErr := adminAPI.DecommissionBroker(ctx, nodeID)
384400
if decomErr != nil && !strings.Contains(decomErr.Error(), "failed: Not Found") && !strings.Contains(decomErr.Error(), "failed: Bad Request") {
385401
errList = errors.Join(errList, fmt.Errorf("could not decommission broker: %w", decomErr))
386402
}
@@ -563,7 +579,7 @@ func isNameInList(name string, keys []string) bool {
563579
return false
564580
}
565581

566-
func buildAdminAPI(releaseName, namespace string, replicas int32, values map[string]interface{}) (*admin.AdminAPI, error) {
582+
func buildAdminAPI(releaseName, namespace string, replicas int32, podOrdinal *int, values map[string]interface{}) (*admin.AdminAPI, error) {
567583
tlsEnabled, ok, err := unstructured.NestedBool(values, "tls", "enabled")
568584
if !ok || err != nil {
569585
// probably not a correct helm release, bail
@@ -586,7 +602,7 @@ func buildAdminAPI(releaseName, namespace string, replicas int32, values map[str
586602
tlsConfig = &tls.Config{InsecureSkipVerify: true}
587603
}
588604

589-
urls, err := createBrokerURLs(releaseName, namespace, replicas, values)
605+
urls, err := createBrokerURLs(releaseName, namespace, replicas, podOrdinal, values)
590606
if err != nil {
591607
return nil, fmt.Errorf("could not create broker url: %w", err)
592608
}
@@ -595,7 +611,7 @@ func buildAdminAPI(releaseName, namespace string, replicas int32, values map[str
595611
return admin.NewAdminAPI(urls, admin.BasicCredentials{}, tlsConfig)
596612
}
597613

598-
func createBrokerURLs(release, namespace string, replicas int32, values map[string]interface{}) ([]string, error) {
614+
func createBrokerURLs(release, namespace string, replicas int32, ordinal *int, values map[string]interface{}) ([]string, error) {
599615
brokerList := make([]string, 0)
600616

601617
fullnameOverride, ok, err := unstructured.NestedString(values, "fullnameOverride")
@@ -621,8 +637,12 @@ func createBrokerURLs(release, namespace string, replicas int32, values map[stri
621637
return brokerList, fmt.Errorf("could not retrieve clusterDomain: %s; error: %w", domain, err)
622638
}
623639

624-
for i := 0; i < int(replicas); i++ {
625-
brokerList = append(brokerList, fmt.Sprintf("%s-%d.%s.%s.svc.%s:%d", release, i, serviceName, namespace, domain, port))
640+
if ordinal == nil {
641+
for i := 0; i < int(replicas); i++ {
642+
brokerList = append(brokerList, fmt.Sprintf("%s-%d.%s.%s.svc.%s:%d", release, i, serviceName, namespace, domain, port))
643+
}
644+
} else {
645+
brokerList = append(brokerList, fmt.Sprintf("%s-%d.%s.%s.svc.%s:%d", release, *ordinal, serviceName, namespace, domain, port))
626646
}
627647

628648
return brokerList, nil

0 commit comments

Comments
 (0)