Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (r *ISBServiceRolloutReconciler) reconcile(ctx context.Context, isbServiceR
}

// if we still have interstepbufferservices that need deleting, or if we're in the middle of an upgrade strategy, then requeue
if !allDeleted || inProgressStrategy != apiv1.UpgradeStrategyNoOp {
if !allDeleted || inProgressStrategy == apiv1.UpgradeStrategyPPND {
Copy link
Collaborator Author

@juliev0 juliev0 Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the case of Progressive, we have other logic which is causing us to requeue if we're in the middle of the assessment - we don't need this

if requeueDelay == 0 {
requeueDelay = common.DefaultRequeueDelay
} else {
Expand Down Expand Up @@ -454,33 +454,39 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont
case apiv1.UpgradeStrategyProgressive:
numaLogger.Debug("processing InterstepBufferService with Progressive")

done, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, isbServiceRollout, existingISBServiceDef, needsUpdate, r, r.client)
assessmentComplete, failed, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, isbServiceRollout, existingISBServiceDef, needsUpdate, r, r.client)
if err != nil {
return 0, fmt.Errorf("error processing isbsvc with progressive: %s", err.Error())
}
if done {
// update the list of riders in the Status based on our child which was just promoted
promotedISBService, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, isbServiceRollout, common.LabelValueUpgradePromoted, nil, true, r.client)
if err != nil {
return 0, err
}

currentRiderList, err := r.GetDesiredRiders(isbServiceRollout, promotedISBService.GetName(), promotedISBService)
if err != nil {
return 0, fmt.Errorf("error getting desired Riders for pipeline %s: %s", newISBServiceDef.GetName(), err)
}
r.SetCurrentRiderList(ctx, isbServiceRollout, currentRiderList)

r.inProgressStrategyMgr.UnsetStrategy(ctx, isbServiceRollout)
requeueDelay := time.Duration(0)
if progressiveRequeueDelay > 0 {
requeueDelay = progressiveRequeueDelay
}
if assessmentComplete {
if !failed {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Github is making it seem like the code in this block failed, but it hasn't. Only the if done got changed to:

if assessmentComplete {
			if !failed {

// update the list of riders in the Status based on our child which was just promoted
promotedISBService, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, isbServiceRollout, common.LabelValueUpgradePromoted, nil, true, r.client)
if err != nil {
return 0, err
}

// Update metrics for progressive rollout
if isbServiceRollout.GetUpgradingChildStatus() != nil {
// assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same
assessmentResult := metrics.EvaluateSuccessStatusForMetrics(isbServiceRollout.GetUpgradingChildStatus().AssessmentResult)
if assessmentResult != "" {
r.customMetrics.IncISBServiceProgressiveCompleted(isbServiceRollout.GetRolloutObjectMeta().GetNamespace(), isbServiceRollout.GetRolloutObjectMeta().GetName(),
isbServiceRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(isbServiceRollout.GetUpgradingChildStatus().BasicAssessmentResult),
assessmentResult, isbServiceRollout.GetUpgradingChildStatus().ForcedSuccess)
currentRiderList, err := r.GetDesiredRiders(isbServiceRollout, promotedISBService.GetName(), promotedISBService)
if err != nil {
return 0, fmt.Errorf("error getting desired Riders for pipeline %s: %s", newISBServiceDef.GetName(), err)
}
r.SetCurrentRiderList(ctx, isbServiceRollout, currentRiderList)

r.inProgressStrategyMgr.UnsetStrategy(ctx, isbServiceRollout)

// Update metrics for progressive rollout
if isbServiceRollout.GetUpgradingChildStatus() != nil {
// assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same
assessmentResult := metrics.EvaluateSuccessStatusForMetrics(isbServiceRollout.GetUpgradingChildStatus().AssessmentResult)
if assessmentResult != "" {
r.customMetrics.IncISBServiceProgressiveCompleted(isbServiceRollout.GetRolloutObjectMeta().GetNamespace(), isbServiceRollout.GetRolloutObjectMeta().GetName(),
isbServiceRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(isbServiceRollout.GetUpgradingChildStatus().BasicAssessmentResult),
assessmentResult, isbServiceRollout.GetUpgradingChildStatus().ForcedSuccess)
}
}
}
} else {
Expand All @@ -498,8 +504,9 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont
}

// requeue using the provided delay
return progressiveRequeueDelay, nil
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requeue if we're mid-assessment

requeueDelay = progressiveRequeueDelay
}
return requeueDelay, nil
case apiv1.UpgradeStrategyApply:
// update ISBService
err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef, needsRecreate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,14 @@ func (r *MonoVertexRolloutReconciler) reconcile(ctx context.Context, monoVertexR
}
}

inProgressStrategy := r.inProgressStrategyMgr.GetStrategy(ctx, monoVertexRollout)

// clean up recyclable monovertices
allDeleted, err := ctlrcommon.GarbageCollectChildren(ctx, monoVertexRollout, r, r.client)
if err != nil {
return ctrl.Result{}, err
}

// if we still have monovertices that need deleting, or if we're in the middle of an upgrade strategy, then requeue
if !allDeleted || inProgressStrategy != apiv1.UpgradeStrategyNoOp {
if !allDeleted {
if requeueDelay == 0 {
requeueDelay = common.DefaultRequeueDelay
} else {
Expand Down Expand Up @@ -357,53 +355,57 @@ func (r *MonoVertexRolloutReconciler) processExistingMonoVertex(ctx context.Cont
}
}

done, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, monoVertexRollout, existingMonoVertexDef, needsUpdate, r, r.client)
assessmentComplete, failed, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, monoVertexRollout, existingMonoVertexDef, needsUpdate, r, r.client)
if err != nil {
return 0, err
}
if done {

// update the list of riders in the Status based on our child which was just promoted
promotedMonoVertex, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, monoVertexRollout, common.LabelValueUpgradePromoted, nil, true, r.client)
if err != nil {
return 0, err
}
if progressiveRequeueDelay > 0 {
requeueDelay = progressiveRequeueDelay
}
if assessmentComplete {
if !failed {
// update the list of riders in the Status based on our child which was just promoted
promotedMonoVertex, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, monoVertexRollout, common.LabelValueUpgradePromoted, nil, true, r.client)
if err != nil {
return 0, err
}

// update promotedPodSelector immediately so VPA targets the newly promoted child's pods
// without waiting for the next reconcile
monoVertexRollout.Status.PromotedPodSelector = buildPromotedPodSelector(promotedMonoVertex.GetName())
// update promotedPodSelector immediately so VPA targets the newly promoted child's pods
// without waiting for the next reconcile
monoVertexRollout.Status.PromotedPodSelector = buildPromotedPodSelector(promotedMonoVertex.GetName())

currentRiderList, err := r.GetDesiredRiders(monoVertexRollout, promotedMonoVertex.GetName(), promotedMonoVertex)
if err != nil {
return 0, fmt.Errorf("error getting desired Riders for MonoVertex %s: %s", newMonoVertexDef.GetName(), err)
}
r.SetCurrentRiderList(ctx, monoVertexRollout, currentRiderList)
currentRiderList, err := r.GetDesiredRiders(monoVertexRollout, promotedMonoVertex.GetName(), promotedMonoVertex)
if err != nil {
return 0, fmt.Errorf("error getting desired Riders for MonoVertex %s: %s", newMonoVertexDef.GetName(), err)
}
r.SetCurrentRiderList(ctx, monoVertexRollout, currentRiderList)

// we need to prevent the possibility that we're done but we fail to update the Progressive Status
// therefore, we publish Rollout.Status here, so if that fails, then we won't be "done" and so we'll come back in here to try again
err = r.updateMonoVertexRolloutStatus(ctx, monoVertexRollout)
if err != nil {
return 0, err
}
// we need to prevent the possibility that we're done but we fail to update the Progressive Status
// therefore, we publish Rollout.Status here, so if that fails, then we won't be "done" and so we'll come back in here to try again
err = r.updateMonoVertexRolloutStatus(ctx, monoVertexRollout)
if err != nil {
return 0, err
}

r.inProgressStrategyMgr.UnsetStrategy(ctx, monoVertexRollout)
monoVertexRollout.Status.ProgressiveStatus.PromotedMonoVertexStatus = nil

// generate metrics for MonoVertex progressive rollout results
if monoVertexRollout.GetUpgradingChildStatus() != nil {
// assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same
assessmentResult := metrics.EvaluateSuccessStatusForMetrics(monoVertexRollout.GetUpgradingChildStatus().AssessmentResult)
if assessmentResult != "" {
r.customMetrics.IncMonovertexProgressiveCompleted(monoVertexRollout.GetRolloutObjectMeta().GetNamespace(), monoVertexRollout.GetRolloutObjectMeta().GetName(),
monoVertexRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(monoVertexRollout.GetUpgradingChildStatus().BasicAssessmentResult),
assessmentResult, monoVertexRollout.GetUpgradingChildStatus().ForcedSuccess)
r.inProgressStrategyMgr.UnsetStrategy(ctx, monoVertexRollout)
monoVertexRollout.Status.ProgressiveStatus.PromotedMonoVertexStatus = nil

// generate metrics for MonoVertex progressive rollout results
if monoVertexRollout.GetUpgradingChildStatus() != nil {
// assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same
assessmentResult := metrics.EvaluateSuccessStatusForMetrics(monoVertexRollout.GetUpgradingChildStatus().AssessmentResult)
if assessmentResult != "" {
r.customMetrics.IncMonovertexProgressiveCompleted(monoVertexRollout.GetRolloutObjectMeta().GetNamespace(), monoVertexRollout.GetRolloutObjectMeta().GetName(),
monoVertexRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(monoVertexRollout.GetUpgradingChildStatus().BasicAssessmentResult),
assessmentResult, monoVertexRollout.GetUpgradingChildStatus().ForcedSuccess)
}
}
// we need to requeue one time to ensure that the newly promoted MonoVertex scales back to its rollout-defined scale
if requeueDelay == 0 {
requeueDelay = common.DefaultRequeueDelay
}
}
// we need to requeue one time to ensure that the newly promoted MonoVertex scales back to its rollout-defined scale
requeueDelay = common.DefaultRequeueDelay

} else {
requeueDelay = progressiveRequeueDelay
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this requeueing is taken care of at the top where we now do this:

if progressiveRequeueDelay > 0 {
		requeueDelay = progressiveRequeueDelay
}

}
default:
if needsUpdate {
Expand Down
72 changes: 39 additions & 33 deletions internal/controller/pipelinerollout/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,14 @@ func (r *PipelineRolloutReconciler) reconcile(

// get current in progress strategy if there is one
inProgressStrategy := r.inProgressStrategyMgr.GetStrategy(ctx, pipelineRollout)
inProgressStrategySet := inProgressStrategy != apiv1.UpgradeStrategyNoOp

// clean up recyclable pipelines
allDeleted, err := r.garbageCollectChildren(ctx, pipelineRollout)
if err != nil {
return 0, nil, err
}
// there are some cases that require re-queueing
if !allDeleted || inProgressStrategySet {
if !allDeleted || inProgressStrategy == apiv1.UpgradeStrategyPPND {
if requeueDelay == 0 {
requeueDelay = common.DefaultRequeueDelay
} else {
Expand Down Expand Up @@ -609,45 +608,52 @@ func (r *PipelineRolloutReconciler) processExistingPipeline(ctx context.Context,
case apiv1.UpgradeStrategyProgressive:
numaLogger.Debug("processing pipeline with Progressive")

done, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, pipelineRollout, existingPipelineDef, needsUpdate, r, r.client)
assessmentComplete, failed, progressiveRequeueDelay, err := progressive.ProcessResource(ctx, pipelineRollout, existingPipelineDef, needsUpdate, r, r.client)
if err != nil {
return 0, err
}
if done {
// update the list of riders in the Status based on our child which was just promoted
promotedPipeline, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, pipelineRollout, common.LabelValueUpgradePromoted, nil, true, r.client)
if err != nil {
return 0, err
}
currentRiderList, err := r.GetDesiredRiders(pipelineRollout, promotedPipeline.GetName(), promotedPipeline)
if err != nil {
return 0, fmt.Errorf("error getting desired Riders for pipeline %s: %s", newPipelineDef.GetName(), err)
}
r.SetCurrentRiderList(ctx, pipelineRollout, currentRiderList)
if progressiveRequeueDelay > 0 {
requeueDelay = progressiveRequeueDelay
}
if assessmentComplete {
if !failed {
// update the list of riders in the Status based on our child which was just promoted
promotedPipeline, err := ctlrcommon.FindMostCurrentChildOfUpgradeState(ctx, pipelineRollout, common.LabelValueUpgradePromoted, nil, true, r.client)
if err != nil {
return 0, err
}
currentRiderList, err := r.GetDesiredRiders(pipelineRollout, promotedPipeline.GetName(), promotedPipeline)
if err != nil {
return 0, fmt.Errorf("error getting desired Riders for pipeline %s: %s", newPipelineDef.GetName(), err)
}
r.SetCurrentRiderList(ctx, pipelineRollout, currentRiderList)

pipelineRollout.Status.ProgressiveStatus.PromotedPipelineStatus = nil
pipelineRollout.Status.ProgressiveStatus.PromotedPipelineStatus = nil

// we need to prevent the possibility that we're done, but we fail to update the Progressive Status
// therefore, we publish Rollout.Status here, so if that fails, then we won't be "done" and so we'll come back in here to try again
err = r.updatePipelineRolloutStatus(ctx, pipelineRollout)
if err != nil {
return 0, err
}
r.inProgressStrategyMgr.UnsetStrategy(ctx, pipelineRollout)
// we need to prevent the possibility that we're done but we fail to update the Progressive Status
// therefore, we publish Rollout.Status here, so if that fails, then we won't be "done" and so we'll come back in here to try again
err = r.updatePipelineRolloutStatus(ctx, pipelineRollout)
if err != nil {
return 0, err
}

r.inProgressStrategyMgr.UnsetStrategy(ctx, pipelineRollout)

if pipelineRollout.GetUpgradingChildStatus() != nil {
// assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same
assessmentResult := metrics.EvaluateSuccessStatusForMetrics(pipelineRollout.GetUpgradingChildStatus().AssessmentResult)
if assessmentResult != "" {
r.customMetrics.IncPipelineProgressiveCompleted(pipelineRollout.GetRolloutObjectMeta().GetNamespace(), pipelineRollout.GetRolloutObjectMeta().GetName(),
pipelineRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(pipelineRollout.GetUpgradingChildStatus().BasicAssessmentResult),
assessmentResult, pipelineRollout.GetUpgradingChildStatus().ForcedSuccess)
if pipelineRollout.GetUpgradingChildStatus() != nil {
// assessmentResult value indicates that the progressive rollout is completed, so we can generate the metrics for the same
assessmentResult := metrics.EvaluateSuccessStatusForMetrics(pipelineRollout.GetUpgradingChildStatus().AssessmentResult)
if assessmentResult != "" {
r.customMetrics.IncPipelineProgressiveCompleted(pipelineRollout.GetRolloutObjectMeta().GetNamespace(), pipelineRollout.GetRolloutObjectMeta().GetName(),
pipelineRollout.GetUpgradingChildStatus().Name, metrics.EvaluateSuccessStatusForMetrics(pipelineRollout.GetUpgradingChildStatus().BasicAssessmentResult),
assessmentResult, pipelineRollout.GetUpgradingChildStatus().ForcedSuccess)
}
}
// we need to requeue one time to ensure that the newly promoted Pipeline scales back to its rollout-defined scale
if requeueDelay == 0 {
requeueDelay = common.DefaultRequeueDelay
}
}
// we need to requeue one time to ensure that the newly promoted Pipeline scales back to its rollout-defined scale
requeueDelay = common.DefaultRequeueDelay
} else {
requeueDelay = progressiveRequeueDelay

}
default:
if needsUpdate && upgradeStrategyType == apiv1.UpgradeStrategyApply {
Expand Down
Loading
Loading