Skip to content

Commit 48468ce

Browse files
committed
refactoring
1 parent 03f78d4 commit 48468ce

File tree

3 files changed

+114
-100
lines changed

3 files changed

+114
-100
lines changed

pkg/pipeline/CdHandler.go

Lines changed: 7 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
3333
bean5 "github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus/bean"
3434
"github.com/devtron-labs/devtron/pkg/workflow/cd"
35+
"github.com/devtron-labs/devtron/pkg/workflow/cd/read"
3536
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
3637
"slices"
3738
"strconv"
@@ -93,6 +94,7 @@ type CdHandlerImpl struct {
9394
cdWorkflowRunnerService cd.CdWorkflowRunnerService
9495
WorkflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService
9596
pipelineStageRepository repository2.PipelineStageRepository
97+
cdWorkflowRunnerReadService read.CdWorkflowRunnerReadService
9698
}
9799

98100
func NewCdHandlerImpl(Logger *zap.SugaredLogger, userService user.UserService,
@@ -109,6 +111,7 @@ func NewCdHandlerImpl(Logger *zap.SugaredLogger, userService user.UserService,
109111
cdWorkflowRunnerService cd.CdWorkflowRunnerService,
110112
WorkflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService,
111113
pipelineStageRepository repository2.PipelineStageRepository,
114+
cdWorkflowRunnerReadService read.CdWorkflowRunnerReadService,
112115
) *CdHandlerImpl {
113116
cdh := &CdHandlerImpl{
114117
Logger: Logger,
@@ -129,6 +132,7 @@ func NewCdHandlerImpl(Logger *zap.SugaredLogger, userService user.UserService,
129132
cdWorkflowRunnerService: cdWorkflowRunnerService,
130133
WorkflowStatusLatestService: WorkflowStatusLatestService,
131134
pipelineStageRepository: pipelineStageRepository,
135+
cdWorkflowRunnerReadService: cdWorkflowRunnerReadService,
132136
}
133137
config, err := types.GetCdConfig()
134138
if err != nil {
@@ -602,7 +606,7 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerView(appId int) ([]*p
602606
return cdWorkflowStatus, nil
603607
}
604608

605-
result, err := impl.getWfrStatusForLatestRunners(pipelineIds, pipelines)
609+
result, err := impl.cdWorkflowRunnerReadService.GetWfrStatusForLatestRunners(pipelineIds, pipelines)
606610
if err != nil {
607611
impl.Logger.Errorw("error in fetching wfrIds", "pipelineIds", pipelineIds, "err", err)
608612
return cdWorkflowStatus, err
@@ -692,88 +696,6 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerView(appId int) ([]*p
692696
return cdWorkflowStatus, err
693697
}
694698

695-
func (impl *CdHandlerImpl) getWfrStatusForLatestRunners(pipelineIds []int, pipelines []*pipelineConfig.Pipeline) ([]*pipelineConfig.CdWorkflowStatus, error) {
696-
// fetching the latest pipeline from the index table - cdWorkflowLatest
697-
var result []*pipelineConfig.CdWorkflowStatus
698-
cdWorkflowLatest, err := impl.WorkflowStatusLatestService.GetCdWorkflowLatestByPipelineIds(pipelineIds)
699-
if err != nil {
700-
impl.Logger.Errorw("error in getting latest by pipelineId", "pipelineId", pipelineIds, "err", err)
701-
return nil, err
702-
}
703-
704-
var pipelineIdToCiPipelineIdMap map[int]int
705-
for _, item := range pipelines {
706-
pipelineIdToCiPipelineIdMap[item.Id] = item.CiPipelineId
707-
}
708-
709-
for _, item := range cdWorkflowLatest {
710-
result = append(result, &pipelineConfig.CdWorkflowStatus{
711-
CiPipelineId: pipelineIdToCiPipelineIdMap[item.PipelineId],
712-
PipelineId: item.PipelineId,
713-
WorkflowType: item.WorkflowType,
714-
WfrId: item.WorkflowRunnerId,
715-
})
716-
}
717-
718-
var cdWorfklowLatestMap map[int]map[bean.WorkflowType]bool
719-
for _, item := range cdWorkflowLatest {
720-
if _, ok := cdWorfklowLatestMap[item.PipelineId]; !ok {
721-
cdWorfklowLatestMap[item.PipelineId] = make(map[bean.WorkflowType]bool)
722-
}
723-
cdWorfklowLatestMap[item.PipelineId][bean.WorkflowType(item.WorkflowType)] = true
724-
}
725-
726-
pipelineStage, err := impl.pipelineStageRepository.GetAllCdStagesByCdPipelineIds(pipelineIds)
727-
if err != nil {
728-
impl.Logger.Errorw("error in fetching pipeline stages", "pipelineId", pipelineIds, "err", err)
729-
return nil, err
730-
}
731-
pipelineStageMap := make(map[int]map[bean.WorkflowType]bool)
732-
for _, item := range pipelineStage {
733-
if _, ok := pipelineStageMap[item.CdPipelineId]; !ok {
734-
pipelineStageMap[item.CdPipelineId] = make(map[bean.WorkflowType]bool)
735-
}
736-
if item.Type == repository2.PIPELINE_STAGE_TYPE_PRE_CD {
737-
pipelineStageMap[item.CdPipelineId][bean.CD_WORKFLOW_TYPE_PRE] = true
738-
} else if item.Type == repository2.PIPELINE_STAGE_TYPE_POST_CD {
739-
pipelineStageMap[item.CdPipelineId][bean.CD_WORKFLOW_TYPE_POST] = true
740-
}
741-
}
742-
743-
// calculating all the pipelines not present in the index table cdWorkflowLatest
744-
var pipelinesAbsentInCache map[int]bean.WorkflowType
745-
for _, item := range pipelines {
746-
if _, ok := cdWorfklowLatestMap[item.Id]; !ok {
747-
pipelinesAbsentInCache[item.Id] = bean.CD_WORKFLOW_TYPE_PRE
748-
pipelinesAbsentInCache[item.Id] = bean.CD_WORKFLOW_TYPE_DEPLOY
749-
pipelinesAbsentInCache[item.Id] = bean.CD_WORKFLOW_TYPE_POST
750-
} else {
751-
if _, ok := pipelineStageMap[item.Id][bean.CD_WORKFLOW_TYPE_PRE]; ok {
752-
if val, ok := cdWorfklowLatestMap[item.Id][bean.CD_WORKFLOW_TYPE_PRE]; !ok || !val {
753-
pipelinesAbsentInCache[item.Id] = bean.CD_WORKFLOW_TYPE_PRE
754-
}
755-
}
756-
if _, ok := pipelineStageMap[item.Id][bean.CD_WORKFLOW_TYPE_POST]; ok {
757-
if val, ok := cdWorfklowLatestMap[item.Id][bean.CD_WORKFLOW_TYPE_POST]; !ok || !val {
758-
pipelinesAbsentInCache[item.Id] = bean.CD_WORKFLOW_TYPE_POST
759-
}
760-
}
761-
if val, ok := cdWorfklowLatestMap[item.Id][bean.CD_WORKFLOW_TYPE_DEPLOY]; !ok || !val {
762-
pipelinesAbsentInCache[item.Id] = bean.CD_WORKFLOW_TYPE_POST
763-
}
764-
}
765-
}
766-
if len(pipelinesAbsentInCache) > 0 {
767-
remainingRunners, err := impl.cdWorkflowRepository.FetchAllCdStagesLatestEntity(pipelinesAbsentInCache)
768-
if err != nil {
769-
impl.Logger.Errorw("error in fetching all cd stages latest entity", "pipelinesAbsentInCache", pipelinesAbsentInCache, "err", err)
770-
return nil, err
771-
}
772-
result = append(result, remainingRunners...)
773-
}
774-
return result, nil
775-
}
776-
777699
func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerViewForEnvironment(request resourceGroup2.ResourceGroupingRequest, token string) ([]*pipelineConfig.CdWorkflowStatus, error) {
778700
cdWorkflowStatus := make([]*pipelineConfig.CdWorkflowStatus, 0)
779701
var pipelines []*pipelineConfig.Pipeline
@@ -848,7 +770,7 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerViewForEnvironment(re
848770

849771
cdMap := make(map[int]*pipelineConfig.CdWorkflowStatus)
850772

851-
wfrStatus, err := impl.getWfrStatusForLatestRunners(pipelineIds, pipelines)
773+
wfrStatus, err := impl.cdWorkflowRunnerReadService.GetWfrStatusForLatestRunners(pipelineIds, pipelines)
852774
if err != nil {
853775
impl.Logger.Errorw("error in fetching wfrIds", "pipelineIds", pipelineIds, "err", err)
854776
return cdWorkflowStatus, err
@@ -1000,7 +922,7 @@ func (impl *CdHandlerImpl) FetchAppDeploymentStatusForEnvironments(request resou
1000922
return deploymentStatuses, nil
1001923
}
1002924
_, span = otel.Tracer("orchestrator").Start(request.Ctx, "pipelineBuilder.FetchAllCdStagesLatestEntity")
1003-
result, err := impl.getWfrStatusForLatestRunners(pipelineIds, cdPipelines)
925+
result, err := impl.cdWorkflowRunnerReadService.GetWfrStatusForLatestRunners(pipelineIds, cdPipelines)
1004926
span.End()
1005927
if err != nil {
1006928
return deploymentStatuses, err

pkg/workflow/cd/read/CdWorkflowRunnerReadService.go

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,38 @@
11
package read
22

33
import (
4+
bean2 "github.com/devtron-labs/devtron/api/bean"
45
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
6+
repository2 "github.com/devtron-labs/devtron/pkg/pipeline/repository"
57
"github.com/devtron-labs/devtron/pkg/workflow/cd/adapter"
68
"github.com/devtron-labs/devtron/pkg/workflow/cd/bean"
9+
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
710
"github.com/go-pg/pg"
811
"go.uber.org/zap"
912
)
1013

1114
type CdWorkflowRunnerReadService interface {
1215
FindWorkflowRunnerById(wfrId int) (*bean.CdWorkflowRunnerDto, error)
1316
CheckIfWfrLatest(wfrId, pipelineId int) (isLatest bool, err error)
17+
GetWfrStatusForLatestRunners(pipelineIds []int, pipelines []*pipelineConfig.Pipeline) ([]*pipelineConfig.CdWorkflowStatus, error)
1418
}
1519

1620
type CdWorkflowRunnerReadServiceImpl struct {
17-
logger *zap.SugaredLogger
18-
cdWorkflowRepository pipelineConfig.CdWorkflowRepository
21+
logger *zap.SugaredLogger
22+
cdWorkflowRepository pipelineConfig.CdWorkflowRepository
23+
WorkflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService
24+
pipelineStageRepository repository2.PipelineStageRepository
1925
}
2026

2127
func NewCdWorkflowRunnerReadServiceImpl(logger *zap.SugaredLogger,
22-
cdWorkflowRepository pipelineConfig.CdWorkflowRepository) *CdWorkflowRunnerReadServiceImpl {
28+
cdWorkflowRepository pipelineConfig.CdWorkflowRepository,
29+
WorkflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService,
30+
pipelineStageRepository repository2.PipelineStageRepository) *CdWorkflowRunnerReadServiceImpl {
2331
return &CdWorkflowRunnerReadServiceImpl{
24-
logger: logger,
25-
cdWorkflowRepository: cdWorkflowRepository,
32+
logger: logger,
33+
cdWorkflowRepository: cdWorkflowRepository,
34+
WorkflowStatusLatestService: WorkflowStatusLatestService,
35+
pipelineStageRepository: pipelineStageRepository,
2636
}
2737
}
2838

@@ -44,3 +54,85 @@ func (impl *CdWorkflowRunnerReadServiceImpl) CheckIfWfrLatest(wfrId, pipelineId
4454
}
4555
return isLatest, nil
4656
}
57+
58+
func (impl *CdWorkflowRunnerReadServiceImpl) GetWfrStatusForLatestRunners(pipelineIds []int, pipelines []*pipelineConfig.Pipeline) ([]*pipelineConfig.CdWorkflowStatus, error) {
59+
// fetching the latest pipeline from the index table - cdWorkflowLatest
60+
var result []*pipelineConfig.CdWorkflowStatus
61+
cdWorkflowLatest, err := impl.WorkflowStatusLatestService.GetCdWorkflowLatestByPipelineIds(pipelineIds)
62+
if err != nil {
63+
impl.logger.Errorw("error in getting latest by pipelineId", "pipelineId", pipelineIds, "err", err)
64+
return nil, err
65+
}
66+
67+
var pipelineIdToCiPipelineIdMap map[int]int
68+
for _, item := range pipelines {
69+
pipelineIdToCiPipelineIdMap[item.Id] = item.CiPipelineId
70+
}
71+
72+
for _, item := range cdWorkflowLatest {
73+
result = append(result, &pipelineConfig.CdWorkflowStatus{
74+
CiPipelineId: pipelineIdToCiPipelineIdMap[item.PipelineId],
75+
PipelineId: item.PipelineId,
76+
WorkflowType: item.WorkflowType,
77+
WfrId: item.WorkflowRunnerId,
78+
})
79+
}
80+
81+
var cdWorfklowLatestMap map[int]map[bean2.WorkflowType]bool
82+
for _, item := range cdWorkflowLatest {
83+
if _, ok := cdWorfklowLatestMap[item.PipelineId]; !ok {
84+
cdWorfklowLatestMap[item.PipelineId] = make(map[bean2.WorkflowType]bool)
85+
}
86+
cdWorfklowLatestMap[item.PipelineId][bean2.WorkflowType(item.WorkflowType)] = true
87+
}
88+
89+
pipelineStage, err := impl.pipelineStageRepository.GetAllCdStagesByCdPipelineIds(pipelineIds)
90+
if err != nil {
91+
impl.logger.Errorw("error in fetching pipeline stages", "pipelineId", pipelineIds, "err", err)
92+
return nil, err
93+
}
94+
pipelineStageMap := make(map[int]map[bean2.WorkflowType]bool)
95+
for _, item := range pipelineStage {
96+
if _, ok := pipelineStageMap[item.CdPipelineId]; !ok {
97+
pipelineStageMap[item.CdPipelineId] = make(map[bean2.WorkflowType]bool)
98+
}
99+
if item.Type == repository2.PIPELINE_STAGE_TYPE_PRE_CD {
100+
pipelineStageMap[item.CdPipelineId][bean2.CD_WORKFLOW_TYPE_PRE] = true
101+
} else if item.Type == repository2.PIPELINE_STAGE_TYPE_POST_CD {
102+
pipelineStageMap[item.CdPipelineId][bean2.CD_WORKFLOW_TYPE_POST] = true
103+
}
104+
}
105+
106+
// calculating all the pipelines not present in the index table cdWorkflowLatest
107+
var pipelinesAbsentInCache map[int]bean2.WorkflowType
108+
for _, item := range pipelines {
109+
if _, ok := cdWorfklowLatestMap[item.Id]; !ok {
110+
pipelinesAbsentInCache[item.Id] = bean2.CD_WORKFLOW_TYPE_PRE
111+
pipelinesAbsentInCache[item.Id] = bean2.CD_WORKFLOW_TYPE_DEPLOY
112+
pipelinesAbsentInCache[item.Id] = bean2.CD_WORKFLOW_TYPE_POST
113+
} else {
114+
if _, ok := pipelineStageMap[item.Id][bean2.CD_WORKFLOW_TYPE_PRE]; ok {
115+
if val, ok := cdWorfklowLatestMap[item.Id][bean2.CD_WORKFLOW_TYPE_PRE]; !ok || !val {
116+
pipelinesAbsentInCache[item.Id] = bean2.CD_WORKFLOW_TYPE_PRE
117+
}
118+
}
119+
if _, ok := pipelineStageMap[item.Id][bean2.CD_WORKFLOW_TYPE_POST]; ok {
120+
if val, ok := cdWorfklowLatestMap[item.Id][bean2.CD_WORKFLOW_TYPE_POST]; !ok || !val {
121+
pipelinesAbsentInCache[item.Id] = bean2.CD_WORKFLOW_TYPE_POST
122+
}
123+
}
124+
if val, ok := cdWorfklowLatestMap[item.Id][bean2.CD_WORKFLOW_TYPE_DEPLOY]; !ok || !val {
125+
pipelinesAbsentInCache[item.Id] = bean2.CD_WORKFLOW_TYPE_POST
126+
}
127+
}
128+
}
129+
if len(pipelinesAbsentInCache) > 0 {
130+
remainingRunners, err := impl.cdWorkflowRepository.FetchAllCdStagesLatestEntity(pipelinesAbsentInCache)
131+
if err != nil {
132+
impl.logger.Errorw("error in fetching all cd stages latest entity", "pipelinesAbsentInCache", pipelinesAbsentInCache, "err", err)
133+
return nil, err
134+
}
135+
result = append(result, remainingRunners...)
136+
}
137+
return result, nil
138+
}

0 commit comments

Comments
 (0)