Skip to content

Commit 06d4e95

Browse files
committed
merge conflicts + code refactor
1 parent 4741208 commit 06d4e95

File tree

6 files changed

+56
-67
lines changed

6 files changed

+56
-67
lines changed

pkg/pipeline/CdHandler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ func (impl *CdHandlerImpl) GetCdBuildHistory(appId int, environmentId int, pipel
494494
wfIdToWfTypeMap[item.Id] = item
495495
}
496496
}
497-
wfRunnerIdToStageDetailMap, err := impl.workflowStageStatusService.GetPrePostWorkflowStagesByWorkflowRunnerIdsList(wfIdToWfTypeMap)
497+
wfRunnerIdToStageDetailMap, err := impl.cdWorkflowRunnerService.GetPrePostWorkflowStagesByWorkflowRunnerIdsList(wfIdToWfTypeMap)
498498
if err != nil {
499499
impl.Logger.Errorw("error in fetching pre/post stage data", "err", err)
500500
return cdWorkflowArtifact, err
@@ -640,7 +640,7 @@ func (impl *CdHandlerImpl) FetchCdWorkflowDetails(appId int, environmentId int,
640640
if workflowR.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE || workflowR.WorkflowType == bean.CD_WORKFLOW_TYPE_POST {
641641
//get execution stage data
642642
impl.Logger.Infow("fetching pre/post workflow stages", "workflowId", workflowR.Id, "workflowType", workflowR.WorkflowType)
643-
workflowStageData, err := impl.workflowStageStatusService.GetPrePostWorkflowStagesByWorkflowIdAndType(workflowR.Id, workflowR.WorkflowType.String())
643+
workflowStageData, err := impl.workflowStageStatusService.GetWorkflowStagesByWorkflowIdAndType(workflowR.Id, workflowR.WorkflowType.String())
644644
if err != nil {
645645
impl.Logger.Errorw("error in fetching pre/post workflow stages", "err", err)
646646
return types.WorkflowResponse{}, err

pkg/pipeline/CiHandler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ func (impl *CiHandlerImpl) GetBuildHistory(pipelineId int, appId int, offset int
537537
workflowIds = append(workflowIds, w.Id)
538538
}
539539

540-
allWfStagesDetail, err := impl.workFlowStageStatusService.GetCiWorkflowStagesByWorkflowIds(workflowIds)
540+
allWfStagesDetail, err := impl.workFlowStageStatusService.GetWorkflowStagesByWorkflowIdsAndWfType(workflowIds, bean2.CI_WORKFLOW_TYPE.String())
541541
if err != nil {
542542
impl.Logger.Errorw("error in fetching allWfStagesDetail", "err", err, "workflowIds", workflowIds)
543543
return nil, err
@@ -793,7 +793,7 @@ func (impl *CiHandlerImpl) FetchWorkflowDetails(appId int, pipelineId int, build
793793
impl.ciWorkflowRepository.MigrateIsArtifactUploaded(workflow.Id, ciArtifact.IsArtifactUploaded)
794794
isArtifactUploaded = ciArtifact.IsArtifactUploaded
795795
}
796-
wfStagesDetail, err := impl.workFlowStageStatusService.GetCiWorkflowStagesByWorkflowIds([]int{workflow.Id})
796+
wfStagesDetail, err := impl.workFlowStageStatusService.GetWorkflowStagesByWorkflowIdsAndWfType([]int{workflow.Id}, bean2.CI_WORKFLOW_TYPE.String())
797797
if err != nil {
798798
impl.Logger.Errorw("error in fetching allWfStagesDetail", "err", err, "workflowId", workflow.Id)
799799
return types.WorkflowResponse{}, err

pkg/pipeline/workflowStatus/WorkflowStageStatusService.go

Lines changed: 5 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
88
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig/bean/workflow/cdWorkflow"
99
bean3 "github.com/devtron-labs/devtron/pkg/bean"
10-
bean4 "github.com/devtron-labs/devtron/pkg/pipeline/bean"
1110
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
1211
"github.com/devtron-labs/devtron/pkg/pipeline/types"
1312
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus/adapter"
@@ -23,10 +22,8 @@ import (
2322
)
2423

2524
type WorkFlowStageStatusService interface {
26-
//todo move read functions for ci and cd from here to their respective services. CiService and CdWorkflowRunnerService
27-
GetCiWorkflowStagesByWorkflowIds(wfIds []int) ([]*repository.WorkflowExecutionStage, error)
28-
GetPrePostWorkflowStagesByWorkflowIdAndType(wfId int, wfType string) ([]*repository.WorkflowExecutionStage, error)
29-
GetPrePostWorkflowStagesByWorkflowRunnerIdsList(wfIdWfTypeMap map[int]bean4.CdWorkflowWithArtifact) (map[int]map[string][]*bean2.WorkflowStageDto, error)
25+
GetWorkflowStagesByWorkflowIdsAndWfType(wfIds []int, wfType string) ([]*repository.WorkflowExecutionStage, error)
26+
GetWorkflowStagesByWorkflowIdAndType(wfId int, wfType string) ([]*repository.WorkflowExecutionStage, error)
3027

3128
SaveWorkflowStages(wfId int, wfType, wfName string, tx *pg.Tx) error
3229
UpdateWorkflowStages(wfId int, wfType, wfName, wfStatus, podStatus, message, podName string, tx *pg.Tx) (string, string, error)
@@ -284,10 +281,10 @@ func (impl *WorkFlowStageStatusServiceImpl) updateWorkflowStagesToDevtronStatus(
284281
return currentWorkflowStages, updatedWfStatus
285282
}
286283

287-
func (impl *WorkFlowStageStatusServiceImpl) GetCiWorkflowStagesByWorkflowIds(wfIds []int) ([]*repository.WorkflowExecutionStage, error) {
284+
func (impl *WorkFlowStageStatusServiceImpl) GetWorkflowStagesByWorkflowIdsAndWfType(wfIds []int, wfType string) ([]*repository.WorkflowExecutionStage, error) {
288285
// implementation
289286

290-
dbData, err := impl.workflowStatusRepository.GetCiWorkflowStagesByWorkflowIds(wfIds)
287+
dbData, err := impl.workflowStatusRepository.GetWorkflowStagesByWorkflowIdsAndWtype(wfIds, wfType)
291288
if err != nil {
292289
impl.logger.Errorw("error in getting ci workflow stages", "error", err)
293290
return nil, err
@@ -299,7 +296,7 @@ func (impl *WorkFlowStageStatusServiceImpl) GetCiWorkflowStagesByWorkflowIds(wfI
299296
}
300297
}
301298

302-
func (impl *WorkFlowStageStatusServiceImpl) GetPrePostWorkflowStagesByWorkflowIdAndType(wfId int, wfType string) ([]*repository.WorkflowExecutionStage, error) {
299+
func (impl *WorkFlowStageStatusServiceImpl) GetWorkflowStagesByWorkflowIdAndType(wfId int, wfType string) ([]*repository.WorkflowExecutionStage, error) {
303300
// implementation
304301

305302
dbData, err := impl.workflowStatusRepository.GetWorkflowStagesByWorkflowIdAndWtype(wfId, wfType)
@@ -314,44 +311,6 @@ func (impl *WorkFlowStageStatusServiceImpl) GetPrePostWorkflowStagesByWorkflowId
314311
}
315312
}
316313

317-
func (impl *WorkFlowStageStatusServiceImpl) GetPrePostWorkflowStagesByWorkflowRunnerIdsList(wfIdWfTypeMap map[int]bean4.CdWorkflowWithArtifact) (map[int]map[string][]*bean2.WorkflowStageDto, error) {
318-
// implementation
319-
resp := map[int]map[string][]*bean2.WorkflowStageDto{}
320-
if len(wfIdWfTypeMap) == 0 {
321-
return resp, nil
322-
}
323-
//first create a map of pre-runner ids and post-runner ids
324-
prePostRunnerIds := map[string][]int{}
325-
for wfId, wf := range wfIdWfTypeMap {
326-
if wf.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE.String() {
327-
prePostRunnerIds[bean.CD_WORKFLOW_TYPE_PRE.String()] = append(prePostRunnerIds[bean.CD_WORKFLOW_TYPE_PRE.String()], wfId)
328-
} else if wf.WorkflowType == bean.CD_WORKFLOW_TYPE_POST.String() {
329-
prePostRunnerIds[bean.CD_WORKFLOW_TYPE_POST.String()] = append(prePostRunnerIds[bean.CD_WORKFLOW_TYPE_POST.String()], wfId)
330-
}
331-
}
332-
333-
preCdDbData, err := impl.workflowStatusRepository.GetWorkflowStagesByWorkflowIdsAndWtype(prePostRunnerIds[bean.CD_WORKFLOW_TYPE_PRE.String()], bean.CD_WORKFLOW_TYPE_PRE.String())
334-
if err != nil {
335-
impl.logger.Errorw("error in getting pre-ci workflow stages", "error", err)
336-
return resp, err
337-
}
338-
//do the above for post cd
339-
postCdDbData, err := impl.workflowStatusRepository.GetWorkflowStagesByWorkflowIdsAndWtype(prePostRunnerIds[bean.CD_WORKFLOW_TYPE_POST.String()], bean.CD_WORKFLOW_TYPE_POST.String())
340-
if err != nil {
341-
impl.logger.Errorw("error in getting post-ci workflow stages", "error", err)
342-
return resp, err
343-
}
344-
//iterate over prePostRunnerIds and create response structure using ConvertDBWorkflowStageToMap function
345-
for wfId, wf := range wfIdWfTypeMap {
346-
if wf.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE.String() {
347-
resp[wfId] = impl.ConvertDBWorkflowStageToMap(preCdDbData, wfId, wf.Status, wf.PodStatus, wf.Message, wf.WorkflowType, wf.StartedOn, wf.FinishedOn)
348-
} else if wf.WorkflowType == bean.CD_WORKFLOW_TYPE_POST.String() {
349-
resp[wfId] = impl.ConvertDBWorkflowStageToMap(postCdDbData, wfId, wf.Status, wf.PodStatus, wf.Message, wf.WorkflowType, wf.StartedOn, wf.FinishedOn)
350-
}
351-
}
352-
return resp, nil
353-
}
354-
355314
func (impl *WorkFlowStageStatusServiceImpl) ConvertDBWorkflowStageToMap(workflowStages []*repository.WorkflowExecutionStage, wfId int, status, podStatus, message, wfType string, startTime, endTime time.Time) map[string][]*bean2.WorkflowStageDto {
356315
wfMap := make(map[string][]*bean2.WorkflowStageDto)
357316
foundInDb := false

pkg/pipeline/workflowStatus/adapter/adapter.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package adapter
22

33
import (
44
"encoding/json"
5+
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
56
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig/bean/workflow/cdWorkflow"
67
bean3 "github.com/devtron-labs/devtron/pkg/bean"
78
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
@@ -44,19 +45,19 @@ func getMetadataJson(metadata string) map[string]interface{} {
4445
func ConvertStatusToDevtronStatus(wfStatus string, wfMessage string) bean.WorkflowStageStatus {
4546
// implementation
4647
switch strings.ToLower(wfStatus) {
47-
case "pending", strings.ToLower(cdWorkflow.WorkflowWaitingToStart):
48+
case strings.ToLower(string(v1alpha1.NodePending)), strings.ToLower(cdWorkflow.WorkflowWaitingToStart):
4849
return bean.WORKFLOW_STAGE_STATUS_NOT_STARTED
49-
case "starting", "running":
50+
case strings.ToLower(cdWorkflow.WorkflowStarting), strings.ToLower(string(v1alpha1.NodeRunning)):
5051
return bean.WORKFLOW_STAGE_STATUS_RUNNING
51-
case "succeeded":
52+
case strings.ToLower(cdWorkflow.WorkflowSucceeded):
5253
return bean.WORKFLOW_STAGE_STATUS_SUCCEEDED
53-
case "failed", "error", "errored":
54+
case strings.ToLower(cdWorkflow.WorkflowFailed), strings.ToLower(string(v1alpha1.NodeError)), "errored":
5455
if strings.ToLower(wfMessage) == strings.ToLower(constants.POD_TIMEOUT_MESSAGE) {
5556
return bean.WORKFLOW_STAGE_STATUS_TIMEOUT
5657
} else {
5758
return bean.WORKFLOW_STAGE_STATUS_FAILED
5859
}
59-
case "aborted", "cancelled":
60+
case strings.ToLower(cdWorkflow.WorkflowAborted), strings.ToLower(cdWorkflow.WorkflowCancel):
6061
return bean.WORKFLOW_STAGE_STATUS_ABORTED
6162
default:
6263
log.Println("unknown wf status", "wf", wfStatus)

pkg/pipeline/workflowStatus/repository/WorkflowStageRepository.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package repository
22

33
import (
4-
bean2 "github.com/devtron-labs/devtron/api/bean"
54
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus/bean"
65
"github.com/devtron-labs/devtron/pkg/sql"
76
"github.com/go-pg/pg"
@@ -12,7 +11,6 @@ type WorkflowStageRepository interface {
1211
SaveWorkflowStages(workflowStage []*WorkflowExecutionStage, tx *pg.Tx) ([]*WorkflowExecutionStage, error)
1312
UpdateWorkflowStages(workflowStage []*WorkflowExecutionStage, tx *pg.Tx) ([]*WorkflowExecutionStage, error)
1413
GetWorkflowStagesByWorkflowIdAndType(workflowId int, workflowType string) ([]*WorkflowExecutionStage, error)
15-
GetCiWorkflowStagesByWorkflowIds(wfIds []int) ([]*WorkflowExecutionStage, error)
1614
GetWorkflowStagesByWorkflowIdAndWtype(wfId int, wfType string) ([]*WorkflowExecutionStage, error)
1715
GetWorkflowStagesByWorkflowIdsAndWtype(wfIds []int, wfType string) ([]*WorkflowExecutionStage, error)
1816
}
@@ -72,16 +70,6 @@ func (impl *WorkflowStageRepositoryImpl) GetWorkflowStagesByWorkflowIdAndType(wo
7270
return workflowStages, err
7371
}
7472

75-
func (impl *WorkflowStageRepositoryImpl) GetCiWorkflowStagesByWorkflowIds(wfIds []int) ([]*WorkflowExecutionStage, error) {
76-
var workflowStages []*WorkflowExecutionStage
77-
err := impl.dbConnection.Model(&workflowStages).Where("workflow_id in (?)", pg.In(wfIds)).Where("workflow_type = ?", bean2.CI_WORKFLOW_TYPE).Order("id ASC").Select()
78-
if err != nil {
79-
impl.logger.Errorw("error in fetching ci workflow stages", "err", err)
80-
return workflowStages, err
81-
}
82-
return workflowStages, err
83-
}
84-
8573
func (impl *WorkflowStageRepositoryImpl) GetWorkflowStagesByWorkflowIdAndWtype(wfId int, wfType string) ([]*WorkflowExecutionStage, error) {
8674
var workflowStages []*WorkflowExecutionStage
8775
err := impl.dbConnection.Model(&workflowStages).Where("workflow_id = ?", wfId).Where("workflow_type = ?", wfType).Order("id ASC").Select()

pkg/workflow/cd/CdWorkflowRunnerService.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
2222
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig/bean/workflow"
2323
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig/bean/workflow/cdWorkflow"
24+
bean4 "github.com/devtron-labs/devtron/pkg/pipeline/bean"
2425
"github.com/devtron-labs/devtron/pkg/pipeline/types"
2526
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
27+
bean3 "github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus/bean"
2628
"github.com/devtron-labs/devtron/pkg/sql"
2729
"github.com/devtron-labs/devtron/pkg/workflow/cd/adapter"
2830
"github.com/devtron-labs/devtron/pkg/workflow/cd/bean"
@@ -34,6 +36,7 @@ type CdWorkflowRunnerService interface {
3436
UpdateIsArtifactUploaded(wfrId int, isArtifactUploaded bool) error
3537
SaveCDWorkflowRunnerWithStage(wfr *pipelineConfig.CdWorkflowRunner) (*pipelineConfig.CdWorkflowRunner, error)
3638
UpdateCdWorkflowRunnerWithStage(wfr *pipelineConfig.CdWorkflowRunner) error
39+
GetPrePostWorkflowStagesByWorkflowRunnerIdsList(wfIdWfTypeMap map[int]bean4.CdWorkflowWithArtifact) (map[int]map[string][]*bean3.WorkflowStageDto, error)
3740
}
3841

3942
type CdWorkflowRunnerServiceImpl struct {
@@ -156,3 +159,41 @@ func (impl *CdWorkflowRunnerServiceImpl) UpdateCdWorkflowRunnerWithStage(wfr *pi
156159
return nil
157160

158161
}
162+
163+
func (impl *CdWorkflowRunnerServiceImpl) GetPrePostWorkflowStagesByWorkflowRunnerIdsList(wfIdWfTypeMap map[int]bean4.CdWorkflowWithArtifact) (map[int]map[string][]*bean3.WorkflowStageDto, error) {
164+
// implementation
165+
resp := map[int]map[string][]*bean3.WorkflowStageDto{}
166+
if len(wfIdWfTypeMap) == 0 {
167+
return resp, nil
168+
}
169+
//first create a map of pre-runner ids and post-runner ids
170+
prePostRunnerIds := map[string][]int{}
171+
for wfId, wf := range wfIdWfTypeMap {
172+
if wf.WorkflowType == bean2.CD_WORKFLOW_TYPE_PRE.String() {
173+
prePostRunnerIds[bean2.CD_WORKFLOW_TYPE_PRE.String()] = append(prePostRunnerIds[bean2.CD_WORKFLOW_TYPE_PRE.String()], wfId)
174+
} else if wf.WorkflowType == bean2.CD_WORKFLOW_TYPE_POST.String() {
175+
prePostRunnerIds[bean2.CD_WORKFLOW_TYPE_POST.String()] = append(prePostRunnerIds[bean2.CD_WORKFLOW_TYPE_POST.String()], wfId)
176+
}
177+
}
178+
179+
preCdDbData, err := impl.workflowStageService.GetWorkflowStagesByWorkflowIdsAndWfType(prePostRunnerIds[bean2.CD_WORKFLOW_TYPE_PRE.String()], bean2.CD_WORKFLOW_TYPE_PRE.String())
180+
if err != nil {
181+
impl.logger.Errorw("error in getting pre-ci workflow stages", "error", err)
182+
return resp, err
183+
}
184+
//do the above for post cd
185+
postCdDbData, err := impl.workflowStageService.GetWorkflowStagesByWorkflowIdsAndWfType(prePostRunnerIds[bean2.CD_WORKFLOW_TYPE_POST.String()], bean2.CD_WORKFLOW_TYPE_POST.String())
186+
if err != nil {
187+
impl.logger.Errorw("error in getting post-ci workflow stages", "error", err)
188+
return resp, err
189+
}
190+
//iterate over prePostRunnerIds and create response structure using ConvertDBWorkflowStageToMap function
191+
for wfId, wf := range wfIdWfTypeMap {
192+
if wf.WorkflowType == bean2.CD_WORKFLOW_TYPE_PRE.String() {
193+
resp[wfId] = impl.workflowStageService.ConvertDBWorkflowStageToMap(preCdDbData, wfId, wf.Status, wf.PodStatus, wf.Message, wf.WorkflowType, wf.StartedOn, wf.FinishedOn)
194+
} else if wf.WorkflowType == bean2.CD_WORKFLOW_TYPE_POST.String() {
195+
resp[wfId] = impl.workflowStageService.ConvertDBWorkflowStageToMap(postCdDbData, wfId, wf.Status, wf.PodStatus, wf.Message, wf.WorkflowType, wf.StartedOn, wf.FinishedOn)
196+
}
197+
}
198+
return resp, nil
199+
}

0 commit comments

Comments
 (0)