Skip to content

Commit 71c8fea

Browse files
Merge pull request #6370 from devtron-labs/fix-ci-cd-workflow-status-oss
fix: fix for ci cd workflow fake success status and multiple post/pre cd success notifications
2 parents e537e28 + 0b9749b commit 71c8fea

File tree

3 files changed

+66
-50
lines changed

3 files changed

+66
-50
lines changed

pkg/eventProcessor/in/WorkflowEventProcessorService.go

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -436,51 +436,47 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDWorkflowStatusUpdate() error
436436
return
437437
}
438438

439-
wfrId, wfrStatus, err := impl.cdHandler.UpdateWorkflow(wfStatus)
439+
wfrId, wfrStatus, stateChanged, err := impl.cdHandler.UpdateWorkflow(wfStatus)
440440
impl.logger.Debugw("UpdateWorkflow", "wfrId", wfrId, "wfrStatus", wfrStatus)
441441
if err != nil {
442442
impl.logger.Error("err", err)
443443
return
444444
}
445-
446-
wfr, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(wfrId)
447-
if err != nil {
448-
impl.logger.Errorw("could not get wf runner", "err", err)
449-
return
450-
}
451-
if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
452-
if len(wfr.ImagePathReservationIds) > 0 {
453-
err := impl.cdHandler.DeactivateImageReservationPathsOnFailure(wfr.ImagePathReservationIds)
454-
if err != nil {
455-
impl.logger.Errorw("error in removing image path reservation ")
456-
}
457-
}
458-
}
459-
if wfrStatus == string(v1alpha1.NodeSucceeded) || wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
460-
eventType := eventUtil.EventType(0)
461-
if wfrStatus == string(v1alpha1.NodeSucceeded) {
462-
eventType = eventUtil.Success
463-
} else if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
464-
eventType = eventUtil.Fail
445+
if stateChanged {
446+
wfr, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(wfrId)
447+
if err != nil {
448+
impl.logger.Errorw("could not get wf runner", "wfrId", wfrId, "err", err)
449+
return
465450
}
466-
467-
if wfr != nil && executors.CheckIfReTriggerRequired(wfrStatus, wfStatus.Message, wfr.Status) {
468-
err = impl.workflowDagExecutor.HandleCdStageReTrigger(wfr)
469-
if err != nil {
470-
//check if this log required or not
471-
impl.logger.Errorw("error in HandleCdStageReTrigger", "error", err)
451+
if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
452+
if len(wfr.ImagePathReservationIds) > 0 {
453+
err := impl.cdHandler.DeactivateImageReservationPathsOnFailure(wfr.ImagePathReservationIds)
454+
if err != nil {
455+
impl.logger.Errorw("error in removing image path reservation ", "imagePathReservationIds", wfr.ImagePathReservationIds, "err", err)
456+
}
472457
}
473458
}
459+
if wfrStatus == string(v1alpha1.NodeSucceeded) || wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
460+
eventType := eventUtil.EventType(0)
461+
if wfrStatus == string(v1alpha1.NodeSucceeded) {
462+
eventType = eventUtil.Success
463+
} else if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) {
464+
eventType = eventUtil.Fail
465+
}
474466

475-
if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_PRE || wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST {
476-
event, _ := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, eventUtil.CD)
477-
impl.logger.Debugw("event pre stage", "event", event)
478-
event = impl.eventFactory.BuildExtraCDData(event, wfr, 0, wfr.WorkflowType)
479-
_, evtErr := impl.eventClient.WriteNotificationEvent(event)
480-
if evtErr != nil {
481-
impl.logger.Errorw("CD stage post fail or success event unable to sent", "error", evtErr)
467+
if wfr != nil && executors.CheckIfReTriggerRequired(wfrStatus, wfStatus.Message, wfr.Status) {
468+
err = impl.workflowDagExecutor.HandleCdStageReTrigger(wfr)
469+
if err != nil {
470+
//check if this log required or not
471+
impl.logger.Errorw("error in HandleCdStageReTrigger", "workflowRunnerId", wfr.Id, "workflowStatus", wfrStatus, "workflowStatusMessage", wfStatus.Message, "error", err)
472+
}
473+
impl.logger.Debugw("re-triggered cd stage", "workflowRunnerId", wfr.Id, "workflowStatus", wfrStatus, "workflowStatusMessage", wfStatus.Message)
474+
} else {
475+
impl.sendPrePostCdNotificationEvent(eventType, wfr)
482476
}
483477
}
478+
} else {
479+
impl.logger.Debugw("no state change detected for the cd workflow status update, ignoring this event", "workflowRunnerId", wfrId, "wfrStatus", wfrStatus)
484480
}
485481
}
486482

@@ -503,6 +499,18 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDWorkflowStatusUpdate() error
503499
return nil
504500
}
505501

502+
func (impl *WorkflowEventProcessorImpl) sendPrePostCdNotificationEvent(eventType eventUtil.EventType, wfr *pipelineConfig.CdWorkflowRunner) {
503+
if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_PRE || wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST {
504+
event, _ := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, eventUtil.CD)
505+
impl.logger.Debugw("event pre stage", "event", event)
506+
event = impl.eventFactory.BuildExtraCDData(event, wfr, 0, wfr.WorkflowType)
507+
_, evtErr := impl.eventClient.WriteNotificationEvent(event)
508+
if evtErr != nil {
509+
impl.logger.Errorw("CD stage post fail or success event unable to sent", "error", evtErr)
510+
}
511+
}
512+
}
513+
506514
func (impl *WorkflowEventProcessorImpl) extractCiCompleteEventFrom(msg *model.PubSubMsg) (bean.CiCompleteEvent, error) {
507515
ciCompleteEvent := bean.CiCompleteEvent{}
508516
err := json.Unmarshal([]byte(msg.Data), &ciCompleteEvent)

pkg/pipeline/CdHandler.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ const (
6868
)
6969

7070
type CdHandler interface {
71-
UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error)
71+
UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, bool, error)
7272
GetCdBuildHistory(appId int, environmentId int, pipelineId int, offset int, size int) ([]pipelineBean.CdWorkflowWithArtifact, error)
7373
GetRunningWorkflowLogs(environmentId int, pipelineId int, workflowId int) (*bufio.Reader, func() error, error)
7474
FetchCdWorkflowDetails(appId int, environmentId int, pipelineId int, buildId int) (types.WorkflowResponse, error)
@@ -262,23 +262,23 @@ func (impl *CdHandlerImpl) handleForceAbortCaseForCdStage(workflowRunner *pipeli
262262
return nil
263263
}
264264

265-
func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error) {
265+
func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, bool, error) {
266266
wfStatusRs := impl.extractWorkfowStatus(workflowStatus)
267267
workflowName, status, podStatus, message, podName := wfStatusRs.WorkflowName, wfStatusRs.Status, wfStatusRs.PodStatus, wfStatusRs.Message, wfStatusRs.PodName
268268
impl.Logger.Debugw("cd update for ", "wf ", workflowName, "status", status)
269269
if workflowName == "" {
270-
return 0, "", errors.New("invalid wf name")
270+
return 0, "", false, errors.New("invalid wf name")
271271
}
272272
workflowId, err := strconv.Atoi(workflowName[:strings.Index(workflowName, "-")])
273273
if err != nil {
274274
impl.Logger.Error("invalid wf status update req", "err", err)
275-
return 0, "", err
275+
return 0, "", false, err
276276
}
277277

278278
savedWorkflow, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(workflowId)
279279
if err != nil {
280280
impl.Logger.Error("cannot get saved wf", "err", err)
281-
return 0, "", err
281+
return 0, "", false, err
282282
}
283283

284284
cdArtifactLocationFormat := impl.config.GetArtifactLocationFormat()
@@ -299,21 +299,21 @@ func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus
299299
err = impl.cdWorkflowRunnerService.UpdateCdWorkflowRunnerWithStage(savedWorkflow)
300300
if err != nil {
301301
impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id))
302-
return 0, "", err
302+
return 0, "", true, err
303303
}
304304
appId := savedWorkflow.CdWorkflow.Pipeline.AppId
305305
envId := savedWorkflow.CdWorkflow.Pipeline.EnvironmentId
306306
envDeploymentConfig, err := impl.deploymentConfigService.GetConfigForDevtronApps(appId, envId)
307307
if err != nil {
308308
impl.Logger.Errorw("error in fetching environment deployment config by appId and envId", "appId", appId, "envId", envId, "err", err)
309-
return 0, "", err
309+
return 0, "", true, err
310310
}
311311
util3.TriggerCDMetrics(cdWorkflow.GetTriggerMetricsFromRunnerObj(savedWorkflow, envDeploymentConfig), impl.config.ExposeCDMetrics)
312312
if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status {
313313
impl.Logger.Warnw("cd stage failed for workflow: ", "wfId", savedWorkflow.Id)
314314
}
315315
}
316-
return savedWorkflow.Id, savedWorkflow.Status, nil
316+
return savedWorkflow.Id, savedWorkflow.Status, false, nil
317317
}
318318

319319
func (impl *CdHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.WorkflowStatus) *types.WorkflowStatus {

pkg/pipeline/CiHandler.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,17 +1223,25 @@ func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus
12231223
impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id))
12241224
return 0, err
12251225
}
1226-
if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status {
1227-
impl.Logger.Warnw("ci failed for workflow: ", "wfId", savedWorkflow.Id)
1226+
impl.sendCIFailEvent(savedWorkflow, status, message)
1227+
}
1228+
return savedWorkflow.Id, nil
1229+
}
12281230

1229-
if extractErrorCode(savedWorkflow.Message) != workFlow.CiStageFailErrorCode {
1230-
go impl.WriteCIFailEvent(savedWorkflow)
1231-
} else {
1232-
impl.Logger.Infof("Step failed notification received for wfID %d with message %s", savedWorkflow.Id, savedWorkflow.Message)
1233-
}
1231+
func (impl *CiHandlerImpl) sendCIFailEvent(savedWorkflow *pipelineConfig.CiWorkflow, status, message string) {
1232+
if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status {
1233+
if executors.CheckIfReTriggerRequired(status, message, savedWorkflow.Status) {
1234+
impl.Logger.Infow("not sending failure notification for re-trigger workflow", "workflowId", savedWorkflow.Id)
1235+
return
1236+
}
1237+
impl.Logger.Warnw("ci failed for workflow: ", "wfId", savedWorkflow.Id)
1238+
1239+
if extractErrorCode(savedWorkflow.Message) != workFlow.CiStageFailErrorCode {
1240+
go impl.WriteCIFailEvent(savedWorkflow)
1241+
} else {
1242+
impl.Logger.Infof("Step failed notification received for wfID %d with message %s", savedWorkflow.Id, savedWorkflow.Message)
12341243
}
12351244
}
1236-
return savedWorkflow.Id, nil
12371245
}
12381246

12391247
func extractErrorCode(msg string) int {

0 commit comments

Comments
 (0)