Skip to content

Commit 61a1c74

Browse files
authored
opt (#6747)
* opt * nil handling * removed * removed
1 parent 8cd66fe commit 61a1c74

File tree

9 files changed

+213
-38
lines changed

9 files changed

+213
-38
lines changed

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
469469
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
470470
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
471471
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
472+
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
472473
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
473474
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
474475
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

internal/sql/repository/pipelineConfig/WorkflowStatusLatestRepository.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import (
2525

2626
type WorkflowStatusLatestRepository interface {
2727
// CI Workflow Status Latest methods
28-
SaveCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error
29-
UpdateCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error
28+
SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error
29+
UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error
3030
GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error)
3131
GetCiWorkflowStatusLatestByAppId(appId int) ([]*CiWorkflowStatusLatest, error)
3232
GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error)
@@ -77,17 +77,29 @@ type CdWorkflowStatusLatest struct {
7777
}
7878

7979
// CI Workflow Status Latest methods implementation
80-
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error {
81-
err := impl.dbConnection.Insert(model)
80+
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error {
81+
var connection orm.DB
82+
if tx != nil {
83+
connection = tx
84+
} else {
85+
connection = impl.dbConnection
86+
}
87+
err := connection.Insert(model)
8288
if err != nil {
8389
impl.logger.Errorw("error in saving ci workflow status latest", "err", err, "model", model)
8490
return err
8591
}
8692
return nil
8793
}
8894

89-
func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error {
90-
_, err := impl.dbConnection.Model(model).WherePK().UpdateNotNull()
95+
func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error {
96+
var connection orm.DB
97+
if tx != nil {
98+
connection = tx
99+
} else {
100+
connection = impl.dbConnection
101+
}
102+
_, err := connection.Model(model).WherePK().UpdateNotNull()
91103
if err != nil {
92104
impl.logger.Errorw("error in updating ci workflow status latest", "err", err, "model", model)
93105
return err

pkg/pipeline/CiHandler.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/devtron-labs/devtron/pkg/pipeline/adapter"
3131
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
3232
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
33+
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
3334
"regexp"
3435
"slices"
3536
"strconv"
@@ -99,6 +100,7 @@ type CiHandlerImpl struct {
99100
k8sCommonService k8sPkg.K8sCommonService
100101
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService
101102
workflowStatusLatestRepository pipelineConfig.WorkflowStatusLatestRepository
103+
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
102104
}
103105

104106
func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository, gitSensorClient gitSensor.Client, ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
@@ -107,6 +109,7 @@ func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipeline
107109
imageTaggingService imageTagging.ImageTaggingService, k8sCommonService k8sPkg.K8sCommonService, appWorkflowRepository appWorkflow.AppWorkflowRepository, customTagService CustomTagService,
108110
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService,
109111
workflowStatusLatestRepository pipelineConfig.WorkflowStatusLatestRepository,
112+
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService,
110113
) *CiHandlerImpl {
111114
cih := &CiHandlerImpl{
112115
Logger: Logger,
@@ -130,6 +133,7 @@ func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipeline
130133
k8sCommonService: k8sCommonService,
131134
workFlowStageStatusService: workFlowStageStatusService,
132135
workflowStatusLatestRepository: workflowStatusLatestRepository,
136+
workflowStatusUpdateService: workflowStatusUpdateService,
133137
}
134138
config, err := types.GetCiConfig()
135139
if err != nil {
@@ -601,6 +605,19 @@ func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus eventProcessorBean.CiCd
601605
impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id))
602606
return savedWorkflow.Id, true, err
603607
}
608+
609+
// Update latest status table for CI workflow
610+
// Check if CiPipeline is loaded, if not pass 0 as appId to let the function fetch it
611+
appId := 0
612+
if savedWorkflow.CiPipeline != nil {
613+
appId = savedWorkflow.CiPipeline.AppId
614+
}
615+
err = impl.workflowStatusUpdateService.UpdateCiWorkflowStatusLatest(nil, savedWorkflow.CiPipelineId, appId, savedWorkflow.Id, savedWorkflow.TriggeredBy)
616+
if err != nil {
617+
impl.Logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", savedWorkflow.CiPipelineId, "workflowId", savedWorkflow.Id)
618+
// Don't return error here as the main workflow update was successful
619+
}
620+
604621
impl.sendCIFailEvent(savedWorkflow, status, message)
605622
return savedWorkflow.Id, true, nil
606623
}

pkg/pipeline/CiService.go

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/devtron-labs/devtron/pkg/pipeline/types"
2727
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
2828
"github.com/devtron-labs/devtron/pkg/sql"
29+
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
2930
util3 "github.com/devtron-labs/devtron/util"
3031
util2 "github.com/devtron-labs/devtron/util/event"
3132
"go.uber.org/zap"
@@ -39,28 +40,31 @@ type CiService interface {
3940
}
4041

4142
type CiServiceImpl struct {
42-
Logger *zap.SugaredLogger
43-
workflowStageStatusService workflowStatus.WorkFlowStageStatusService
44-
eventClient client.EventClient
45-
eventFactory client.EventFactory
46-
config *types.CiConfig
47-
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
48-
transactionManager sql.TransactionWrapper
43+
Logger *zap.SugaredLogger
44+
workflowStageStatusService workflowStatus.WorkFlowStageStatusService
45+
eventClient client.EventClient
46+
eventFactory client.EventFactory
47+
config *types.CiConfig
48+
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
49+
transactionManager sql.TransactionWrapper
50+
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
4951
}
5052

5153
func NewCiServiceImpl(Logger *zap.SugaredLogger,
5254
workflowStageStatusService workflowStatus.WorkFlowStageStatusService, eventClient client.EventClient,
5355
eventFactory client.EventFactory,
5456
ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
5557
transactionManager sql.TransactionWrapper,
58+
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService,
5659
) *CiServiceImpl {
5760
cis := &CiServiceImpl{
58-
Logger: Logger,
59-
workflowStageStatusService: workflowStageStatusService,
60-
eventClient: eventClient,
61-
eventFactory: eventFactory,
62-
ciWorkflowRepository: ciWorkflowRepository,
63-
transactionManager: transactionManager,
61+
Logger: Logger,
62+
workflowStageStatusService: workflowStageStatusService,
63+
eventClient: eventClient,
64+
eventFactory: eventFactory,
65+
ciWorkflowRepository: ciWorkflowRepository,
66+
transactionManager: transactionManager,
67+
workflowStatusUpdateService: workflowStatusUpdateService,
6468
}
6569
config, err := types.GetCiConfig()
6670
if err != nil {
@@ -131,11 +135,24 @@ func (impl *CiServiceImpl) SaveCiWorkflowWithStage(wf *pipelineConfig.CiWorkflow
131135
return err
132136
}
133137

138+
// Update latest status table for CI workflow within the transaction
139+
// Check if CiPipeline is loaded, if not pass 0 as appId to let the function fetch it
140+
appId := 0
141+
if wf.CiPipeline != nil {
142+
appId = wf.CiPipeline.AppId
143+
}
144+
err = impl.workflowStatusUpdateService.UpdateCiWorkflowStatusLatest(tx, wf.CiPipelineId, appId, wf.Id, wf.TriggeredBy)
145+
if err != nil {
146+
impl.Logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", wf.CiPipelineId, "workflowId", wf.Id)
147+
return err
148+
}
149+
134150
err = impl.transactionManager.CommitTx(tx)
135151
if err != nil {
136152
impl.Logger.Errorw("error in committing transaction", "workflowName", wf.Name, "error", err)
137153
return err
138154
}
155+
139156
return nil
140157

141158
}
@@ -167,11 +184,24 @@ func (impl *CiServiceImpl) UpdateCiWorkflowWithStage(wf *pipelineConfig.CiWorkfl
167184
return err
168185
}
169186

187+
// Update latest status table for CI workflow within the transaction
188+
// Check if CiPipeline is loaded, if not pass 0 as appId to let the function fetch it
189+
appId := 0
190+
if wf.CiPipeline != nil {
191+
appId = wf.CiPipeline.AppId
192+
}
193+
err = impl.workflowStatusUpdateService.UpdateCiWorkflowStatusLatest(tx, wf.CiPipelineId, appId, wf.Id, wf.TriggeredBy)
194+
if err != nil {
195+
impl.Logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", wf.CiPipelineId, "workflowId", wf.Id)
196+
return err
197+
}
198+
170199
err = impl.transactionManager.CommitTx(tx)
171200
if err != nil {
172201
impl.Logger.Errorw("error in committing transaction", "workflowName", wf.Name, "error", err)
173202
return err
174203
}
204+
175205
return nil
176206

177207
}

pkg/workflow/cd/CdWorkflowRunnerService.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,21 @@ func (impl *CdWorkflowRunnerServiceImpl) UpdateWfr(dto *bean.CdWorkflowRunnerDto
8080
impl.logger.Errorw("error in updating runner status in db", "runnerId", runnerDbObj.Id, "err", err)
8181
return err
8282
}
83+
84+
// Update latest status table for CD workflow
85+
// Check if CdWorkflow and Pipeline are loaded, if not pass 0 as appId/environmentId to let the function fetch them
86+
appId := 0
87+
environmentId := 0
88+
if runnerDbObj.CdWorkflow != nil && runnerDbObj.CdWorkflow.Pipeline != nil {
89+
appId = runnerDbObj.CdWorkflow.Pipeline.AppId
90+
environmentId = runnerDbObj.CdWorkflow.Pipeline.EnvironmentId
91+
}
92+
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(nil, runnerDbObj.CdWorkflow.PipelineId, appId, environmentId, runnerDbObj.Id, runnerDbObj.WorkflowType.String(), int32(updatedBy))
93+
if err != nil {
94+
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "pipelineId", runnerDbObj.CdWorkflow.PipelineId, "workflowRunnerId", runnerDbObj.Id)
95+
// Don't return error here as the main workflow update was successful
96+
}
97+
8398
return nil
8499
}
85100

@@ -169,6 +184,7 @@ func (impl *CdWorkflowRunnerServiceImpl) UpdateCdWorkflowRunnerWithStage(wfr *pi
169184
impl.logger.Errorw("error in committing transaction", "workflowName", wfr.Name, "error", err)
170185
return err
171186
}
187+
172188
return nil
173189

174190
}

pkg/workflow/dag/WorkflowDagExecutor.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ import (
2121
"context"
2222
"encoding/json"
2323
"fmt"
24+
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
25+
"net/http"
26+
"strings"
27+
"sync"
28+
"time"
29+
2430
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
2531
"github.com/devtron-labs/common-lib/async"
2632
"github.com/devtron-labs/common-lib/utils"
@@ -71,10 +77,6 @@ import (
7177
util2 "github.com/devtron-labs/devtron/util/event"
7278
errors2 "k8s.io/apimachinery/pkg/api/errors"
7379
"k8s.io/client-go/rest"
74-
"net/http"
75-
"strings"
76-
"sync"
77-
"time"
7880

7981
"github.com/devtron-labs/devtron/api/bean"
8082
"github.com/devtron-labs/devtron/internal/sql/models"
@@ -158,7 +160,8 @@ type WorkflowDagExecutorImpl struct {
158160
workflowService executor.WorkflowService
159161
ciHandlerService trigger.HandlerService
160162
workflowTriggerAuditService auditService.WorkflowTriggerAuditService
161-
fluxApplicationService fluxApplication.FluxApplicationService
163+
fluxApplicationService fluxApplication.FluxApplicationService
164+
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
162165
}
163166

164167
func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pipelineConfig.PipelineRepository,
@@ -195,6 +198,7 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
195198
ciHandlerService trigger.HandlerService,
196199
workflowTriggerAuditService auditService.WorkflowTriggerAuditService,
197200
fluxApplicationService fluxApplication.FluxApplicationService,
201+
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService,
198202
) *WorkflowDagExecutorImpl {
199203
wde := &WorkflowDagExecutorImpl{logger: Logger,
200204
pipelineRepository: pipelineRepository,
@@ -231,6 +235,7 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
231235
ciHandlerService: ciHandlerService,
232236
workflowTriggerAuditService: workflowTriggerAuditService,
233237
fluxApplicationService: fluxApplicationService,
238+
workflowStatusUpdateService: workflowStatusUpdateService,
234239
}
235240
config, err := types.GetCdConfig()
236241
if err != nil {
@@ -939,7 +944,6 @@ func (impl *WorkflowDagExecutorImpl) UpdateCiWorkflowForCiSuccess(request *bean2
939944
impl.logger.Errorw("update wf failed for id ", "err", err)
940945
return err
941946
}
942-
943947
return nil
944948
}
945949

0 commit comments

Comments
 (0)