Skip to content

Commit 083fd43

Browse files
committed
Merge remote-tracking branch 'origin/optimize-ci-cd-workflow' into cd-get-optimised
2 parents 7f327f2 + b3e96fc commit 083fd43

File tree

9 files changed

+265
-66
lines changed

9 files changed

+265
-66
lines changed

internal/sql/repository/pipelineConfig/CiWorkflowRepository.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type CiWorkflowRepository interface {
4040
FindByName(name string) (*CiWorkflow, error)
4141

4242
FindLastTriggeredWorkflowByCiIds(pipelineId []int) (ciWorkflow []*CiWorkflow, err error)
43+
FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error)
4344
FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error)
4445
FindAllLastTriggeredWorkflowByArtifactId(ciArtifactId []int) (ciWorkflow []*CiWorkflow, err error)
4546
FindAllTriggeredWorkflowCountInLast24Hour() (ciWorkflowCount int, err error)
@@ -48,6 +49,7 @@ type CiWorkflowRepository interface {
4849
ExistsByStatus(status string) (bool, error)
4950
FindBuildTypeAndStatusDataOfLast1Day() ([]*BuildTypeCount, error)
5051
FIndCiWorkflowStatusesByAppId(appId int) ([]*CiWorkflowStatus, error)
52+
FindCiPipelineIdsByAppId(appId int) ([]int, error)
5153

5254
MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool)
5355
MigrateCiArtifactLocation(wfId int, artifactLocation string)
@@ -290,6 +292,19 @@ func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByCiIds(pipelineI
290292
return ciWorkflow, err
291293
}
292294

295+
// FindWorkflowsByCiWorkflowIds fetches workflows by their workflow IDs (simple query)
296+
func (impl *CiWorkflowRepositoryImpl) FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error) {
297+
if len(ciWorkflowIds) == 0 {
298+
return ciWorkflow, nil
299+
}
300+
301+
err = impl.dbConnection.Model(&ciWorkflow).
302+
Column("ci_workflow.*", "CiPipeline").
303+
Where("ci_workflow.id IN (?)", pg.In(ciWorkflowIds)).
304+
Select()
305+
return ciWorkflow, err
306+
}
307+
293308
func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error) {
294309
workflow := &CiWorkflow{}
295310
err = impl.dbConnection.Model(workflow).
@@ -379,6 +394,16 @@ func (impl *CiWorkflowRepositoryImpl) FIndCiWorkflowStatusesByAppId(appId int) (
379394
return ciworkflowStatuses, err
380395
}
381396

397+
// FindCiPipelineIdsByAppId gets all CI pipeline IDs for an app (simple query)
398+
func (impl *CiWorkflowRepositoryImpl) FindCiPipelineIdsByAppId(appId int) ([]int, error) {
399+
var ciPipelineIds []int
400+
err := impl.dbConnection.Model((*CiPipeline)(nil)).
401+
Column("id").
402+
Where("app_id = ? AND deleted = false", appId).
403+
Select(&ciPipelineIds)
404+
return ciPipelineIds, err
405+
}
406+
382407
func (impl *CiWorkflowRepositoryImpl) MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool) {
383408
_, err := impl.dbConnection.Model((*CiWorkflow)(nil)).
384409
Set("is_artifact_uploaded = ?", workflow.GetArtifactUploadedType(isArtifactUploaded)).

internal/sql/repository/pipelineConfig/WorkflowStatusLatestRepository.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type WorkflowStatusLatestRepository interface {
2929
UpdateCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error
3030
GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error)
3131
GetCiWorkflowStatusLatestByAppId(appId int) ([]*CiWorkflowStatusLatest, error)
32+
GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error)
3233
DeleteCiWorkflowStatusLatestByPipelineId(pipelineId int) error
3334

3435
// CD Workflow Status Latest methods
@@ -129,6 +130,22 @@ func (impl *WorkflowStatusLatestRepositoryImpl) DeleteCiWorkflowStatusLatestByPi
129130
return nil
130131
}
131132

133+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error) {
134+
if len(pipelineIds) == 0 {
135+
return []*CiWorkflowStatusLatest{}, nil
136+
}
137+
138+
var models []*CiWorkflowStatusLatest
139+
err := impl.dbConnection.Model(&models).
140+
Where("pipeline_id IN (?)", pg.In(pipelineIds)).
141+
Select()
142+
if err != nil {
143+
impl.logger.Errorw("error in getting ci workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds)
144+
return nil, err
145+
}
146+
return models, nil
147+
}
148+
132149
// CD Workflow Status Latest methods implementation
133150
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
134151
var connection orm.DB

pkg/pipeline/CiHandler.go

Lines changed: 194 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
buildBean "github.com/devtron-labs/devtron/pkg/build/pipeline/bean"
2828
repository2 "github.com/devtron-labs/devtron/pkg/cluster/environment/repository"
2929
eventProcessorBean "github.com/devtron-labs/devtron/pkg/eventProcessor/bean"
30+
"github.com/devtron-labs/devtron/pkg/pipeline/adapter"
3031
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
3132
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
3233
"regexp"
@@ -76,56 +77,59 @@ type CiHandler interface {
7677
}
7778

7879
type CiHandlerImpl struct {
79-
Logger *zap.SugaredLogger
80-
ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository
81-
ciService CiService
82-
gitSensorClient gitSensor.Client
83-
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
84-
ciArtifactRepository repository.CiArtifactRepository
85-
userService user.UserService
86-
eventClient client.EventClient
87-
eventFactory client.EventFactory
88-
ciPipelineRepository pipelineConfig.CiPipelineRepository
89-
appListingRepository repository.AppListingRepository
90-
cdPipelineRepository pipelineConfig.PipelineRepository
91-
enforcerUtil rbac.EnforcerUtil
92-
resourceGroupService resourceGroup.ResourceGroupService
93-
envRepository repository2.EnvironmentRepository
94-
imageTaggingService imageTagging.ImageTaggingService
95-
customTagService CustomTagService
96-
appWorkflowRepository appWorkflow.AppWorkflowRepository
97-
config *types.CiConfig
98-
k8sCommonService k8sPkg.K8sCommonService
99-
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService
80+
Logger *zap.SugaredLogger
81+
ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository
82+
ciService CiService
83+
gitSensorClient gitSensor.Client
84+
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
85+
ciArtifactRepository repository.CiArtifactRepository
86+
userService user.UserService
87+
eventClient client.EventClient
88+
eventFactory client.EventFactory
89+
ciPipelineRepository pipelineConfig.CiPipelineRepository
90+
appListingRepository repository.AppListingRepository
91+
cdPipelineRepository pipelineConfig.PipelineRepository
92+
enforcerUtil rbac.EnforcerUtil
93+
resourceGroupService resourceGroup.ResourceGroupService
94+
envRepository repository2.EnvironmentRepository
95+
imageTaggingService imageTagging.ImageTaggingService
96+
customTagService CustomTagService
97+
appWorkflowRepository appWorkflow.AppWorkflowRepository
98+
config *types.CiConfig
99+
k8sCommonService k8sPkg.K8sCommonService
100+
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService
101+
workflowStatusLatestRepository pipelineConfig.WorkflowStatusLatestRepository
100102
}
101103

102104
func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository, gitSensorClient gitSensor.Client, ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
103105
ciArtifactRepository repository.CiArtifactRepository, userService user.UserService, eventClient client.EventClient, eventFactory client.EventFactory, ciPipelineRepository pipelineConfig.CiPipelineRepository,
104106
appListingRepository repository.AppListingRepository, cdPipelineRepository pipelineConfig.PipelineRepository, enforcerUtil rbac.EnforcerUtil, resourceGroupService resourceGroup.ResourceGroupService, envRepository repository2.EnvironmentRepository,
105107
imageTaggingService imageTagging.ImageTaggingService, k8sCommonService k8sPkg.K8sCommonService, appWorkflowRepository appWorkflow.AppWorkflowRepository, customTagService CustomTagService,
106108
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService,
109+
workflowStatusLatestRepository pipelineConfig.WorkflowStatusLatestRepository,
107110
) *CiHandlerImpl {
108111
cih := &CiHandlerImpl{
109-
Logger: Logger,
110-
ciService: ciService,
111-
ciPipelineMaterialRepository: ciPipelineMaterialRepository,
112-
gitSensorClient: gitSensorClient,
113-
ciWorkflowRepository: ciWorkflowRepository,
114-
ciArtifactRepository: ciArtifactRepository,
115-
userService: userService,
116-
eventClient: eventClient,
117-
eventFactory: eventFactory,
118-
ciPipelineRepository: ciPipelineRepository,
119-
appListingRepository: appListingRepository,
120-
cdPipelineRepository: cdPipelineRepository,
121-
enforcerUtil: enforcerUtil,
122-
resourceGroupService: resourceGroupService,
123-
envRepository: envRepository,
124-
imageTaggingService: imageTaggingService,
125-
customTagService: customTagService,
126-
appWorkflowRepository: appWorkflowRepository,
127-
k8sCommonService: k8sCommonService,
128-
workFlowStageStatusService: workFlowStageStatusService,
112+
Logger: Logger,
113+
ciService: ciService,
114+
ciPipelineMaterialRepository: ciPipelineMaterialRepository,
115+
gitSensorClient: gitSensorClient,
116+
ciWorkflowRepository: ciWorkflowRepository,
117+
ciArtifactRepository: ciArtifactRepository,
118+
userService: userService,
119+
eventClient: eventClient,
120+
eventFactory: eventFactory,
121+
ciPipelineRepository: ciPipelineRepository,
122+
appListingRepository: appListingRepository,
123+
cdPipelineRepository: cdPipelineRepository,
124+
enforcerUtil: enforcerUtil,
125+
resourceGroupService: resourceGroupService,
126+
envRepository: envRepository,
127+
imageTaggingService: imageTaggingService,
128+
customTagService: customTagService,
129+
appWorkflowRepository: appWorkflowRepository,
130+
k8sCommonService: k8sCommonService,
131+
workFlowStageStatusService: workFlowStageStatusService,
132+
workflowStatusLatestRepository: workflowStatusLatestRepository,
129133
}
130134
config, err := types.GetCiConfig()
131135
if err != nil {
@@ -644,13 +648,155 @@ func (impl *CiHandlerImpl) stateChanged(status string, podStatus string, msg str
644648
}
645649

646650
func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewV1(appId int) ([]*pipelineConfig.CiWorkflowStatus, error) {
647-
ciWorkflowStatuses, err := impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId)
648-
if err != nil && !util.IsErrNoRows(err) {
649-
impl.Logger.Errorw("err in fetching ciWorkflowStatuses from ciWorkflowRepository", "appId", appId, "err", err)
650-
return ciWorkflowStatuses, err
651+
allPipelineIds, err := impl.ciWorkflowRepository.FindCiPipelineIdsByAppId(appId)
652+
if err != nil {
653+
impl.Logger.Errorw("error in getting ci pipeline ids for app, falling back to old method", "appId", appId, "err", err)
654+
return impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId)
655+
}
656+
657+
if len(allPipelineIds) == 0 {
658+
return []*pipelineConfig.CiWorkflowStatus{}, nil
659+
}
660+
661+
latestStatusEntries, err := impl.workflowStatusLatestRepository.GetCiWorkflowStatusLatestByPipelineIds(allPipelineIds)
662+
if err != nil {
663+
impl.Logger.Errorw("error in checking latest status table, falling back to old method", "appId", appId, "err", err)
664+
return impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId)
665+
}
666+
667+
var allStatuses []*pipelineConfig.CiWorkflowStatus
668+
669+
if len(latestStatusEntries) > 0 {
670+
statusesFromLatestTable, err := impl.fetchCiWorkflowStatusFromLatestEntries(latestStatusEntries)
671+
if err != nil {
672+
impl.Logger.Errorw("error in fetching ci workflow status from latest ci workflow entries ", "latestStatusEntries", latestStatusEntries, "err", err)
673+
return nil, err
674+
} else {
675+
allStatuses = append(allStatuses, statusesFromLatestTable...)
676+
}
677+
}
678+
679+
pipelinesNotInLatestTable := impl.getPipelineIdsNotInLatestTable(allPipelineIds, latestStatusEntries)
680+
681+
if len(pipelinesNotInLatestTable) > 0 {
682+
statusesFromOldQuery, err := impl.fetchCiStatusUsingFallbackMethod(pipelinesNotInLatestTable)
683+
if err != nil {
684+
impl.Logger.Errorw("error in fetching using fallback method by pipelineIds", "pipelineIds", pipelinesNotInLatestTable, "err", err)
685+
return nil, err
686+
} else {
687+
allStatuses = append(allStatuses, statusesFromOldQuery...)
688+
}
689+
}
690+
691+
return allStatuses, nil
692+
}
693+
694+
// fetchCiWorkflowStatusFromLatestEntries fetches CI status from ci_workflow_status_latest table
695+
func (impl *CiHandlerImpl) fetchCiWorkflowStatusFromLatestEntries(latestCiWorkflowStatusEntries []*pipelineConfig.CiWorkflowStatusLatest) ([]*pipelineConfig.CiWorkflowStatus, error) {
696+
var workflowIds []int
697+
for _, entry := range latestCiWorkflowStatusEntries {
698+
workflowIds = append(workflowIds, entry.CiWorkflowId)
699+
}
700+
701+
workflows, err := impl.ciWorkflowRepository.FindWorkflowsByCiWorkflowIds(workflowIds)
702+
if err != nil {
703+
impl.Logger.Errorw("error in fetching ci workflows by ci workflow ids", "workflowIds", workflowIds, "err", err)
704+
return nil, err
705+
}
706+
707+
var statuses []*pipelineConfig.CiWorkflowStatus
708+
for _, workflow := range workflows {
709+
status := adapter.GetCiWorkflowStatusFromCiWorkflow(workflow)
710+
statuses = append(statuses, status)
711+
}
712+
713+
return statuses, nil
714+
}
715+
716+
// fetchCiStatusUsingFallbackMethod fetches CI status directly from ci_workflow table
717+
func (impl *CiHandlerImpl) fetchCiStatusUsingFallbackMethod(pipelineIds []int) ([]*pipelineConfig.CiWorkflowStatus, error) {
718+
workflows, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(pipelineIds)
719+
if err != nil {
720+
impl.Logger.Errorw("error in fetching ci workflows by ci ids", "pipelineIds", pipelineIds, "err", err)
721+
return nil, err
722+
}
723+
724+
var statuses []*pipelineConfig.CiWorkflowStatus
725+
for _, workflow := range workflows {
726+
status := adapter.GetCiWorkflowStatusFromCiWorkflow(workflow)
727+
statuses = append(statuses, status)
728+
}
729+
730+
return statuses, nil
731+
}
732+
733+
func (impl *CiHandlerImpl) fetchWorkflowsFromLatestTable(latestStatusEntries []*pipelineConfig.CiWorkflowStatusLatest) ([]*pipelineConfig.CiWorkflow, error) {
734+
var workflowIds []int
735+
for _, entry := range latestStatusEntries {
736+
workflowIds = append(workflowIds, entry.CiWorkflowId)
651737
}
652738

653-
return ciWorkflowStatuses, err
739+
return impl.ciWorkflowRepository.FindWorkflowsByCiWorkflowIds(workflowIds)
740+
}
741+
742+
// fetchLastTriggeredWorkflowsHybrid implements hybrid approach for workflow fetching
743+
// Uses latest status table for available pipelines, fallback to complex query for missing pipelines
744+
func (impl *CiHandlerImpl) fetchLastTriggeredWorkflowsHybrid(pipelineIds []int) ([]*pipelineConfig.CiWorkflow, error) {
745+
if len(pipelineIds) == 0 {
746+
return []*pipelineConfig.CiWorkflow{}, nil
747+
}
748+
749+
latestStatusEntries, err := impl.workflowStatusLatestRepository.GetCiWorkflowStatusLatestByPipelineIds(pipelineIds)
750+
if err != nil {
751+
impl.Logger.Errorw("error in checking latest status table, falling back to complex query", "pipelineIds", pipelineIds, "err", err)
752+
return impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(pipelineIds)
753+
}
754+
755+
var allWorkflows []*pipelineConfig.CiWorkflow
756+
757+
if len(latestStatusEntries) > 0 {
758+
workflowsFromLatestTable, err := impl.fetchWorkflowsFromLatestTable(latestStatusEntries)
759+
if err != nil {
760+
impl.Logger.Errorw("error in fetching from latest status table", "latestStatusEntries", latestStatusEntries, "err", err)
761+
return nil, err
762+
} else {
763+
allWorkflows = append(allWorkflows, workflowsFromLatestTable...)
764+
}
765+
}
766+
767+
pipelinesNotInLatestTable := impl.getPipelineIdsNotInLatestTable(pipelineIds, latestStatusEntries)
768+
769+
if len(pipelinesNotInLatestTable) > 0 {
770+
workflowsFromOldQuery, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(pipelinesNotInLatestTable)
771+
if err != nil {
772+
impl.Logger.Errorw("error in fetching using old query by pipeline ids", "pipelineIds", pipelinesNotInLatestTable, "err", err)
773+
return nil, err
774+
} else {
775+
allWorkflows = append(allWorkflows, workflowsFromOldQuery...)
776+
}
777+
}
778+
779+
return allWorkflows, nil
780+
}
781+
782+
// getPipelineIdsNotInLatestTable finds pipeline IDs that are NOT in the latest status table
783+
func (impl *CiHandlerImpl) getPipelineIdsNotInLatestTable(allPipelineIds []int, latestStatusEntries []*pipelineConfig.CiWorkflowStatusLatest) []int {
784+
var pipelinesInLatestTable []int
785+
for _, entry := range latestStatusEntries {
786+
pipelinesInLatestTable = append(pipelinesInLatestTable, entry.PipelineId)
787+
}
788+
pipelineIdMap := make(map[int]bool)
789+
for _, id := range pipelinesInLatestTable {
790+
pipelineIdMap[id] = true
791+
}
792+
793+
var missingPipelineIds []int
794+
for _, id := range allPipelineIds {
795+
if !pipelineIdMap[id] {
796+
missingPipelineIds = append(missingPipelineIds, id)
797+
}
798+
}
799+
return missingPipelineIds
654800
}
655801

656802
func (impl *CiHandlerImpl) FetchCiStatusForTriggerView(appId int) ([]*pipelineConfig.CiWorkflowStatus, error) {
@@ -861,9 +1007,9 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewForEnvironment(request res
8611007
if len(ciPipelineIds) == 0 {
8621008
return ciWorkflowStatuses, nil
8631009
}
864-
latestCiWorkflows, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(ciPipelineIds)
1010+
latestCiWorkflows, err := impl.fetchLastTriggeredWorkflowsHybrid(ciPipelineIds)
8651011
if err != nil && !util.IsErrNoRows(err) {
866-
impl.Logger.Errorw("err", "ciPipelineIds", ciPipelineIds, "err", err)
1012+
impl.Logger.Errorw("err in hybrid ci workflow fetch", "ciPipelineIds", ciPipelineIds, "err", err)
8671013
return ciWorkflowStatuses, err
8681014
}
8691015

pkg/pipeline/adapter/adapter.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,13 @@ func NewMigrateExternalAppValidationRequest(pipeline *bean.CDPipelineConfigObjec
406406
}
407407
return request
408408
}
409+
410+
func GetCiWorkflowStatusFromCiWorkflow(ciWorkflow *pipelineConfig.CiWorkflow) *pipelineConfig.CiWorkflowStatus {
411+
return &pipelineConfig.CiWorkflowStatus{
412+
CiPipelineId: ciWorkflow.CiPipelineId,
413+
CiPipelineName: ciWorkflow.CiPipeline.Name,
414+
CiStatus: ciWorkflow.Status,
415+
StorageConfigured: ciWorkflow.BlobStorageEnabled,
416+
CiWorkflowId: ciWorkflow.Id,
417+
}
418+
}

pkg/workflow/wire_workflow.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ import (
2222
"github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/hook"
2323
"github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/repository"
2424
"github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service"
25+
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
2526
"github.com/google/wire"
2627
)
2728

2829
var WorkflowWireSet = wire.NewSet(
2930
cd.CdWorkflowWireSet,
3031
status.WorkflowStatusWireSet,
31-
status.WorkflowStatusLatestWireSet,
32+
workflowStatusLatest.WorkflowStatusLatestWireSet,
3233
hook.NewTriggerAuditHookImpl,
3334
wire.Bind(new(hook.TriggerAuditHook), new(*hook.TriggerAuditHookImpl)),
3435
service.NewWorkflowTriggerAuditServiceImpl,

0 commit comments

Comments
 (0)