@@ -122,7 +122,7 @@ func (r *DecommissionReconciler) Reconcile(c context.Context, req ctrl.Request)
122122 case corev1 .ConditionTrue :
123123 // condition updated to true, so we proceed to decommission
124124 log .Info ("decommission started" )
125- result , err = r .reconcileDecommission (ctx , sts )
125+ result , err = r .reconcileDecommission (ctx , log , sts )
126126 }
127127
128128 // Decommission status must be reconciled constantly
@@ -222,7 +222,7 @@ func (r *DecommissionReconciler) verifyIfNeedDecommission(ctx context.Context, s
222222 }
223223 }
224224
225- adminAPI , err := buildAdminAPI (releaseName , namespace , requestedReplicas , valuesMap )
225+ adminAPI , err := buildAdminAPI (releaseName , namespace , requestedReplicas , nil , valuesMap )
226226 if err != nil {
227227 return ctrl.Result {}, fmt .Errorf ("could not reconcile, error creating adminapi: %w" , err )
228228 }
@@ -287,15 +287,16 @@ func (r *DecommissionReconciler) verifyIfNeedDecommission(ctx context.Context, s
287287// 11. Finally, reset condition state to unknown if we have been successful so far.
288288//
289289//nolint:funlen // length looks good
290- func (r * DecommissionReconciler ) reconcileDecommission (ctx context.Context , sts * appsv1.StatefulSet ) (ctrl.Result , error ) {
291- log := ctrl .LoggerFrom (ctx ).WithName ("DecommissionReconciler.reconcileDecommission" )
292- Infof (log , "reconciling: %s/%s" , sts .Namespace , sts .Name )
290+ func (r * DecommissionReconciler ) reconcileDecommission (ctx context.Context , log logr.Logger , sts * appsv1.StatefulSet ) (ctrl.Result , error ) {
291+ log = log .WithName ("reconcileDecommission" )
292+
293+ log .Info ("reconciling" , "statefulset-namespace" , sts .Namespace , "statefulset-name" , sts .Name )
293294
294295 namespace := sts .Namespace
295296
296297 releaseName , ok := sts .Labels [K8sInstanceLabelKey ]
297298 if ! ok {
298- log .Info ("could not find instance label to retrieve releaseName" )
299+ log .Info ("could not find instance label to retrieve releaseName" , "label" , K8sInstanceLabelKey )
299300 return ctrl.Result {}, nil
300301 }
301302
@@ -306,7 +307,7 @@ func (r *DecommissionReconciler) reconcileDecommission(ctx context.Context, sts
306307 // we have started decommission, but we want to requeue if we have not transitioned here. This should
307308 // avoid decommissioning the wrong node (broker) id
308309 if statusReplicas != requestedReplicas && sts .Status .UpdatedReplicas == 0 {
309- log .Info ("have not finished terminating and restarted largest ordinal, requeue here" )
310+ log .Info ("have not finished terminating and restarted largest ordinal, requeue here" , "statusReplicas" , statusReplicas , "availableReplicas" , availableReplicas )
310311 return ctrl.Result {Requeue : true , RequeueAfter : 10 * time .Second }, nil
311312 }
312313
@@ -322,7 +323,7 @@ func (r *DecommissionReconciler) reconcileDecommission(ctx context.Context, sts
322323 return ctrl.Result {}, fmt .Errorf ("could not retrieve values, probably not a valid managed helm release: %w" , err )
323324 }
324325
325- adminAPI , err := buildAdminAPI (releaseName , namespace , requestedReplicas , valuesMap )
326+ adminAPI , err := buildAdminAPI (releaseName , namespace , requestedReplicas , nil , valuesMap )
326327 if err != nil {
327328 return ctrl.Result {}, fmt .Errorf ("could not reconcile, error creating adminAPI: %w" , err )
328329 }
@@ -353,34 +354,49 @@ func (r *DecommissionReconciler) reconcileDecommission(ctx context.Context, sts
353354 return ctrl.Result {Requeue : true , RequeueAfter : 10 * time .Second }, nil
354355 }
355356
357+ nodesDownMap := map [int ]any {}
358+ for _ , node := range health .NodesDown {
359+ nodesDownMap [node ] = struct {}{}
360+ }
361+
356362 // perform decommission on down nodes but only if down nodes match count of all-nodes-replicas
357363 // the greater case takes care of the situation where we may also have additional ids here.
358364 if len (health .NodesDown ) >= (len (health .AllNodes ) - int (requestedReplicas )) {
359- // TODO guard against intermittent situations where a node is coming up after it being brought down
360- // how do we get a signal of this, it would be easy if we can compare previous situation
361- for i := range health .NodesDown {
362- item := health .NodesDown [i ]
365+ for podOrdinal := 0 ; podOrdinal < int (requestedReplicas ); podOrdinal ++ {
366+ singleNodeAdminAPI , buildErr := buildAdminAPI (releaseName , namespace , requestedReplicas , & podOrdinal , valuesMap )
367+ if buildErr != nil {
368+ log .Error (buildErr , "creating single node AdminAPI" , "pod-ordinal" , podOrdinal )
369+ continue
370+ }
371+ nodeCfg , nodeErr := singleNodeAdminAPI .GetNodeConfig (ctx )
372+ if nodeErr != nil {
373+ log .Error (nodeErr , "getting node configuration" , "pod-ordinal" , podOrdinal )
374+ return ctrl.Result {}, fmt .Errorf ("getting node configuration from pod (%d): %w" , podOrdinal , nodeErr )
375+ }
376+ delete (nodesDownMap , nodeCfg .NodeID )
377+ }
363378
379+ for nodeID := range nodesDownMap {
364380 // Now we check the decommission status before continuing
365381 doDecommission := false
366- status , decommStatusError := adminAPI .DecommissionBrokerStatus (ctx , item )
382+ status , decommStatusError := adminAPI .DecommissionBrokerStatus (ctx , nodeID )
367383 if decommStatusError != nil {
368- Infof ( log , "error found for decommission status: %s " , decommStatusError . Error () )
384+ log .Info ("error found for decommission status" , "decommStatusError" , decommStatusError )
369385 // nolint:gocritic // no need for a switch, this is ok
370386 if strings .Contains (decommStatusError .Error (), "is not decommissioning" ) {
371387 doDecommission = true
372388 } else if strings .Contains (decommStatusError .Error (), "does not exists" ) {
373- Infof ( log , "nodeID %d does not exist, skipping: %s" , item , decommStatusError . Error () )
389+ log .Info ("nodeID does not exist, skipping" , "nodeID" , nodeID , "decommStatusError" , decommStatusError )
374390 } else {
375391 errList = errors .Join (errList , fmt .Errorf ("could get decommission status of broker: %w" , decommStatusError ))
376392 }
377393 }
378- Debugf ( log , "decommission status: %v " , status )
394+ log . V ( logger . DebugLevel ). Info ( "decommission status" , "status " , status )
379395
380396 if doDecommission {
381- Infof ( log , "all checks pass, attempting to decommission: %d" , item )
397+ log . Info ( "all checks pass, attempting to decommission" , "nodeID" , nodeID )
382398 // we want a clear signal to avoid 400s here, the suspicion here is an invalid transitional state
383- decomErr := adminAPI .DecommissionBroker (ctx , item )
399+ decomErr := adminAPI .DecommissionBroker (ctx , nodeID )
384400 if decomErr != nil && ! strings .Contains (decomErr .Error (), "failed: Not Found" ) && ! strings .Contains (decomErr .Error (), "failed: Bad Request" ) {
385401 errList = errors .Join (errList , fmt .Errorf ("could not decommission broker: %w" , decomErr ))
386402 }
@@ -563,7 +579,7 @@ func isNameInList(name string, keys []string) bool {
563579 return false
564580}
565581
566- func buildAdminAPI (releaseName , namespace string , replicas int32 , values map [string ]interface {}) (* admin.AdminAPI , error ) {
582+ func buildAdminAPI (releaseName , namespace string , replicas int32 , podOrdinal * int , values map [string ]interface {}) (* admin.AdminAPI , error ) {
567583 tlsEnabled , ok , err := unstructured .NestedBool (values , "tls" , "enabled" )
568584 if ! ok || err != nil {
569585 // probably not a correct helm release, bail
@@ -586,7 +602,7 @@ func buildAdminAPI(releaseName, namespace string, replicas int32, values map[str
586602 tlsConfig = & tls.Config {InsecureSkipVerify : true }
587603 }
588604
589- urls , err := createBrokerURLs (releaseName , namespace , replicas , values )
605+ urls , err := createBrokerURLs (releaseName , namespace , replicas , podOrdinal , values )
590606 if err != nil {
591607 return nil , fmt .Errorf ("could not create broker url: %w" , err )
592608 }
@@ -595,7 +611,7 @@ func buildAdminAPI(releaseName, namespace string, replicas int32, values map[str
595611 return admin .NewAdminAPI (urls , admin.BasicCredentials {}, tlsConfig )
596612}
597613
598- func createBrokerURLs (release , namespace string , replicas int32 , values map [string ]interface {}) ([]string , error ) {
614+ func createBrokerURLs (release , namespace string , replicas int32 , ordinal * int , values map [string ]interface {}) ([]string , error ) {
599615 brokerList := make ([]string , 0 )
600616
601617 fullnameOverride , ok , err := unstructured .NestedString (values , "fullnameOverride" )
@@ -621,8 +637,12 @@ func createBrokerURLs(release, namespace string, replicas int32, values map[stri
621637 return brokerList , fmt .Errorf ("could not retrieve clusterDomain: %s; error: %w" , domain , err )
622638 }
623639
624- for i := 0 ; i < int (replicas ); i ++ {
625- brokerList = append (brokerList , fmt .Sprintf ("%s-%d.%s.%s.svc.%s:%d" , release , i , serviceName , namespace , domain , port ))
640+ if ordinal == nil {
641+ for i := 0 ; i < int (replicas ); i ++ {
642+ brokerList = append (brokerList , fmt .Sprintf ("%s-%d.%s.%s.svc.%s:%d" , release , i , serviceName , namespace , domain , port ))
643+ }
644+ } else {
645+ brokerList = append (brokerList , fmt .Sprintf ("%s-%d.%s.%s.svc.%s:%d" , release , * ordinal , serviceName , namespace , domain , port ))
626646 }
627647
628648 return brokerList , nil
0 commit comments