Skip to content

Commit df5fb33

Browse files
committed
Remove WorkflowStatusUpdateService and update references to streamline workflow status handling in CI/CD services.
1 parent 714107a commit df5fb33

File tree

9 files changed

+54
-207
lines changed

9 files changed

+54
-207
lines changed

pkg/deployment/trigger/devtronApps/HandlerService.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ type HandlerServiceImpl struct {
175175
asyncRunnable *async.Runnable
176176
workflowTriggerAuditService service2.WorkflowTriggerAuditService
177177
fluxCdDeploymentService fluxcd.DeploymentService
178-
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
178+
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService
179179
}
180180

181181
func NewHandlerServiceImpl(logger *zap.SugaredLogger,
@@ -241,7 +241,7 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
241241
asyncRunnable *async.Runnable,
242242
workflowTriggerAuditService service2.WorkflowTriggerAuditService,
243243
fluxCdDeploymentService fluxcd.DeploymentService,
244-
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService) (*HandlerServiceImpl, error) {
244+
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService) (*HandlerServiceImpl, error) {
245245
impl := &HandlerServiceImpl{
246246
logger: logger,
247247
cdWorkflowCommonService: cdWorkflowCommonService,
@@ -311,7 +311,7 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
311311
asyncRunnable: asyncRunnable,
312312
workflowTriggerAuditService: workflowTriggerAuditService,
313313
fluxCdDeploymentService: fluxCdDeploymentService,
314-
workflowStatusUpdateService: workflowStatusUpdateService,
314+
workflowStatusLatestService: workflowStatusLatestService,
315315
}
316316
config, err := types.GetCdConfig()
317317
if err != nil {

pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte
302302
return 0, "", nil, err
303303
}
304304

305-
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(tx, overrideRequest.PipelineId, overrideRequest.AppId, overrideRequest.EnvId, runner.Id, runner.WorkflowType.String(), overrideRequest.UserId)
305+
err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, overrideRequest.PipelineId, overrideRequest.AppId, overrideRequest.EnvId, runner.Id, runner.WorkflowType.String(), overrideRequest.UserId)
306306
if err != nil {
307307
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", overrideRequest.WfrId, "err", err)
308308
return 0, "", nil, err
@@ -468,7 +468,7 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge
468468
return err
469469
}
470470

471-
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(tx, pipeline.Id, pipeline.AppId, pipeline.EnvironmentId, runner.Id, runner.WorkflowType.String(), request.TriggeredBy)
471+
err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, pipeline.Id, pipeline.AppId, pipeline.EnvironmentId, runner.Id, runner.WorkflowType.String(), request.TriggeredBy)
472472
if err != nil {
473473
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", runner.Id, "err", err)
474474
return err

pkg/pipeline/CiHandler.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/devtron-labs/devtron/pkg/pipeline/adapter"
3131
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
3232
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
33-
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
3433
"regexp"
3534
"slices"
3635
"strconv"
@@ -100,7 +99,6 @@ type CiHandlerImpl struct {
10099
k8sCommonService k8sPkg.K8sCommonService
101100
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService
102101
workflowStatusLatestRepository pipelineConfig.WorkflowStatusLatestRepository
103-
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
104102
}
105103

106104
func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository, gitSensorClient gitSensor.Client, ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
@@ -109,7 +107,6 @@ func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipeline
109107
imageTaggingService imageTagging.ImageTaggingService, k8sCommonService k8sPkg.K8sCommonService, appWorkflowRepository appWorkflow.AppWorkflowRepository, customTagService CustomTagService,
110108
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService,
111109
workflowStatusLatestRepository pipelineConfig.WorkflowStatusLatestRepository,
112-
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService,
113110
) *CiHandlerImpl {
114111
cih := &CiHandlerImpl{
115112
Logger: Logger,
@@ -133,7 +130,6 @@ func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipeline
133130
k8sCommonService: k8sCommonService,
134131
workFlowStageStatusService: workFlowStageStatusService,
135132
workflowStatusLatestRepository: workflowStatusLatestRepository,
136-
workflowStatusUpdateService: workflowStatusUpdateService,
137133
}
138134
config, err := types.GetCiConfig()
139135
if err != nil {

pkg/pipeline/CiService.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,25 +46,28 @@ type CiServiceImpl struct {
4646
eventFactory client.EventFactory
4747
config *types.CiConfig
4848
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
49+
ciPipelineRepository pipelineConfig.CiPipelineRepository
4950
transactionManager sql.TransactionWrapper
50-
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
51+
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService
5152
}
5253

5354
func NewCiServiceImpl(Logger *zap.SugaredLogger,
5455
workflowStageStatusService workflowStatus.WorkFlowStageStatusService, eventClient client.EventClient,
5556
eventFactory client.EventFactory,
5657
ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
58+
ciPipelineRepository pipelineConfig.CiPipelineRepository,
5759
transactionManager sql.TransactionWrapper,
58-
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService,
60+
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService,
5961
) *CiServiceImpl {
6062
cis := &CiServiceImpl{
6163
Logger: Logger,
6264
workflowStageStatusService: workflowStageStatusService,
6365
eventClient: eventClient,
6466
eventFactory: eventFactory,
6567
ciWorkflowRepository: ciWorkflowRepository,
68+
ciPipelineRepository: ciPipelineRepository,
6669
transactionManager: transactionManager,
67-
workflowStatusUpdateService: workflowStatusUpdateService,
70+
workflowStatusLatestService: workflowStatusLatestService,
6871
}
6972
config, err := types.GetCiConfig()
7073
if err != nil {
@@ -135,9 +138,17 @@ func (impl *CiServiceImpl) SaveCiWorkflowWithStage(wf *pipelineConfig.CiWorkflow
135138
return err
136139
}
137140

138-
err = impl.workflowStatusUpdateService.SaveCiWorkflowStatusLatest(tx, wf.CiPipelineId, wf.Id, wf.TriggeredBy)
141+
// Get appId from CI pipeline (not from workflow to avoid transaction issues)
142+
ciPipeline, err := impl.ciPipelineRepository.FindById(wf.CiPipelineId)
139143
if err != nil {
140-
impl.Logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", wf.CiPipelineId, "workflowId", wf.Id)
144+
impl.Logger.Errorw("error in fetching ci pipeline for appId", "err", err, "ciPipelineId", wf.CiPipelineId)
145+
return err
146+
}
147+
appId := ciPipeline.AppId
148+
149+
err = impl.workflowStatusLatestService.SaveCiWorkflowStatusLatest(tx, wf.CiPipelineId, appId, wf.Id, wf.TriggeredBy)
150+
if err != nil {
151+
impl.Logger.Errorw("error in saving ci workflow status latest", "err", err, "pipelineId", wf.CiPipelineId, "workflowId", wf.Id)
141152
return err
142153
}
143154

pkg/workflow/cd/CdWorkflowRunnerService.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,20 @@ type CdWorkflowRunnerServiceImpl struct {
4949
workflowStageService workflowStatus.WorkFlowStageStatusService
5050
transactionManager sql.TransactionWrapper
5151
config *types.CiConfig
52-
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
52+
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService
5353
}
5454

5555
func NewCdWorkflowRunnerServiceImpl(logger *zap.SugaredLogger,
5656
cdWorkflowRepository pipelineConfig.CdWorkflowRepository,
5757
workflowStageService workflowStatus.WorkFlowStageStatusService,
5858
transactionManager sql.TransactionWrapper,
59-
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService) *CdWorkflowRunnerServiceImpl {
59+
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService) *CdWorkflowRunnerServiceImpl {
6060
impl := &CdWorkflowRunnerServiceImpl{
6161
logger: logger,
6262
cdWorkflowRepository: cdWorkflowRepository,
6363
workflowStageService: workflowStageService,
6464
transactionManager: transactionManager,
65-
workflowStatusUpdateService: workflowStatusUpdateService,
65+
workflowStatusLatestService: workflowStatusLatestService,
6666
}
6767
ciConfig, err := types.GetCiConfig()
6868
if err != nil {
@@ -121,7 +121,7 @@ func (impl *CdWorkflowRunnerServiceImpl) SaveCDWorkflowRunnerWithStage(wfr *pipe
121121
return wfr, err
122122
}
123123

124-
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(tx, cdWf.PipelineId, pipeline.AppId, pipeline.EnvironmentId, wfr.Id, wfr.WorkflowType.String(), wfr.CreatedBy)
124+
err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, cdWf.PipelineId, pipeline.AppId, pipeline.EnvironmentId, wfr.Id, wfr.WorkflowType.String(), wfr.CreatedBy)
125125
if err != nil {
126126
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", wfr.CreatedBy, "err", err)
127127
return wfr, err

pkg/workflow/workflowStatusLatest/WorkflowStatusLatestService.go

Lines changed: 25 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
type WorkflowStatusLatestService interface {
3232
// CI Workflow Status Latest methods
33-
SaveCiWorkflowStatusLatest(tx *pg.Tx, pipelineId, ciWorkflowId int, userId int32) error
33+
SaveCiWorkflowStatusLatest(tx *pg.Tx, pipelineId, appId, ciWorkflowId int, userId int32) error
3434
GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error)
3535
GetCiWorkflowStatusLatestByAppId(appId int) ([]*CiWorkflowStatusLatest, error)
3636

@@ -83,53 +83,25 @@ type CdWorkflowStatusLatest struct {
8383
Status string `json:"status"` // Derived from cd_workflow_runner table
8484
}
8585

86-
func (impl *WorkflowStatusLatestServiceImpl) SaveCiWorkflowStatusLatest(tx *pg.Tx, pipelineId, ciWorkflowId int, userId int32) error {
86+
func (impl *WorkflowStatusLatestServiceImpl) SaveCiWorkflowStatusLatest(tx *pg.Tx, pipelineId, appId, ciWorkflowId int, userId int32) error {
8787
// Validate required parameters
88-
if pipelineId <= 0 {
89-
impl.logger.Errorw("invalid pipelineId provided", "pipelineId", pipelineId)
90-
return fmt.Errorf("invalid pipelineId: %d", pipelineId)
91-
}
92-
9388
if ciWorkflowId <= 0 {
9489
impl.logger.Errorw("invalid ciWorkflowId provided", "ciWorkflowId", ciWorkflowId)
9590
return fmt.Errorf("invalid ciWorkflowId: %d", ciWorkflowId)
9691
}
97-
ciPipeline, err := impl.ciPipelineRepository.FindOneWithAppData(pipelineId)
98-
if err != nil {
99-
impl.logger.Errorw("error in fetching ci pipeline for appId", "ciPipelineId", pipelineId, "err", err)
100-
return err
101-
}
102-
appId := ciPipeline.AppId
103-
104-
// Check if entry exists
105-
existingEntry, err := impl.workflowStatusLatestRepository.GetCiWorkflowStatusLatestByPipelineId(pipelineId)
106-
if err != nil && !util2.IsErrNoRows(err) {
107-
impl.logger.Errorw("error in getting ci workflow status latest", "err", err, "pipelineId", pipelineId)
108-
return err
109-
}
11092

11193
now := time.Now()
112-
if util2.IsErrNoRows(err) {
113-
// Create new entry
114-
model := &pipelineConfig.CiWorkflowStatusLatest{
115-
PipelineId: pipelineId,
116-
AppId: appId,
117-
CiWorkflowId: ciWorkflowId,
118-
}
119-
model.CreatedBy = userId
120-
model.CreatedOn = now
121-
model.UpdatedBy = userId
122-
model.UpdatedOn = now
123-
124-
return impl.workflowStatusLatestRepository.SaveCiWorkflowStatusLatest(tx, model)
125-
} else {
126-
// Update existing entry
127-
existingEntry.CiWorkflowId = ciWorkflowId
128-
existingEntry.UpdatedBy = userId
129-
existingEntry.UpdatedOn = now
130-
131-
return impl.workflowStatusLatestRepository.UpdateCiWorkflowStatusLatest(tx, existingEntry)
94+
model := &pipelineConfig.CiWorkflowStatusLatest{
95+
PipelineId: pipelineId,
96+
AppId: appId,
97+
CiWorkflowId: ciWorkflowId,
13298
}
99+
model.CreatedBy = userId
100+
model.CreatedOn = now
101+
model.UpdatedBy = userId
102+
model.UpdatedOn = now
103+
104+
return impl.workflowStatusLatestRepository.SaveCiWorkflowStatusLatest(tx, model)
133105
}
134106

135107
func (impl *WorkflowStatusLatestServiceImpl) GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error) {
@@ -189,91 +161,26 @@ func (impl *WorkflowStatusLatestServiceImpl) GetCiWorkflowStatusLatestByAppId(ap
189161
// CD Workflow Status Latest methods implementation
190162
func (impl *WorkflowStatusLatestServiceImpl) SaveCdWorkflowStatusLatest(tx *pg.Tx, pipelineId, appId, environmentId, workflowRunnerId int, workflowType string, userId int32) error {
191163
// Validate required parameters
192-
if pipelineId <= 0 {
193-
impl.logger.Errorw("invalid pipelineId provided", "pipelineId", pipelineId)
194-
return fmt.Errorf("invalid pipelineId: %d", pipelineId)
195-
}
196-
197164
if workflowRunnerId <= 0 {
198165
impl.logger.Errorw("invalid workflowRunnerId provided", "workflowRunnerId", workflowRunnerId)
199166
return fmt.Errorf("invalid workflowRunnerId: %d", workflowRunnerId)
200167
}
201168

202-
// If appId or environmentId is not provided (0), fetch them from the CdWorkflow
203-
if appId <= 0 || environmentId <= 0 {
204-
cdWorkflowRunner, err := impl.cdWorkflowRepository.FindBasicWorkflowRunnerById(workflowRunnerId)
205-
if err != nil {
206-
impl.logger.Errorw("error in fetching cd workflow runner to get appId/environmentId", "err", err, "workflowRunnerId", workflowRunnerId)
207-
return err
208-
}
209-
210-
if cdWorkflowRunner == nil {
211-
impl.logger.Errorw("cd workflow runner not found", "workflowRunnerId", workflowRunnerId)
212-
return fmt.Errorf("cd workflow runner not found with id: %d", workflowRunnerId)
213-
}
214-
215-
// Check if CdWorkflow is loaded
216-
if cdWorkflowRunner.CdWorkflow == nil {
217-
impl.logger.Errorw("cd workflow not loaded in cd workflow runner", "workflowRunnerId", workflowRunnerId, "cdWorkflowId", cdWorkflowRunner.CdWorkflowId)
218-
return fmt.Errorf("cd workflow not loaded for workflow runner id: %d", workflowRunnerId)
219-
}
220-
221-
// Check if Pipeline is loaded
222-
if cdWorkflowRunner.CdWorkflow.Pipeline == nil {
223-
impl.logger.Errorw("pipeline not loaded in cd workflow", "workflowRunnerId", workflowRunnerId, "cdWorkflowId", cdWorkflowRunner.CdWorkflowId, "pipelineId", cdWorkflowRunner.CdWorkflow.PipelineId)
224-
return fmt.Errorf("pipeline not loaded for workflow runner id: %d", workflowRunnerId)
225-
}
226-
227-
if appId <= 0 {
228-
appId = cdWorkflowRunner.CdWorkflow.Pipeline.AppId
229-
if appId <= 0 {
230-
impl.logger.Errorw("invalid appId in pipeline", "workflowRunnerId", workflowRunnerId, "pipelineId", cdWorkflowRunner.CdWorkflow.PipelineId, "appId", appId)
231-
return fmt.Errorf("invalid appId in pipeline: %d", appId)
232-
}
233-
impl.logger.Debugw("fetched appId from cd workflow runner", "workflowRunnerId", workflowRunnerId, "appId", appId)
234-
}
235-
236-
if environmentId <= 0 {
237-
environmentId = cdWorkflowRunner.CdWorkflow.Pipeline.EnvironmentId
238-
if environmentId <= 0 {
239-
impl.logger.Errorw("invalid environmentId in pipeline", "workflowRunnerId", workflowRunnerId, "pipelineId", cdWorkflowRunner.CdWorkflow.PipelineId, "environmentId", environmentId)
240-
return fmt.Errorf("invalid environmentId in pipeline: %d", environmentId)
241-
}
242-
impl.logger.Debugw("fetched environmentId from cd workflow runner", "workflowRunnerId", workflowRunnerId, "environmentId", environmentId)
243-
}
244-
}
245-
246-
// Check if entry exists
247-
existingEntry, err := impl.workflowStatusLatestRepository.GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx, pipelineId, workflowType)
248-
if err != nil && err != pg.ErrNoRows {
249-
impl.logger.Errorw("error in getting cd workflow status latest", "err", err, "pipelineId", pipelineId, "workflowType", workflowType)
250-
return err
251-
}
252-
169+
// Create new entry (always save, don't update)
253170
now := time.Now()
254-
if err == pg.ErrNoRows {
255-
// Create new entry
256-
model := &pipelineConfig.CdWorkflowStatusLatest{
257-
PipelineId: pipelineId,
258-
AppId: appId,
259-
EnvironmentId: environmentId,
260-
WorkflowType: workflowType,
261-
WorkflowRunnerId: workflowRunnerId,
262-
}
263-
model.CreatedBy = userId
264-
model.CreatedOn = now
265-
model.UpdatedBy = userId
266-
model.UpdatedOn = now
267-
268-
return impl.workflowStatusLatestRepository.SaveCdWorkflowStatusLatest(tx, model)
269-
} else {
270-
// Update existing entry
271-
existingEntry.WorkflowRunnerId = workflowRunnerId
272-
existingEntry.UpdatedBy = userId
273-
existingEntry.UpdatedOn = now
274-
275-
return impl.workflowStatusLatestRepository.UpdateCdWorkflowStatusLatest(tx, existingEntry)
171+
model := &pipelineConfig.CdWorkflowStatusLatest{
172+
PipelineId: pipelineId,
173+
AppId: appId,
174+
EnvironmentId: environmentId,
175+
WorkflowType: workflowType,
176+
WorkflowRunnerId: workflowRunnerId,
276177
}
178+
model.CreatedBy = userId
179+
model.CreatedOn = now
180+
model.UpdatedBy = userId
181+
model.UpdatedOn = now
182+
183+
return impl.workflowStatusLatestRepository.SaveCdWorkflowStatusLatest(tx, model)
277184
}
278185

279186
func (impl *WorkflowStatusLatestServiceImpl) GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) {

0 commit comments

Comments
 (0)