Skip to content

Commit 716b841

Browse files
authored
Merge pull request #6744 from devtron-labs/optimize-ci-cd-workflow
feat: optimize ci cd workflow
2 parents 28bd893 + 797bc2c commit 716b841

19 files changed

+929
-70
lines changed

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
469469
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
470470
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
471471
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
472+
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
472473
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
473474
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
474475
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

internal/sql/repository/pipelineConfig/CiWorkflowRepository.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type CiWorkflowRepository interface {
4040
FindByName(name string) (*CiWorkflow, error)
4141

4242
FindLastTriggeredWorkflowByCiIds(pipelineId []int) (ciWorkflow []*CiWorkflow, err error)
43+
FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error)
4344
FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error)
4445
FindAllLastTriggeredWorkflowByArtifactId(ciArtifactId []int) (ciWorkflow []*CiWorkflow, err error)
4546
FindAllTriggeredWorkflowCountInLast24Hour() (ciWorkflowCount int, err error)
@@ -48,6 +49,7 @@ type CiWorkflowRepository interface {
4849
ExistsByStatus(status string) (bool, error)
4950
FindBuildTypeAndStatusDataOfLast1Day() ([]*BuildTypeCount, error)
5051
FIndCiWorkflowStatusesByAppId(appId int) ([]*CiWorkflowStatus, error)
52+
FindCiPipelineIdsByAppId(appId int) ([]int, error)
5153

5254
MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool)
5355
MigrateCiArtifactLocation(wfId int, artifactLocation string)
@@ -290,6 +292,19 @@ func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByCiIds(pipelineI
290292
return ciWorkflow, err
291293
}
292294

295+
// FindWorkflowsByCiWorkflowIds fetches workflows by their workflow IDs (simple query)
296+
func (impl *CiWorkflowRepositoryImpl) FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error) {
297+
if len(ciWorkflowIds) == 0 {
298+
return ciWorkflow, nil
299+
}
300+
301+
err = impl.dbConnection.Model(&ciWorkflow).
302+
Column("ci_workflow.*", "CiPipeline").
303+
Where("ci_workflow.id IN (?)", pg.In(ciWorkflowIds)).
304+
Select()
305+
return ciWorkflow, err
306+
}
307+
293308
func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error) {
294309
workflow := &CiWorkflow{}
295310
err = impl.dbConnection.Model(workflow).
@@ -379,6 +394,16 @@ func (impl *CiWorkflowRepositoryImpl) FIndCiWorkflowStatusesByAppId(appId int) (
379394
return ciworkflowStatuses, err
380395
}
381396

397+
// FindCiPipelineIdsByAppId gets all CI pipeline IDs for an app (simple query)
398+
func (impl *CiWorkflowRepositoryImpl) FindCiPipelineIdsByAppId(appId int) ([]int, error) {
399+
var ciPipelineIds []int
400+
err := impl.dbConnection.Model((*CiPipeline)(nil)).
401+
Column("id").
402+
Where("app_id = ? AND deleted = false", appId).
403+
Select(&ciPipelineIds)
404+
return ciPipelineIds, err
405+
}
406+
382407
func (impl *CiWorkflowRepositoryImpl) MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool) {
383408
_, err := impl.dbConnection.Model((*CiWorkflow)(nil)).
384409
Set("is_artifact_uploaded = ?", workflow.GetArtifactUploadedType(isArtifactUploaded)).
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright (c) 2024. Devtron Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pipelineConfig
18+
19+
import (
20+
"github.com/devtron-labs/devtron/pkg/sql"
21+
"github.com/go-pg/pg"
22+
"github.com/go-pg/pg/orm"
23+
"go.uber.org/zap"
24+
)
25+
26+
type WorkflowStatusLatestRepository interface {
27+
// CI Workflow Status Latest methods
28+
SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error
29+
UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error
30+
GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error)
31+
GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error)
32+
33+
// CD Workflow Status Latest methods
34+
SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error
35+
UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error
36+
GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error)
37+
GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error)
38+
}
39+
40+
type WorkflowStatusLatestRepositoryImpl struct {
41+
dbConnection *pg.DB
42+
logger *zap.SugaredLogger
43+
}
44+
45+
func NewWorkflowStatusLatestRepositoryImpl(dbConnection *pg.DB, logger *zap.SugaredLogger) *WorkflowStatusLatestRepositoryImpl {
46+
return &WorkflowStatusLatestRepositoryImpl{
47+
dbConnection: dbConnection,
48+
logger: logger,
49+
}
50+
}
51+
52+
// CI Workflow Status Latest model
53+
type CiWorkflowStatusLatest struct {
54+
tableName struct{} `sql:"ci_workflow_status_latest" pg:",discard_unknown_columns"`
55+
Id int `sql:"id,pk"`
56+
PipelineId int `sql:"pipeline_id"`
57+
AppId int `sql:"app_id"`
58+
CiWorkflowId int `sql:"ci_workflow_id"`
59+
sql.AuditLog
60+
}
61+
62+
// CD Workflow Status Latest model
63+
type CdWorkflowStatusLatest struct {
64+
tableName struct{} `sql:"cd_workflow_status_latest" pg:",discard_unknown_columns"`
65+
Id int `sql:"id,pk"`
66+
PipelineId int `sql:"pipeline_id"`
67+
AppId int `sql:"app_id"`
68+
EnvironmentId int `sql:"environment_id"`
69+
WorkflowType string `sql:"workflow_type"`
70+
WorkflowRunnerId int `sql:"workflow_runner_id"`
71+
sql.AuditLog
72+
}
73+
74+
// CI Workflow Status Latest methods implementation
75+
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error {
76+
var connection orm.DB
77+
if tx != nil {
78+
connection = tx
79+
} else {
80+
connection = impl.dbConnection
81+
}
82+
err := connection.Insert(model)
83+
if err != nil {
84+
impl.logger.Errorw("error in saving ci workflow status latest", "err", err, "model", model)
85+
return err
86+
}
87+
return nil
88+
}
89+
90+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error) {
91+
if len(pipelineIds) == 0 {
92+
return []*CiWorkflowStatusLatest{}, nil
93+
}
94+
95+
var models []*CiWorkflowStatusLatest
96+
err := impl.dbConnection.Model(&models).
97+
Where("pipeline_id IN (?)", pg.In(pipelineIds)).
98+
Select()
99+
if err != nil {
100+
impl.logger.Errorw("error in getting ci workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds)
101+
return nil, err
102+
}
103+
return models, nil
104+
}
105+
106+
func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error {
107+
var connection orm.DB
108+
if tx != nil {
109+
connection = tx
110+
} else {
111+
connection = impl.dbConnection
112+
}
113+
_, err := connection.Model(model).WherePK().Update()
114+
if err != nil {
115+
impl.logger.Errorw("error in updating ci workflow status latest", "err", err, "model", model)
116+
return err
117+
}
118+
return nil
119+
}
120+
121+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error) {
122+
var model CiWorkflowStatusLatest
123+
err := impl.dbConnection.Model(&model).
124+
Where("pipeline_id = ?", pipelineId).
125+
Select()
126+
if err != nil {
127+
impl.logger.Errorw("error in getting ci workflow status latest by pipeline id", "err", err, "pipelineId", pipelineId)
128+
return nil, err
129+
}
130+
return &model, nil
131+
}
132+
133+
// CD Workflow Status Latest methods implementation
134+
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
135+
var connection orm.DB
136+
if tx != nil {
137+
connection = tx
138+
} else {
139+
connection = impl.dbConnection
140+
}
141+
err := connection.Insert(model)
142+
if err != nil {
143+
impl.logger.Errorw("error in saving cd workflow status latest", "err", err, "model", model)
144+
return err
145+
}
146+
return nil
147+
}
148+
149+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error) {
150+
var models []*CdWorkflowStatusLatest
151+
err := impl.dbConnection.Model(&models).
152+
Where("pipeline_id IN (?)", pg.In(pipelineIds)).
153+
Select()
154+
if err != nil {
155+
impl.logger.Errorw("error in getting cd workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds)
156+
return nil, err
157+
}
158+
return models, nil
159+
}
160+
161+
func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
162+
var connection orm.DB
163+
if tx != nil {
164+
connection = tx
165+
} else {
166+
connection = impl.dbConnection
167+
}
168+
_, err := connection.Model(model).WherePK().Update()
169+
if err != nil {
170+
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "model", model)
171+
return err
172+
}
173+
return nil
174+
}
175+
176+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) {
177+
var connection orm.DB
178+
if tx != nil {
179+
connection = tx
180+
} else {
181+
connection = impl.dbConnection
182+
}
183+
184+
var model CdWorkflowStatusLatest
185+
err := connection.Model(&model).
186+
Where("pipeline_id = ? AND workflow_type = ?", pipelineId, workflowType).
187+
Select()
188+
if err != nil {
189+
impl.logger.Errorw("error in getting cd workflow status latest by pipeline id and workflow type", "err", err, "pipelineId", pipelineId, "workflowType", workflowType)
190+
return nil, err
191+
}
192+
return &model, nil
193+
}

pkg/deployment/trigger/devtronApps/HandlerService.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020
"bufio"
2121
"context"
2222
"github.com/devtron-labs/common-lib/async"
23-
service2 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service"
2423
"github.com/devtron-labs/devtron/client/fluxcd"
24+
service2 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service"
25+
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
2526
"os"
2627
"time"
2728

@@ -174,6 +175,7 @@ type HandlerServiceImpl struct {
174175
asyncRunnable *async.Runnable
175176
workflowTriggerAuditService service2.WorkflowTriggerAuditService
176177
fluxCdDeploymentService fluxcd.DeploymentService
178+
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService
177179
}
178180

179181
func NewHandlerServiceImpl(logger *zap.SugaredLogger,
@@ -238,7 +240,8 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
238240
deploymentEventHandler app.DeploymentEventHandler,
239241
asyncRunnable *async.Runnable,
240242
workflowTriggerAuditService service2.WorkflowTriggerAuditService,
241-
fluxCdDeploymentService fluxcd.DeploymentService) (*HandlerServiceImpl, error) {
243+
fluxCdDeploymentService fluxcd.DeploymentService,
244+
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService) (*HandlerServiceImpl, error) {
242245
impl := &HandlerServiceImpl{
243246
logger: logger,
244247
cdWorkflowCommonService: cdWorkflowCommonService,
@@ -307,7 +310,8 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
307310
deploymentEventHandler: deploymentEventHandler,
308311
asyncRunnable: asyncRunnable,
309312
workflowTriggerAuditService: workflowTriggerAuditService,
310-
fluxCdDeploymentService: fluxCdDeploymentService,
313+
fluxCdDeploymentService: fluxCdDeploymentService,
314+
workflowStatusLatestService: workflowStatusLatestService,
311315
}
312316
config, err := types.GetCdConfig()
313317
if err != nil {

pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,20 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte
269269
}
270270
cdWorkflowId = cdWf.Id
271271
}
272-
272+
tx, err := impl.cdWorkflowRepository.GetConnection().Begin()
273+
if err != nil {
274+
impl.logger.Errorw("error in getting connection for cdWorkflowRepository, ManualCdTrigger", "err", err)
275+
return 0, "", nil, err
276+
}
277+
isRollbackNeeded := true
278+
defer func() {
279+
if isRollbackNeeded {
280+
err = tx.Rollback()
281+
if err != nil {
282+
impl.logger.Errorw("error in rolling back transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
283+
}
284+
}
285+
}()
273286
runner := &pipelineConfig.CdWorkflowRunner{
274287
Name: cdPipeline.Name,
275288
WorkflowType: bean3.CD_WORKFLOW_TYPE_DEPLOY,
@@ -282,11 +295,26 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte
282295
AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: overrideRequest.UserId, UpdatedOn: triggeredAt, UpdatedBy: overrideRequest.UserId},
283296
ReferenceId: triggerContext.ReferenceId,
284297
}
285-
err = impl.cdWorkflowRunnerService.SaveWfr(nil, runner)
298+
299+
err = impl.cdWorkflowRunnerService.SaveWfr(tx, runner)
286300
if err != nil {
287301
impl.logger.Errorw("err in creating cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
288302
return 0, "", nil, err
289303
}
304+
305+
err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, overrideRequest.PipelineId, overrideRequest.AppId, overrideRequest.EnvId, runner.Id, runner.WorkflowType.String(), overrideRequest.UserId)
306+
if err != nil {
307+
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", overrideRequest.WfrId, "err", err)
308+
return 0, "", nil, err
309+
}
310+
311+
isRollbackNeeded = false
312+
err = tx.Commit()
313+
if err != nil {
314+
impl.logger.Errorw("error in committing transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
315+
return 0, "", nil, err
316+
}
317+
290318
runner.CdWorkflow = &pipelineConfig.CdWorkflow{
291319
Pipeline: cdPipeline,
292320
}
@@ -407,6 +435,22 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge
407435
}
408436
}
409437

438+
tx, err := impl.cdWorkflowRepository.GetConnection().Begin()
439+
if err != nil {
440+
impl.logger.Errorw("error in getting connection for cdWorkflowRepository, ManualCdTrigger", "err", err)
441+
return err
442+
}
443+
444+
isRollbackNeeded := true
445+
defer func() {
446+
if isRollbackNeeded {
447+
err = tx.Rollback()
448+
if err != nil {
449+
impl.logger.Errorw("error in rolling back transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWf.Id, "err", err)
450+
}
451+
}
452+
}()
453+
410454
runner := &pipelineConfig.CdWorkflowRunner{
411455
Name: pipeline.Name,
412456
WorkflowType: bean3.CD_WORKFLOW_TYPE_DEPLOY,
@@ -419,10 +463,23 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge
419463
AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: triggeredBy, UpdatedOn: triggeredAt, UpdatedBy: triggeredBy},
420464
ReferenceId: request.TriggerContext.ReferenceId,
421465
}
422-
err := impl.cdWorkflowRunnerService.SaveWfr(nil, runner)
466+
err = impl.cdWorkflowRunnerService.SaveWfr(tx, runner)
467+
if err != nil {
468+
return err
469+
}
470+
471+
err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, pipeline.Id, pipeline.AppId, pipeline.EnvironmentId, runner.Id, runner.WorkflowType.String(), request.TriggeredBy)
472+
if err != nil {
473+
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", runner.Id, "err", err)
474+
return err
475+
}
476+
477+
err = tx.Commit()
423478
if err != nil {
479+
impl.logger.Errorw("error in committing transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWf.Id, "err", err)
424480
return err
425481
}
482+
isRollbackNeeded = false
426483
runner.CdWorkflow = &pipelineConfig.CdWorkflow{
427484
Pipeline: pipeline,
428485
}

0 commit comments

Comments
 (0)