Skip to content

Commit ecd0d33

Browse files
committed
fix: ignore handling of the cd workflow status update event no state change is detected since last update status for the wfr
1 parent ae42e7b commit ecd0d33

File tree

3 files changed

+66
-50
lines changed

3 files changed

+66
-50
lines changed

pkg/eventProcessor/in/WorkflowEventProcessorService.go

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -436,51 +436,47 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDWorkflowStatusUpdate() error
436436
return
437437
}
438438

439-
wfrId, wfrStatus, err := impl.cdHandler.UpdateWorkflow(wfStatus)
439+
wfrId, wfrStatus, stateChanged, err := impl.cdHandler.UpdateWorkflow(wfStatus)
440440
impl.logger.Debugw("UpdateWorkflow", "wfrId", wfrId, "wfrStatus", wfrStatus)
441441
if err != nil {
442442
impl.logger.Error("err", err)
443443
return
444444
}
445-
446-
wfr, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(wfrId)
447-
if err != nil {
448-
impl.logger.Errorw("could not get wf runner", "err", err)
449-
return
450-
}
451-
if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
452-
if len(wfr.ImagePathReservationIds) > 0 {
453-
err := impl.cdHandler.DeactivateImageReservationPathsOnFailure(wfr.ImagePathReservationIds)
454-
if err != nil {
455-
impl.logger.Errorw("error in removing image path reservation ")
456-
}
457-
}
458-
}
459-
if wfrStatus == string(v1alpha1.NodeSucceeded) || wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
460-
eventType := eventUtil.EventType(0)
461-
if wfrStatus == string(v1alpha1.NodeSucceeded) {
462-
eventType = eventUtil.Success
463-
} else if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
464-
eventType = eventUtil.Fail
445+
if stateChanged {
446+
wfr, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(wfrId)
447+
if err != nil {
448+
impl.logger.Errorw("could not get wf runner", "wfrId", wfrId, "err", err)
449+
return
465450
}
466-
467-
if wfr != nil && executors.CheckIfReTriggerRequired(wfrStatus, wfStatus.Message, wfr.Status) {
468-
err = impl.workflowDagExecutor.HandleCdStageReTrigger(wfr)
469-
if err != nil {
470-
//check if this log required or not
471-
impl.logger.Errorw("error in HandleCdStageReTrigger", "error", err)
451+
if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
452+
if len(wfr.ImagePathReservationIds) > 0 {
453+
err := impl.cdHandler.DeactivateImageReservationPathsOnFailure(wfr.ImagePathReservationIds)
454+
if err != nil {
455+
impl.logger.Errorw("error in removing image path reservation ", "imagePathReservationIds", wfr.ImagePathReservationIds, "err", err)
456+
}
472457
}
473458
}
459+
if wfrStatus == string(v1alpha1.NodeSucceeded) || wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
460+
eventType := eventUtil.EventType(0)
461+
if wfrStatus == string(v1alpha1.NodeSucceeded) {
462+
eventType = eventUtil.Success
463+
} else if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
464+
eventType = eventUtil.Fail
465+
}
474466

475-
if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_PRE || wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST {
476-
event, _ := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, eventUtil.CD)
477-
impl.logger.Debugw("event pre stage", "event", event)
478-
event = impl.eventFactory.BuildExtraCDData(event, wfr, 0, wfr.WorkflowType)
479-
_, evtErr := impl.eventClient.WriteNotificationEvent(event)
480-
if evtErr != nil {
481-
impl.logger.Errorw("CD stage post fail or success event unable to sent", "error", evtErr)
467+
if wfr != nil && executors.CheckIfReTriggerRequired(wfrStatus, wfStatus.Message, wfr.Status) {
468+
err = impl.workflowDagExecutor.HandleCdStageReTrigger(wfr)
469+
if err != nil {
470+
//check if this log required or not
471+
impl.logger.Errorw("error in HandleCdStageReTrigger", "workflowRunnerId", wfr.Id, "workflowStatus", wfrStatus, "workflowStatusMessage", wfStatus.Message, "error", err)
472+
}
473+
impl.logger.Debugw("re-triggered cd stage", "workflowRunnerId", wfr.Id, "workflowStatus", wfrStatus, "workflowStatusMessage", wfStatus.Message)
474+
} else {
475+
impl.sendPrePostCdNotificationEvent(eventType, wfr)
482476
}
483477
}
478+
} else {
479+
impl.logger.Debugw("no state change detected for the cd workflow status update, ignoring this event", "workflowRunnerId", wfrId, "wfrStatus", wfrStatus)
484480
}
485481
}
486482

@@ -503,6 +499,18 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDWorkflowStatusUpdate() error
503499
return nil
504500
}
505501

502+
func (impl *WorkflowEventProcessorImpl) sendPrePostCdNotificationEvent(eventType eventUtil.EventType, wfr *pipelineConfig.CdWorkflowRunner) {
503+
if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_PRE || wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST {
504+
event, _ := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, eventUtil.CD)
505+
impl.logger.Debugw("event pre stage", "event", event)
506+
event = impl.eventFactory.BuildExtraCDData(event, wfr, 0, wfr.WorkflowType)
507+
_, evtErr := impl.eventClient.WriteNotificationEvent(event)
508+
if evtErr != nil {
509+
impl.logger.Errorw("CD stage post fail or success event unable to sent", "error", evtErr)
510+
}
511+
}
512+
}
513+
506514
func (impl *WorkflowEventProcessorImpl) extractCiCompleteEventFrom(msg *model.PubSubMsg) (bean.CiCompleteEvent, error) {
507515
ciCompleteEvent := bean.CiCompleteEvent{}
508516
err := json.Unmarshal([]byte(msg.Data), &ciCompleteEvent)

pkg/pipeline/CdHandler.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ const (
6464
)
6565

6666
type CdHandler interface {
67-
UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error)
67+
UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, bool, error)
6868
GetCdBuildHistory(appId int, environmentId int, pipelineId int, offset int, size int) ([]pipelineBean.CdWorkflowWithArtifact, error)
6969
GetRunningWorkflowLogs(environmentId int, pipelineId int, workflowId int) (*bufio.Reader, func() error, error)
7070
FetchCdWorkflowDetails(appId int, environmentId int, pipelineId int, buildId int) (types.WorkflowResponse, error)
@@ -252,23 +252,23 @@ func (impl *CdHandlerImpl) handleForceAbortCaseForCdStage(workflowRunner *pipeli
252252
return nil
253253
}
254254

255-
func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error) {
255+
func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, bool, error) {
256256
wfStatusRs := impl.extractWorkfowStatus(workflowStatus)
257257
workflowName, status, podStatus, message, podName := wfStatusRs.WorkflowName, wfStatusRs.Status, wfStatusRs.PodStatus, wfStatusRs.Message, wfStatusRs.PodName
258258
impl.Logger.Debugw("cd update for ", "wf ", workflowName, "status", status)
259259
if workflowName == "" {
260-
return 0, "", errors.New("invalid wf name")
260+
return 0, "", false, errors.New("invalid wf name")
261261
}
262262
workflowId, err := strconv.Atoi(workflowName[:strings.Index(workflowName, "-")])
263263
if err != nil {
264264
impl.Logger.Error("invalid wf status update req", "err", err)
265-
return 0, "", err
265+
return 0, "", false, err
266266
}
267267

268268
savedWorkflow, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(workflowId)
269269
if err != nil {
270270
impl.Logger.Error("cannot get saved wf", "err", err)
271-
return 0, "", err
271+
return 0, "", false, err
272272
}
273273

274274
cdArtifactLocationFormat := impl.config.GetArtifactLocationFormat()
@@ -289,21 +289,21 @@ func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus
289289
err = impl.cdWorkflowRepository.UpdateWorkFlowRunner(savedWorkflow)
290290
if err != nil {
291291
impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id))
292-
return 0, "", err
292+
return 0, "", true, err
293293
}
294294
appId := savedWorkflow.CdWorkflow.Pipeline.AppId
295295
envId := savedWorkflow.CdWorkflow.Pipeline.EnvironmentId
296296
envDeploymentConfig, err := impl.deploymentConfigService.GetConfigForDevtronApps(appId, envId)
297297
if err != nil {
298298
impl.Logger.Errorw("error in fetching environment deployment config by appId and envId", "appId", appId, "envId", envId, "err", err)
299-
return 0, "", err
299+
return 0, "", true, err
300300
}
301301
util3.TriggerCDMetrics(cdWorkflow.GetTriggerMetricsFromRunnerObj(savedWorkflow, envDeploymentConfig), impl.config.ExposeCDMetrics)
302302
if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status {
303303
impl.Logger.Warnw("cd stage failed for workflow: ", "wfId", savedWorkflow.Id)
304304
}
305305
}
306-
return savedWorkflow.Id, savedWorkflow.Status, nil
306+
return savedWorkflow.Id, savedWorkflow.Status, false, nil
307307
}
308308

309309
func (impl *CdHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.WorkflowStatus) *types.WorkflowStatus {

pkg/pipeline/CiHandler.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1206,17 +1206,25 @@ func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus
12061206
impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id))
12071207
return 0, err
12081208
}
1209-
if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status {
1210-
impl.Logger.Warnw("ci failed for workflow: ", "wfId", savedWorkflow.Id)
1209+
impl.sendCIFailEvent(savedWorkflow, status, message)
1210+
}
1211+
return savedWorkflow.Id, nil
1212+
}
12111213

1212-
if extractErrorCode(savedWorkflow.Message) != workFlow.CiStageFailErrorCode {
1213-
go impl.WriteCIFailEvent(savedWorkflow)
1214-
} else {
1215-
impl.Logger.Infof("Step failed notification received for wfID %d with message %s", savedWorkflow.Id, savedWorkflow.Message)
1216-
}
1214+
func (impl *CiHandlerImpl) sendCIFailEvent(savedWorkflow *pipelineConfig.CiWorkflow, status, message string) {
1215+
if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status {
1216+
if executors.CheckIfReTriggerRequired(status, message, savedWorkflow.Status) {
1217+
impl.Logger.Infow("not sending failure notification for re-trigger workflow", "workflowId", savedWorkflow.Id)
1218+
return
1219+
}
1220+
impl.Logger.Warnw("ci failed for workflow: ", "wfId", savedWorkflow.Id)
1221+
1222+
if extractErrorCode(savedWorkflow.Message) != workFlow.CiStageFailErrorCode {
1223+
go impl.WriteCIFailEvent(savedWorkflow)
1224+
} else {
1225+
impl.Logger.Infof("Step failed notification received for wfID %d with message %s", savedWorkflow.Id, savedWorkflow.Message)
12171226
}
12181227
}
1219-
return savedWorkflow.Id, nil
12201228
}
12211229

12221230
func extractErrorCode(msg string) int {

0 commit comments

Comments
 (0)