Skip to content

Commit 677f569

Browse files
authored
Merge pull request #6748 from devtron-labs/cd-get-optimised
enhancement: cd workflow status get optimisation
2 parents b3e96fc + 03f78d4 commit 677f569

File tree

10 files changed

+282
-49
lines changed

10 files changed

+282
-49
lines changed

internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package pipelineConfig
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
apiBean "github.com/devtron-labs/devtron/api/bean"
2324
"github.com/devtron-labs/devtron/internal/sql/repository"
2425
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig/bean/workflow"
@@ -28,6 +29,7 @@ import (
2829
"github.com/go-pg/pg"
2930
"go.opentelemetry.io/otel"
3031
"go.uber.org/zap"
32+
"strings"
3133
"time"
3234
)
3335

@@ -69,7 +71,7 @@ type CdWorkflowRepository interface {
6971
IsLatestWf(pipelineId int, wfId int) (bool, error)
7072
FindLatestCdWorkflowByPipelineId(pipelineIds []int) (*CdWorkflow, error)
7173
FindLatestCdWorkflowByPipelineIdV2(pipelineIds []int) ([]*CdWorkflow, error)
72-
FetchAllCdStagesLatestEntity(pipelineIds []int) ([]*CdWorkflowStatus, error)
74+
FetchAllCdStagesLatestEntity(pipelineWorkflowPairs map[int]apiBean.WorkflowType) ([]*CdWorkflowStatus, error)
7375
FetchAllCdStagesLatestEntityStatus(wfrIds []int) ([]*CdWorkflowRunner, error)
7476
ExistsByStatus(status string) (bool, error)
7577
FetchEnvAllCdStagesLatestEntityStatus(wfrIds []int, envID int) ([]*CdWorkflowRunner, error)
@@ -578,15 +580,15 @@ func (impl *CdWorkflowRepositoryImpl) IsLatestWf(pipelineId int, wfId int) (bool
578580
return !exists, err
579581
}
580582

581-
func (impl *CdWorkflowRepositoryImpl) FetchAllCdStagesLatestEntity(pipelineIds []int) ([]*CdWorkflowStatus, error) {
583+
func (impl *CdWorkflowRepositoryImpl) FetchAllCdStagesLatestEntity(pipelineWorkflowPairs map[int]apiBean.WorkflowType) ([]*CdWorkflowStatus, error) {
582584
var cdWorkflowStatus []*CdWorkflowStatus
583-
if len(pipelineIds) == 0 {
585+
if len(pipelineWorkflowPairs) == 0 {
584586
return cdWorkflowStatus, nil
585587
}
586588
query := "select p.ci_pipeline_id, wf.pipeline_id, wfr.workflow_type, max(wfr.id) as wfr_id from cd_workflow_runner wfr" +
587589
" inner join cd_workflow wf on wf.id=wfr.cd_workflow_id" +
588590
" inner join pipeline p on p.id = wf.pipeline_id" +
589-
" where wf.pipeline_id in (" + sqlIntSeq(pipelineIds) + ")" +
591+
" where (wf.pipeline_id, wfr.workflow_type) in (" + buildPipelineTypeValuesList(pipelineWorkflowPairs) + ")" +
590592
" group by p.ci_pipeline_id, wf.pipeline_id, wfr.workflow_type order by wfr_id desc;"
591593
_, err := impl.dbConnection.Query(&cdWorkflowStatus, query)
592594
if err != nil {
@@ -596,6 +598,14 @@ func (impl *CdWorkflowRepositoryImpl) FetchAllCdStagesLatestEntity(pipelineIds [
596598
return cdWorkflowStatus, nil
597599
}
598600

601+
func buildPipelineTypeValuesList(pairs map[int]apiBean.WorkflowType) string {
602+
var values []string
603+
for pipelineId, workflowType := range pairs {
604+
values = append(values, fmt.Sprintf("(%d,'%s')", pipelineId, workflowType))
605+
}
606+
return strings.Join(values, ",")
607+
}
608+
599609
func (impl *CdWorkflowRepositoryImpl) FetchAllCdStagesLatestEntityStatus(wfrIds []int) ([]*CdWorkflowRunner, error) {
600610
var wfrList []*CdWorkflowRunner
601611
err := impl.dbConnection.Model(&wfrList).

internal/sql/repository/pipelineConfig/WorkflowStatusLatestRepository.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package pipelineConfig
1919
import (
2020
"github.com/devtron-labs/devtron/pkg/sql"
2121
"github.com/go-pg/pg"
22+
"github.com/go-pg/pg/orm"
2223
"go.uber.org/zap"
2324
)
2425

@@ -32,12 +33,13 @@ type WorkflowStatusLatestRepository interface {
3233
DeleteCiWorkflowStatusLatestByPipelineId(pipelineId int) error
3334

3435
// CD Workflow Status Latest methods
35-
SaveCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error
36-
UpdateCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error
37-
GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error)
36+
SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error
37+
UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error
38+
GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error)
3839
GetCdWorkflowStatusLatestByAppId(appId int) ([]*CdWorkflowStatusLatest, error)
3940
GetCdWorkflowStatusLatestByPipelineId(pipelineId int) ([]*CdWorkflowStatusLatest, error)
4041
DeleteCdWorkflowStatusLatestByPipelineId(pipelineId int) error
42+
GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error)
4143
}
4244

4345
type WorkflowStatusLatestRepositoryImpl struct {
@@ -145,27 +147,45 @@ func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipel
145147
}
146148

147149
// CD Workflow Status Latest methods implementation
148-
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error {
149-
err := impl.dbConnection.Insert(model)
150+
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
151+
var connection orm.DB
152+
if tx != nil {
153+
connection = tx
154+
} else {
155+
connection = impl.dbConnection
156+
}
157+
err := connection.Insert(model)
150158
if err != nil {
151159
impl.logger.Errorw("error in saving cd workflow status latest", "err", err, "model", model)
152160
return err
153161
}
154162
return nil
155163
}
156164

157-
func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error {
158-
_, err := impl.dbConnection.Model(model).WherePK().UpdateNotNull()
165+
func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
166+
var connection orm.DB
167+
if tx != nil {
168+
connection = tx
169+
} else {
170+
connection = impl.dbConnection
171+
}
172+
_, err := connection.Model(model).WherePK().UpdateNotNull()
159173
if err != nil {
160174
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "model", model)
161175
return err
162176
}
163177
return nil
164178
}
165179

166-
func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) {
180+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) {
167181
model := &CdWorkflowStatusLatest{}
168-
err := impl.dbConnection.Model(model).
182+
var connection orm.DB
183+
if tx != nil {
184+
connection = tx
185+
} else {
186+
connection = impl.dbConnection
187+
}
188+
err := connection.Model(model).
169189
Where("pipeline_id = ?", pipelineId).
170190
Where("workflow_type = ?", workflowType).
171191
Select()
@@ -210,3 +230,15 @@ func (impl *WorkflowStatusLatestRepositoryImpl) DeleteCdWorkflowStatusLatestByPi
210230
}
211231
return nil
212232
}
233+
234+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error) {
235+
var models []*CdWorkflowStatusLatest
236+
err := impl.dbConnection.Model(&models).
237+
Where("pipeline_id IN (?)", pg.In(pipelineIds)).
238+
Select()
239+
if err != nil {
240+
impl.logger.Errorw("error in getting cd workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds)
241+
return nil, err
242+
}
243+
return models, nil
244+
}

pkg/deployment/trigger/devtronApps/HandlerService.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020
"bufio"
2121
"context"
2222
"github.com/devtron-labs/common-lib/async"
23-
service2 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service"
2423
"github.com/devtron-labs/devtron/client/fluxcd"
24+
service2 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service"
25+
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
2526
"os"
2627
"time"
2728

@@ -174,6 +175,7 @@ type HandlerServiceImpl struct {
174175
asyncRunnable *async.Runnable
175176
workflowTriggerAuditService service2.WorkflowTriggerAuditService
176177
fluxCdDeploymentService fluxcd.DeploymentService
178+
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
177179
}
178180

179181
func NewHandlerServiceImpl(logger *zap.SugaredLogger,
@@ -238,7 +240,8 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
238240
deploymentEventHandler app.DeploymentEventHandler,
239241
asyncRunnable *async.Runnable,
240242
workflowTriggerAuditService service2.WorkflowTriggerAuditService,
241-
fluxCdDeploymentService fluxcd.DeploymentService) (*HandlerServiceImpl, error) {
243+
fluxCdDeploymentService fluxcd.DeploymentService,
244+
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService) (*HandlerServiceImpl, error) {
242245
impl := &HandlerServiceImpl{
243246
logger: logger,
244247
cdWorkflowCommonService: cdWorkflowCommonService,
@@ -307,7 +310,8 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
307310
deploymentEventHandler: deploymentEventHandler,
308311
asyncRunnable: asyncRunnable,
309312
workflowTriggerAuditService: workflowTriggerAuditService,
310-
fluxCdDeploymentService: fluxCdDeploymentService,
313+
fluxCdDeploymentService: fluxCdDeploymentService,
314+
workflowStatusUpdateService: workflowStatusUpdateService,
311315
}
312316
config, err := types.GetCdConfig()
313317
if err != nil {

pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,20 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte
269269
}
270270
cdWorkflowId = cdWf.Id
271271
}
272-
272+
tx, err := impl.cdWorkflowRepository.GetConnection().Begin()
273+
if err != nil {
274+
impl.logger.Errorw("error in getting connection for cdWorkflowRepository, ManualCdTrigger", "err", err)
275+
return 0, "", nil, err
276+
}
277+
isRollbackNeeded := true
278+
defer func() {
279+
if isRollbackNeeded {
280+
err = tx.Rollback()
281+
if err != nil {
282+
impl.logger.Errorw("error in rolling back transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
283+
}
284+
}
285+
}()
273286
runner := &pipelineConfig.CdWorkflowRunner{
274287
Name: cdPipeline.Name,
275288
WorkflowType: bean3.CD_WORKFLOW_TYPE_DEPLOY,
@@ -282,11 +295,26 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte
282295
AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: overrideRequest.UserId, UpdatedOn: triggeredAt, UpdatedBy: overrideRequest.UserId},
283296
ReferenceId: triggerContext.ReferenceId,
284297
}
285-
err = impl.cdWorkflowRunnerService.SaveWfr(nil, runner)
298+
299+
err = impl.cdWorkflowRunnerService.SaveWfr(tx, runner)
286300
if err != nil {
287301
impl.logger.Errorw("err in creating cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
288302
return 0, "", nil, err
289303
}
304+
305+
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(tx, overrideRequest.PipelineId, overrideRequest.AppId, overrideRequest.EnvId, runner.Id, runner.WorkflowType.String(), overrideRequest.UserId)
306+
if err != nil {
307+
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", overrideRequest.WfrId, "err", err)
308+
return 0, "", nil, err
309+
}
310+
311+
isRollbackNeeded = false
312+
err = tx.Commit()
313+
if err != nil {
314+
impl.logger.Errorw("error in committing transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
315+
return 0, "", nil, err
316+
}
317+
290318
runner.CdWorkflow = &pipelineConfig.CdWorkflow{
291319
Pipeline: cdPipeline,
292320
}
@@ -407,6 +435,22 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge
407435
}
408436
}
409437

438+
tx, err := impl.cdWorkflowRepository.GetConnection().Begin()
439+
if err != nil {
440+
impl.logger.Errorw("error in getting connection for cdWorkflowRepository, ManualCdTrigger", "err", err)
441+
return err
442+
}
443+
444+
isRollbackNeeded := true
445+
defer func() {
446+
if isRollbackNeeded {
447+
err = tx.Rollback()
448+
if err != nil {
449+
impl.logger.Errorw("error in rolling back transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWf.Id, "err", err)
450+
}
451+
}
452+
}()
453+
410454
runner := &pipelineConfig.CdWorkflowRunner{
411455
Name: pipeline.Name,
412456
WorkflowType: bean3.CD_WORKFLOW_TYPE_DEPLOY,
@@ -419,10 +463,23 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge
419463
AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: triggeredBy, UpdatedOn: triggeredAt, UpdatedBy: triggeredBy},
420464
ReferenceId: request.TriggerContext.ReferenceId,
421465
}
422-
err := impl.cdWorkflowRunnerService.SaveWfr(nil, runner)
466+
err = impl.cdWorkflowRunnerService.SaveWfr(tx, runner)
467+
if err != nil {
468+
return err
469+
}
470+
471+
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(tx, pipeline.Id, pipeline.AppId, pipeline.EnvironmentId, runner.Id, runner.WorkflowType.String(), request.TriggeredBy)
472+
if err != nil {
473+
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", runner.Id, "err", err)
474+
return err
475+
}
476+
477+
err = tx.Commit()
423478
if err != nil {
479+
impl.logger.Errorw("error in committing transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWf.Id, "err", err)
424480
return err
425481
}
482+
isRollbackNeeded = false
426483
runner.CdWorkflow = &pipelineConfig.CdWorkflow{
427484
Pipeline: pipeline,
428485
}

pkg/deployment/trigger/devtronApps/preStageHandlerCode.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ func (impl *HandlerServiceImpl) createStartingWfAndRunner(request bean.CdTrigger
312312
ReferenceId: request.TriggerContext.ReferenceId,
313313
}
314314
_, span := otel.Tracer("orchestrator").Start(ctx, "cdWorkflowRepository.SaveWorkFlowRunner")
315-
_, err = impl.cdWorkflowRunnerService.SaveCDWorkflowRunnerWithStage(runner)
315+
_, err = impl.cdWorkflowRunnerService.SaveCDWorkflowRunnerWithStage(runner, cdWf, pipeline)
316316
span.End()
317317
if err != nil {
318318
return nil, nil, err

0 commit comments

Comments
 (0)