diff --git a/internal/controller/isbservicerollout/isbservicerollout_controller.go b/internal/controller/isbservicerollout/isbservicerollout_controller.go index 61706e13..a34f56ed 100644 --- a/internal/controller/isbservicerollout/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout/isbservicerollout_controller.go @@ -313,7 +313,7 @@ func (r *ISBServiceRolloutReconciler) reconcile(ctx context.Context, isbServiceR } // if we still have interstepbufferservices that need deleting, or if we're in the middle of an upgrade strategy, then requeue - if !allDeleted || inProgressStrategy != apiv1.UpgradeStrategyNoOp { + if !allDeleted || inProgressStrategy == apiv1.UpgradeStrategyPPND { if requeueDelay == 0 { requeueDelay = common.DefaultRequeueDelay } else { @@ -454,33 +454,39 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont case apiv1.UpgradeStrategyProgressive: numaLogger.Debug("processing InterstepBufferService with Progressive") - done, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, isbServiceRollout, existingISBServiceDef, needsUpdate, r, r.client) + assessmentComplete, failed, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, isbServiceRollout, existingISBServiceDef, needsUpdate, r, r.client) if err != nil { return 0, fmt.Errorf("error processing isbsvc with progressive: %s", err.Error()) } - if done { - // update the list of riders in the Status based on our child which was just promoted - promotedISBService, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, isbServiceRollout, common.LabelValueUpgradePromoted, nil, true, r.client) - if err != nil { - return 0, err - } - - currentRiderList, err := r.GetDesiredRiders(isbServiceRollout, promotedISBService.GetName(), promotedISBService) - if err != nil { - return 0, fmt.Errorf("error getting desired Riders for pipeline %s: %s", newISBServiceDef.GetName(), err) - } - r.SetCurrentRiderList(ctx, isbServiceRollout, currentRiderList) - - r.inProgressStrategyMgr.UnsetStrategy(ctx, isbServiceRollout) + requeueDelay := time.Duration(0) + if progressiveRequeueDelay > 0 { + requeueDelay = progressiveRequeueDelay + } + if assessmentComplete { + if !failed { + // update the list of riders in the Status based on our child which was just promoted + promotedISBService, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, isbServiceRollout, common.LabelValueUpgradePromoted, nil, true, r.client) + if err != nil { + return 0, err + } - // Update metrics for progressive rollout - if isbServiceRollout.GetUpgradingChildStatus() != nil { - // assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same - assessmentResult := metrics.EvaluateSuccessStatusForMetrics(isbServiceRollout.GetUpgradingChildStatus().AssessmentResult) - if assessmentResult != "" { - r.customMetrics.IncISBServiceProgressiveCompleted(isbServiceRollout.GetRolloutObjectMeta().GetNamespace(), isbServiceRollout.GetRolloutObjectMeta().GetName(), - isbServiceRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(isbServiceRollout.GetUpgradingChildStatus().BasicAssessmentResult), - assessmentResult, isbServiceRollout.GetUpgradingChildStatus().ForcedSuccess) + currentRiderList, err := r.GetDesiredRiders(isbServiceRollout, promotedISBService.GetName(), promotedISBService) + if err != nil { + return 0, fmt.Errorf("error getting desired Riders for pipeline %s: %s", newISBServiceDef.GetName(), err) + } + r.SetCurrentRiderList(ctx, isbServiceRollout, currentRiderList) + + r.inProgressStrategyMgr.UnsetStrategy(ctx, isbServiceRollout) + + // Update metrics for progressive rollout + if isbServiceRollout.GetUpgradingChildStatus() != nil { + // assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same + assessmentResult := metrics.EvaluateSuccessStatusForMetrics(isbServiceRollout.GetUpgradingChildStatus().AssessmentResult) + if assessmentResult != "" { + r.customMetrics.IncISBServiceProgressiveCompleted(isbServiceRollout.GetRolloutObjectMeta().GetNamespace(), isbServiceRollout.GetRolloutObjectMeta().GetName(), + isbServiceRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(isbServiceRollout.GetUpgradingChildStatus().BasicAssessmentResult), + assessmentResult, isbServiceRollout.GetUpgradingChildStatus().ForcedSuccess) + } } } } else { @@ -498,8 +504,9 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont } // requeue using the provided delay - return progressiveRequeueDelay, nil + requeueDelay = progressiveRequeueDelay } + return requeueDelay, nil case apiv1.UpgradeStrategyApply: // update ISBService err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef, needsRecreate) diff --git a/internal/controller/monovertexrollout/monovertexrollout_controller.go b/internal/controller/monovertexrollout/monovertexrollout_controller.go index df919642..5fbfddf2 100644 --- a/internal/controller/monovertexrollout/monovertexrollout_controller.go +++ b/internal/controller/monovertexrollout/monovertexrollout_controller.go @@ -266,8 +266,6 @@ func (r *MonoVertexRolloutReconciler) reconcile(ctx context.Context, monoVertexR } } - inProgressStrategy := r.inProgressStrategyMgr.GetStrategy(ctx, monoVertexRollout) - // clean up recyclable monovertices allDeleted, err := ctlrcommon.GarbageCollectChildren(ctx, monoVertexRollout, r, r.client) if err != nil { @@ -275,7 +273,7 @@ func (r *MonoVertexRolloutReconciler) reconcile(ctx context.Context, monoVertexR } // if we still have monovertices that need deleting, or if we're in the middle of an upgrade strategy, then requeue - if !allDeleted || inProgressStrategy != apiv1.UpgradeStrategyNoOp { + if !allDeleted { if requeueDelay == 0 { requeueDelay = common.DefaultRequeueDelay } else { @@ -357,53 +355,57 @@ func (r *MonoVertexRolloutReconciler) processExistingMonoVertex(ctx context.Cont } } - done, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, monoVertexRollout, existingMonoVertexDef, needsUpdate, r, r.client) + assessmentComplete, failed, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, monoVertexRollout, existingMonoVertexDef, needsUpdate, r, r.client) if err != nil { return 0, err } - if done { - - // update the list of riders in the Status based on our child which was just promoted - promotedMonoVertex, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, monoVertexRollout, common.LabelValueUpgradePromoted, nil, true, r.client) - if err != nil { - return 0, err - } + if progressiveRequeueDelay > 0 { + requeueDelay = progressiveRequeueDelay + } + if assessmentComplete { + if !failed { + // update the list of riders in the Status based on our child which was just promoted + promotedMonoVertex, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, monoVertexRollout, common.LabelValueUpgradePromoted, nil, true, r.client) + if err != nil { + return 0, err + } - // update promotedPodSelector immediately so VPA targets the newly promoted child's pods - // without waiting for the next reconcile - monoVertexRollout.Status.PromotedPodSelector = buildPromotedPodSelector(promotedMonoVertex.GetName()) + // update promotedPodSelector immediately so VPA targets the newly promoted child's pods + // without waiting for the next reconcile + monoVertexRollout.Status.PromotedPodSelector = buildPromotedPodSelector(promotedMonoVertex.GetName()) - currentRiderList, err := r.GetDesiredRiders(monoVertexRollout, promotedMonoVertex.GetName(), promotedMonoVertex) - if err != nil { - return 0, fmt.Errorf("error getting desired Riders for MonoVertex %s: %s", newMonoVertexDef.GetName(), err) - } - r.SetCurrentRiderList(ctx, monoVertexRollout, currentRiderList) + currentRiderList, err := r.GetDesiredRiders(monoVertexRollout, promotedMonoVertex.GetName(), promotedMonoVertex) + if err != nil { + return 0, fmt.Errorf("error getting desired Riders for MonoVertex %s: %s", newMonoVertexDef.GetName(), err) + } + r.SetCurrentRiderList(ctx, monoVertexRollout, currentRiderList) - // we need to prevent the possibility that we're done but we fail to update the Progressive Status - // therefore, we publish Rollout.Status here, so if that fails, then we won't be "done" and so we'll come back in here to try again - err = r.updateMonoVertexRolloutStatus(ctx, monoVertexRollout) - if err != nil { - return 0, err - } + // we need to prevent the possibility that we're done but we fail to update the Progressive Status + // therefore, we publish Rollout.Status here, so if that fails, then we won't be "done" and so we'll come back in here to try again + err = r.updateMonoVertexRolloutStatus(ctx, monoVertexRollout) + if err != nil { + return 0, err + } - r.inProgressStrategyMgr.UnsetStrategy(ctx, monoVertexRollout) - monoVertexRollout.Status.ProgressiveStatus.PromotedMonoVertexStatus = nil - - // generate metrics for MonoVertex progressive rollout results - if monoVertexRollout.GetUpgradingChildStatus() != nil { - // assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same - assessmentResult := metrics.EvaluateSuccessStatusForMetrics(monoVertexRollout.GetUpgradingChildStatus().AssessmentResult) - if assessmentResult != "" { - r.customMetrics.IncMonovertexProgressiveCompleted(monoVertexRollout.GetRolloutObjectMeta().GetNamespace(), monoVertexRollout.GetRolloutObjectMeta().GetName(), - monoVertexRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(monoVertexRollout.GetUpgradingChildStatus().BasicAssessmentResult), - assessmentResult, monoVertexRollout.GetUpgradingChildStatus().ForcedSuccess) + r.inProgressStrategyMgr.UnsetStrategy(ctx, monoVertexRollout) + monoVertexRollout.Status.ProgressiveStatus.PromotedMonoVertexStatus = nil + + // generate metrics for MonoVertex progressive rollout results + if monoVertexRollout.GetUpgradingChildStatus() != nil { + // assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same + assessmentResult := metrics.EvaluateSuccessStatusForMetrics(monoVertexRollout.GetUpgradingChildStatus().AssessmentResult) + if assessmentResult != "" { + r.customMetrics.IncMonovertexProgressiveCompleted(monoVertexRollout.GetRolloutObjectMeta().GetNamespace(), monoVertexRollout.GetRolloutObjectMeta().GetName(), + monoVertexRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(monoVertexRollout.GetUpgradingChildStatus().BasicAssessmentResult), + assessmentResult, monoVertexRollout.GetUpgradingChildStatus().ForcedSuccess) + } + } + // we need to requeue one time to ensure that the newly promoted MonoVertex scales back to its rollout-defined scale + if requeueDelay == 0 { + requeueDelay = common.DefaultRequeueDelay } } - // we need to requeue one time to ensure that the newly promoted MonoVertex scales back to its rollout-defined scale - requeueDelay = common.DefaultRequeueDelay - } else { - requeueDelay = progressiveRequeueDelay } default: if needsUpdate { diff --git a/internal/controller/pipelinerollout/pipelinerollout_controller.go b/internal/controller/pipelinerollout/pipelinerollout_controller.go index 4fd43fdf..a3d287a6 100644 --- a/internal/controller/pipelinerollout/pipelinerollout_controller.go +++ b/internal/controller/pipelinerollout/pipelinerollout_controller.go @@ -446,7 +446,6 @@ func (r *PipelineRolloutReconciler) reconcile( // get current in progress strategy if there is one inProgressStrategy := r.inProgressStrategyMgr.GetStrategy(ctx, pipelineRollout) - inProgressStrategySet := inProgressStrategy != apiv1.UpgradeStrategyNoOp // clean up recyclable pipelines allDeleted, err := r.garbageCollectChildren(ctx, pipelineRollout) @@ -454,7 +453,7 @@ func (r *PipelineRolloutReconciler) reconcile( return 0, nil, err } // there are some cases that require re-queueing - if !allDeleted || inProgressStrategySet { + if !allDeleted || inProgressStrategy == apiv1.UpgradeStrategyPPND { if requeueDelay == 0 { requeueDelay = common.DefaultRequeueDelay } else { @@ -609,45 +608,52 @@ func (r *PipelineRolloutReconciler) processExistingPipeline(ctx context.Context, case apiv1.UpgradeStrategyProgressive: numaLogger.Debug("processing pipeline with Progressive") - done, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, pipelineRollout, existingPipelineDef, needsUpdate, r, r.client) + assessmentComplete, failed, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, pipelineRollout, existingPipelineDef, needsUpdate, r, r.client) if err != nil { return 0, err } - if done { - // update the list of riders in the Status based on our child which was just promoted - promotedPipeline, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, pipelineRollout, common.LabelValueUpgradePromoted, nil, true, r.client) - if err != nil { - return 0, err - } - currentRiderList, err := r.GetDesiredRiders(pipelineRollout, promotedPipeline.GetName(), promotedPipeline) - if err != nil { - return 0, fmt.Errorf("error getting desired Riders for pipeline %s: %s", newPipelineDef.GetName(), err) - } - r.SetCurrentRiderList(ctx, pipelineRollout, currentRiderList) + if progressiveRequeueDelay > 0 { + requeueDelay = progressiveRequeueDelay + } + if assessmentComplete { + if !failed { + // update the list of riders in the Status based on our child which was just promoted + promotedPipeline, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, pipelineRollout, common.LabelValueUpgradePromoted, nil, true, r.client) + if err != nil { + return 0, err + } + currentRiderList, err := r.GetDesiredRiders(pipelineRollout, promotedPipeline.GetName(), promotedPipeline) + if err != nil { + return 0, fmt.Errorf("error getting desired Riders for pipeline %s: %s", newPipelineDef.GetName(), err) + } + r.SetCurrentRiderList(ctx, pipelineRollout, currentRiderList) - pipelineRollout.Status.ProgressiveStatus.PromotedPipelineStatus = nil + pipelineRollout.Status.ProgressiveStatus.PromotedPipelineStatus = nil - // we need to prevent the possibility that we're done, but we fail to update the Progressive Status - // therefore, we publish Rollout.Status here, so if that fails, then we won't be "done" and so we'll come back in here to try again - err = r.updatePipelineRolloutStatus(ctx, pipelineRollout) - if err != nil { - return 0, err - } - r.inProgressStrategyMgr.UnsetStrategy(ctx, pipelineRollout) + // we need to prevent the possibility that we're done but we fail to update the Progressive Status + // therefore, we publish Rollout.Status here, so if that fails, then we won't be "done" and so we'll come back in here to try again + err = r.updatePipelineRolloutStatus(ctx, pipelineRollout) + if err != nil { + return 0, err + } + + r.inProgressStrategyMgr.UnsetStrategy(ctx, pipelineRollout) - if pipelineRollout.GetUpgradingChildStatus() != nil { - // assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same - assessmentResult := metrics.EvaluateSuccessStatusForMetrics(pipelineRollout.GetUpgradingChildStatus().AssessmentResult) - if assessmentResult != "" { - r.customMetrics.IncPipelineProgressiveCompleted(pipelineRollout.GetRolloutObjectMeta().GetNamespace(), pipelineRollout.GetRolloutObjectMeta().GetName(), - pipelineRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(pipelineRollout.GetUpgradingChildStatus().BasicAssessmentResult), - assessmentResult, pipelineRollout.GetUpgradingChildStatus().ForcedSuccess) + if pipelineRollout.GetUpgradingChildStatus() != nil { + // assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same + assessmentResult := metrics.EvaluateSuccessStatusForMetrics(pipelineRollout.GetUpgradingChildStatus().AssessmentResult) + if assessmentResult != "" { + r.customMetrics.IncPipelineProgressiveCompleted(pipelineRollout.GetRolloutObjectMeta().GetNamespace(), pipelineRollout.GetRolloutObjectMeta().GetName(), + pipelineRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(pipelineRollout.GetUpgradingChildStatus().BasicAssessmentResult), + assessmentResult, pipelineRollout.GetUpgradingChildStatus().ForcedSuccess) + } + } + // we need to requeue one time to ensure that the newly promoted Pipeline scales back to its rollout-defined scale + if requeueDelay == 0 { + requeueDelay = common.DefaultRequeueDelay } } - // we need to requeue one time to ensure that the newly promoted Pipeline scales back to its rollout-defined scale - requeueDelay = common.DefaultRequeueDelay - } else { - requeueDelay = progressiveRequeueDelay + } default: if needsUpdate && upgradeStrategyType == apiv1.UpgradeStrategyApply { diff --git a/internal/controller/progressive/progressive.go b/internal/controller/progressive/progressive.go index 3630b81e..6bf51829 100644 --- a/internal/controller/progressive/progressive.go +++ b/internal/controller/progressive/progressive.go @@ -122,7 +122,8 @@ const ( ) // return: -// - whether we're done +// - whether the assessment is complete +// - whether the assessment failed // - duration indicating the requeue delay for the controller to use for next reconciliation // - error if any func ProcessResource( @@ -132,14 +133,14 @@ func ProcessResource( promotedDifference bool, controller progressiveController, c client.Client, -) (bool, time.Duration, error) { +) (bool, bool, time.Duration, error) { // Make sure that our Promoted Child Status reflects the current promoted child promotedChildStatus := rolloutObject.GetPromotedChildStatus() if promotedChildStatus == nil || promotedChildStatus.Name != existingPromotedChild.GetName() { err := rolloutObject.ResetPromotedChildStatus(existingPromotedChild) if err != nil { - return false, 0, err + return false, false, 0, err } } @@ -147,12 +148,12 @@ func ProcessResource( var currentUpgradingChildDef *unstructured.Unstructured currentUpgradingChildDef, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, rolloutObject, common.LabelValueUpgradeTrial, nil, true, c) if err != nil { - return false, 0, err + return false, false, 0, err } else if currentUpgradingChildDef == nil { // TODO: temporary code for handling LabelValueUpgradeInProgress for backwards compatibility purposes, remove later currentUpgradingChildDef, err = ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, rolloutObject, common.LabelValueUpgradeInProgress, nil, true, c) if err != nil { - return false, 0, err + return false, false, 0, err } } @@ -161,15 +162,15 @@ func ProcessResource( // Create it _, needRequeue, err := startUpgradeProcess(ctx, rolloutObject, existingPromotedChild, controller, c) if needRequeue { - return false, common.DefaultRequeueDelay, err + return false, false, common.DefaultRequeueDelay, err } else { - return false, 0, err + return false, false, 0, err } } // nothing to do (either there's nothing to upgrade, or we just created an "upgrading" child, and it's too early to start reconciling it) if currentUpgradingChildDef == nil { - return true, 0, err + return true, false, 0, err } // There's already an Upgrading child, now process it @@ -177,7 +178,7 @@ func ProcessResource( // get UpgradingChildStatus and reset it if necessary childStatus, err := getUpgradingChildStatus(ctx, rolloutObject, currentUpgradingChildDef) if err != nil { - return false, 0, err + return false, false, 0, err } // if the Upgrading child status exists but indicates that we aren't done with upgrade process, then do postupgrade process @@ -185,27 +186,30 @@ func ProcessResource( if initializationIncomplete { needsRequeue, err := postUpgradingChildCreatedProcess(ctx, rolloutObject, existingPromotedChild, currentUpgradingChildDef, controller, c) if needsRequeue { - return false, common.DefaultRequeueDelay, err + return false, false, common.DefaultRequeueDelay, err } else { - return false, 0, err + return false, false, 0, err } } // determine if we need to replace the Upgrading child with a newer one - needsRequeue, done, err := checkForUpgradeReplacement(ctx, rolloutObject, controller, existingPromotedChild, currentUpgradingChildDef, c) - if needsRequeue || err != nil { - return false, common.DefaultRequeueDelay, err + needsRequeue, assessmentComplete, err := checkForUpgradeReplacement(ctx, rolloutObject, controller, existingPromotedChild, currentUpgradingChildDef, c) + if err != nil { + return false, false, common.DefaultRequeueDelay, err + } + if assessmentComplete { + return true, false, 0, nil } - if done { - return true, 0, nil + if needsRequeue { + return false, false, common.DefaultRequeueDelay, nil } - done, requeueDelay, err := processUpgradingChild(ctx, rolloutObject, controller, existingPromotedChild, currentUpgradingChildDef, c) + assessmentComplete, assessmentFailed, requeueDelay, err := processUpgradingChild(ctx, rolloutObject, controller, existingPromotedChild, currentUpgradingChildDef, c) if err != nil { - return false, 0, err + return false, false, 0, err } - return done, requeueDelay, nil + return assessmentComplete, assessmentFailed, requeueDelay, nil } // create the definition for the child of the Rollout which is the one labeled "upgrading" @@ -324,7 +328,8 @@ Parameters: - c: The Kubernetes client for interacting with the cluster. Returns: -- A boolean indicating if the upgrade is done. +- A boolean indicating if the assessment is complete. +- A boolean indicating if the assessment failed. - A duration indicating the requeue delay for the controller to use for next reconciliation. - An error if any issues occur during the process. */ @@ -334,13 +339,13 @@ func processUpgradingChild( controller progressiveController, existingPromotedChildDef, existingUpgradingChildDef *unstructured.Unstructured, c client.Client, -) (bool, time.Duration, error) { +) (bool, bool, time.Duration, error) { numaLogger := logger.FromContext(ctx).WithValues("upgrading child", fmt.Sprintf("%s/%s", existingUpgradingChildDef.GetNamespace(), existingUpgradingChildDef.GetName())) assessmentSchedule, err := getChildStatusAssessmentSchedule(ctx, rolloutObject) if err != nil { - return false, 0, err + return false, false, 0, err } childStatus := rolloutObject.GetUpgradingChildStatus() @@ -348,7 +353,7 @@ func processUpgradingChild( // check if we should skip the asssessment skipAssessment, reason, err := controller.SkipProgressiveAssessment(ctx, rolloutObject) if err != nil { - return false, 0, err + return false, false, 0, err } // check for "force promote" label on the child @@ -367,14 +372,14 @@ func processUpgradingChild( err := ctlrcommon.UpdateResultState(ctx, c, common.LabelValueResultStateForcePromoted, existingUpgradingChildDef) if err != nil { - return false, 0, err + return false, false, 0, err } done, err := declareSuccess(ctx, rolloutObject, controller, existingPromotedChildDef, existingUpgradingChildDef, childStatus, c) if err != nil || done { - return done, 0, err + return done, false, 0, err } else { - return done, assessmentSchedule.Interval, err + return done, false, assessmentSchedule.Interval, err } } @@ -394,7 +399,7 @@ func processUpgradingChild( if childStatus.CanAssess() { assessment, _, err = controller.AssessUpgradingChild(ctx, rolloutObject, existingUpgradingChildDef, assessmentSchedule) if err != nil { - return false, 0, err + return false, false, 0, err } numaLogger.WithValues("name", existingUpgradingChildDef.GetName(), "childStatus", *childStatus, "assessment", assessment). @@ -414,36 +419,36 @@ func processUpgradingChild( err = ctlrcommon.UpdateResultState(ctx, c, common.LabelValueResultStateFailed, existingUpgradingChildDef) if err != nil { - return false, 0, err + return true, true, 0, err } requeue, err := controller.ProcessPromotedChildPostFailure(ctx, rolloutObject, existingPromotedChildDef, c) if err != nil { - return false, 0, err + return true, true, 0, err } if requeue { - return false, common.DefaultRequeueDelay, nil + return true, true, common.DefaultRequeueDelay, nil } requeue, err = controller.ProcessUpgradingChildPostFailure(ctx, rolloutObject, existingUpgradingChildDef, c) if err != nil { - return false, 0, err + return true, true, 0, err } if requeue { - return false, common.DefaultRequeueDelay, nil + return true, true, common.DefaultRequeueDelay, nil } - return false, 0, nil + return true, true, 0, nil case apiv1.AssessmentResultSuccess: err := ctlrcommon.UpdateResultState(ctx, c, common.LabelValueResultStateSucceeded, existingUpgradingChildDef) if err != nil { - return true, 0, err + return true, false, 0, err } done, err := declareSuccess(ctx, rolloutObject, controller, existingPromotedChildDef, existingUpgradingChildDef, childStatus, c) if err != nil || done { - return done, 0, err + return done, false, 0, err } else { - return done, assessmentSchedule.Interval, err + return done, false, assessmentSchedule.Interval, err } default: @@ -451,7 +456,7 @@ func processUpgradingChild( status.AssessmentResult = apiv1.AssessmentResultUnknown }) - return false, assessmentSchedule.Interval, nil + return false, false, assessmentSchedule.Interval, nil } } diff --git a/internal/controller/progressive/progressive_test.go b/internal/controller/progressive/progressive_test.go index d95202c7..6c1debd1 100644 --- a/internal/controller/progressive/progressive_test.go +++ b/internal/controller/progressive/progressive_test.go @@ -134,6 +134,7 @@ func Test_processUpgradingChild(t *testing.T) { skipProgressiveAssessment bool existingUpgradingChildDef *numaflowv1.MonoVertex expectedDone bool + expectedFailed bool expectedRequeueDelay time.Duration expectedError error }{ @@ -157,6 +158,7 @@ func Test_processUpgradingChild(t *testing.T) { skipProgressiveAssessment: false, existingUpgradingChildDef: createMonoVertex("test-success"), expectedDone: true, + expectedFailed: false, expectedRequeueDelay: 0, expectedError: nil, }, @@ -185,7 +187,8 @@ func Test_processUpgradingChild(t *testing.T) { ), skipProgressiveAssessment: false, existingUpgradingChildDef: createMonoVertex("test-failure"), - expectedDone: false, + expectedDone: true, + expectedFailed: true, expectedRequeueDelay: 0, expectedError: nil, }, @@ -215,6 +218,7 @@ func Test_processUpgradingChild(t *testing.T) { skipProgressiveAssessment: true, existingUpgradingChildDef: createMonoVertex("test-force-promote"), expectedDone: true, + expectedFailed: false, expectedRequeueDelay: 0, expectedError: nil, }, @@ -244,6 +248,7 @@ func Test_processUpgradingChild(t *testing.T) { skipProgressiveAssessment: false, existingUpgradingChildDef: createMonoVertex("test-analysis-success"), expectedDone: true, + expectedFailed: false, expectedRequeueDelay: 0, expectedError: nil, }, @@ -272,7 +277,8 @@ func Test_processUpgradingChild(t *testing.T) { ), skipProgressiveAssessment: false, existingUpgradingChildDef: createMonoVertex("test-analysis-failure"), - expectedDone: false, + expectedDone: true, + expectedFailed: true, expectedRequeueDelay: 0, expectedError: nil, }, @@ -297,7 +303,7 @@ func Test_processUpgradingChild(t *testing.T) { assert.NoError(t, err) } - actualDone, actualRequeueDelay, actualErr := processUpgradingChild( + actualDone, actualFailed, actualRequeueDelay, actualErr := processUpgradingChild( ctx, tc.rolloutObject, fakeProgressiveController{skipProgressiveAssessment: tc.skipProgressiveAssessment}, monoVertexToUnstruct(defaultExistingPromotedChildDef), monoVertexToUnstruct(tc.existingUpgradingChildDef), client) if tc.expectedError != nil { @@ -307,6 +313,7 @@ func Test_processUpgradingChild(t *testing.T) { } else { assert.Nil(t, actualErr) assert.Equal(t, tc.expectedDone, actualDone) + assert.Equal(t, tc.expectedFailed, actualFailed) assert.Equal(t, tc.expectedRequeueDelay, actualRequeueDelay) } })